@@ -56,6 +56,9 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
56
56
57
57
class Client
58
58
59
+ ##
60
+ # @param socket [Socket]
61
+ # @param logger_context [#log_warn&#log_error]
59
62
def initialize ( socket , logger_context )
60
63
@socket = socket
61
64
@logger_context = logger_context
@@ -142,8 +145,11 @@ def register
142
145
@thread_no = Concurrent ::AtomicFixnum . new ( 0 )
143
146
setup_ssl if @ssl_enable
144
147
145
- run_as_server if server?
146
- run_as_client unless server?
148
+ if server?
149
+ run_as_server
150
+ else
151
+ run_as_client
152
+ end
147
153
end
148
154
149
155
def run_as_server
@@ -174,8 +180,7 @@ def run_as_server
174
180
@logger . debug ( "accepted connection" , client : client_socket . peer , server : "#{ @host } :#{ @port } " )
175
181
client = Client . new ( client_socket , self )
176
182
Thread . current [ :client ] = client
177
- @thread_no . increment
178
- LogStash ::Util . set_thread_name ( "[#{ pipeline_id } ]|output|tcp|client_socket-#{ @thread_no . value } " )
183
+ LogStash ::Util . set_thread_name ( "[#{ pipeline_id } ]|output|tcp|client_socket-#{ @thread_no . increment } " )
179
184
@client_threads << Thread . current
180
185
client . run unless @closed . value
181
186
end
@@ -196,9 +201,9 @@ def run_as_client
196
201
begin
197
202
client_socket = connect unless client_socket
198
203
199
- writable_oi = nil
200
- while writable_oi . nil? || writable_oi . any? == false
201
- readable_io , writable_oi , _ = IO . select ( [ client_socket ] , [ client_socket ] )
204
+ writable_io = nil
205
+ while writable_io . nil? || writable_io . any? == false
206
+ readable_io , writable_io , _ = IO . select ( [ client_socket ] , [ client_socket ] )
202
207
203
208
# don't expect any reads, but a readable socket might
204
209
# mean the remote end closed, so read it and throw it away.
0 commit comments