@@ -56,18 +56,25 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
56
56
57
57
class Client
58
58
59
- def initialize ( socket , logger )
59
+ ##
60
+ # @param socket [Socket]
61
+ # @param logger_context [#log_warn&#log_error]
62
+ def initialize ( socket , logger_context )
60
63
@socket = socket
61
- @logger = logger
64
+ @logger_context = logger_context
62
65
@queue = Queue . new
63
66
end
64
67
65
68
def run
66
69
loop do
67
70
begin
68
- @socket . write ( @queue . pop )
71
+ remaining_payload = @queue . pop
72
+ while remaining_payload && remaining_payload . bytesize > 0
73
+ written_bytes_size = @socket . write ( remaining_payload )
74
+ remaining_payload = remaining_payload . byteslice ( written_bytes_size ..-1 )
75
+ end
69
76
rescue => e
70
- log_warn 'socket write failed:' , e , socket : ( @socket ? @socket . to_s : nil )
77
+ @logger_context . log_warn 'socket write failed:' , e , socket : ( @socket ? @socket . to_s : nil )
71
78
break
72
79
end
73
80
end
@@ -80,7 +87,7 @@ def write(msg)
80
87
def close
81
88
@socket . close
82
89
rescue => e
83
- log_warn 'socket close failed:' , e , socket : ( @socket ? @socket . to_s : nil )
90
+ @logger_context . log_warn 'socket close failed:' , e , socket : ( @socket ? @socket . to_s : nil )
84
91
end
85
92
end # class Client
86
93
@@ -135,69 +142,85 @@ def register
135
142
require "socket"
136
143
require "stud/try"
137
144
@closed = Concurrent ::AtomicBoolean . new ( false )
145
+ @thread_no = Concurrent ::AtomicFixnum . new ( 0 )
138
146
setup_ssl if @ssl_enable
139
147
140
148
if server?
141
- @logger . info ( "Starting tcp output listener" , :address => "#{ @host } :#{ @port } " )
142
- begin
143
- @server_socket = TCPServer . new ( @host , @port )
144
- rescue Errno ::EADDRINUSE
145
- @logger . error ( "Could not start tcp server: Address in use" , host : @host , port : @port )
146
- raise
147
- end
148
- if @ssl_enable
149
- @server_socket = OpenSSL ::SSL ::SSLServer . new ( @server_socket , @ssl_context )
150
- end # @ssl_enable
151
- @client_threads = Concurrent ::Array . new
152
-
153
- @accept_thread = Thread . new ( @server_socket ) do |server_socket |
154
- LogStash ::Util . set_thread_name ( "[#{ pipeline_id } ]|output|tcp|server_accept" )
155
- loop do
156
- break if @closed . value
157
- client_socket = server_socket . accept_nonblock exception : false
158
- if client_socket == :wait_readable
159
- IO . select [ server_socket ]
160
- next
161
- end
162
- Thread . start ( client_socket ) do |client_socket |
163
- # monkeypatch a 'peer' method onto the socket.
164
- client_socket . instance_eval { class << self ; include ::LogStash ::Util ::SocketPeer end }
165
- @logger . debug ( "accepted connection" , client : client_socket . peer , server : "#{ @host } :#{ @port } " )
166
- client = Client . new ( client_socket , @logger )
167
- Thread . current [ :client ] = client
168
- LogStash ::Util . set_thread_name ( "[#{ pipeline_id } ]|output|tcp|client_socket-#{ @client_threads . size } " )
169
- @client_threads << Thread . current
170
- client . run unless @closed . value
171
- end
149
+ run_as_server
150
+ else
151
+ run_as_client
152
+ end
153
+ end
154
+
155
+ def run_as_server
156
+ @logger . info ( "Starting tcp output listener" , :address => "#{ @host } :#{ @port } " )
157
+ begin
158
+ @server_socket = TCPServer . new ( @host , @port )
159
+ rescue Errno ::EADDRINUSE
160
+ @logger . error ( "Could not start tcp server: Address in use" , host : @host , port : @port )
161
+ raise
162
+ end
163
+ if @ssl_enable
164
+ @server_socket = OpenSSL ::SSL ::SSLServer . new ( @server_socket , @ssl_context )
165
+ end # @ssl_enable
166
+ @client_threads = Concurrent ::Array . new
167
+
168
+ @accept_thread = Thread . new ( @server_socket ) do |server_socket |
169
+ LogStash ::Util . set_thread_name ( "[#{ pipeline_id } ]|output|tcp|server_accept" )
170
+ loop do
171
+ break if @closed . value
172
+ client_socket = server_socket . accept_nonblock exception : false
173
+ if client_socket == :wait_readable
174
+ IO . select [ server_socket ]
175
+ next
176
+ end
177
+ Thread . start ( client_socket ) do |client_socket |
178
+ # monkeypatch a 'peer' method onto the socket.
179
+ client_socket . extend ( ::LogStash ::Util ::SocketPeer )
180
+ @logger . debug ( "accepted connection" , client : client_socket . peer , server : "#{ @host } :#{ @port } " )
181
+ client = Client . new ( client_socket , self )
182
+ Thread . current [ :client ] = client
183
+ LogStash ::Util . set_thread_name ( "[#{ pipeline_id } ]|output|tcp|client_socket-#{ @thread_no . increment } " )
184
+ @client_threads << Thread . current
185
+ client . run unless @closed . value
172
186
end
173
187
end
188
+ end
174
189
175
- @codec . on_event do |event , payload |
176
- @client_threads . select! ( &:alive? )
177
- @client_threads . each do |client_thread |
178
- client_thread [ :client ] . write ( payload )
179
- end
190
+ @codec . on_event do |event , payload |
191
+ @client_threads . select! ( &:alive? )
192
+ @client_threads . each do |client_thread |
193
+ client_thread [ :client ] . write ( payload )
180
194
end
181
- else
182
- client_socket = nil
183
- @codec . on_event do |event , payload |
184
- begin
185
- client_socket = connect unless client_socket
186
- r , w , e = IO . select ( [ client_socket ] , [ client_socket ] , [ client_socket ] , nil )
195
+ end
196
+ end
197
+
198
+ def run_as_client
199
+ client_socket = nil
200
+ @codec . on_event do |event , payload |
201
+ begin
202
+ client_socket = connect unless client_socket
203
+
204
+ writable_io = nil
205
+ while writable_io . nil? || writable_io . any? == false
206
+ readable_io , writable_io , _ = IO . select ( [ client_socket ] , [ client_socket ] )
207
+
187
208
# don't expect any reads, but a readable socket might
188
209
# mean the remote end closed, so read it and throw it away.
189
210
# we'll get an EOFError if it happens.
190
- client_socket . sysread ( 16384 ) if r . any?
211
+ readable_io . each { |readable | readable . sysread ( 16384 ) }
212
+ end
191
213
192
- # Now send the payload
193
- client_socket . syswrite ( payload ) if w . any?
194
- rescue => e
195
- log_warn "client socket failed:" , e , host : @host , port : @port , socket : ( client_socket ? client_socket . to_s : nil )
196
- client_socket . close rescue nil
197
- client_socket = nil
198
- sleep @reconnect_interval
199
- retry
214
+ while payload && payload . bytesize > 0
215
+ written_bytes_size = client_socket . syswrite ( payload )
216
+ payload = payload . byteslice ( written_bytes_size ..-1 )
200
217
end
218
+ rescue => e
219
+ log_warn "client socket failed:" , e , host : @host , port : @port , socket : ( client_socket ? client_socket . to_s : nil )
220
+ client_socket . close rescue nil
221
+ client_socket = nil
222
+ sleep @reconnect_interval
223
+ retry
201
224
end
202
225
end
203
226
end
@@ -219,6 +242,18 @@ def close
219
242
end
220
243
end
221
244
245
+ def log_warn ( msg , e , backtrace : @logger . debug? , **details )
246
+ details = details . merge message : e . message , exception : e . class
247
+ details [ :backtrace ] = e . backtrace if backtrace
248
+ @logger . warn ( msg , details )
249
+ end
250
+
251
+ def log_error ( msg , e , backtrace : @logger . info? , **details )
252
+ details = details . merge message : e . message , exception : e . class
253
+ details [ :backtrace ] = e . backtrace if backtrace
254
+ @logger . error ( msg , details )
255
+ end
256
+
222
257
private
223
258
224
259
def connect
@@ -235,7 +270,7 @@ def connect
235
270
raise
236
271
end
237
272
end
238
- client_socket . instance_eval { class << self ; include ::LogStash ::Util ::SocketPeer end }
273
+ client_socket . extend ( ::LogStash ::Util ::SocketPeer )
239
274
@logger . debug ( "opened connection" , :client => client_socket . peer )
240
275
return client_socket
241
276
rescue => e
@@ -253,16 +288,4 @@ def pipeline_id
253
288
execution_context . pipeline_id || 'main'
254
289
end
255
290
256
- def log_warn ( msg , e , backtrace : @logger . debug? , **details )
257
- details = details . merge message : e . message , exception : e . class
258
- details [ :backtrace ] = e . backtrace if backtrace
259
- @logger . warn ( msg , details )
260
- end
261
-
262
- def log_error ( msg , e , backtrace : @logger . info? , **details )
263
- details = details . merge message : e . message , exception : e . class
264
- details [ :backtrace ] = e . backtrace if backtrace
265
- @logger . error ( msg , details )
266
- end
267
-
268
291
end # class LogStash::Outputs::Tcp
0 commit comments