From ed23c4e6be4941c7e2121fd8046678cc327b9a89 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 3 Nov 2022 17:31:25 +0000 Subject: [PATCH 01/10] backport client-closing and logging standardization from 6.1.0 --- CHANGELOG.md | 4 +++ lib/logstash/outputs/tcp.rb | 65 +++++++++++++++++++++++++++++-------- 2 files changed, 55 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25f6ebb..ae5e53b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 6.0.3 + - Pulled applicable back-ports from 6.1.0 + - Fix: Ensure sockets are closed when this plugin is closed + ## 6.0.2 - Fix: unable to start with password protected key [#45](https://github.com/logstash-plugins/logstash-output-tcp/pull/45) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 3b06465..81705c1 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -51,31 +51,36 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base # SSL key passphrase config :ssl_key_passphrase, :validate => :password, :default => nil + ## + # @param socket [Socket] + # @param logger_context [#log_warn&#log_error] class Client - public - def initialize(socket, logger) + def initialize(socket, logger_context) @socket = socket - @logger = logger + @logger_context = logger_context @queue = Queue.new end - public def run loop do begin @socket.write(@queue.pop) rescue => e - @logger.warn("tcp output exception", :socket => @socket, - :exception => e) + @logger_context.log_warn("tcp output exception: socket write failed", e, :socket => @socket&.to_s) break end end end # def run - public def write(msg) @queue.push(msg) end # def write + + def close + @socket.close + rescue => e + @logger_context.log_warn 'socket close failed:', e, socket: @socket&.to_s + end end # class Client private @@ -113,6 +118,8 @@ def register if @ssl_enable setup_ssl end # @ssl_enable + @closed = Concurrent::AtomicBoolean.new(false) + @thread_no = Concurrent::AtomicFixnum.new(0) if server? @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}") @@ -129,25 +136,28 @@ def register @client_threads = [] @accept_thread = Thread.new(@server_socket) do |server_socket| + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept") loop do + break if @closed.value Thread.start(server_socket.accept) do |client_socket| # monkeypatch a 'peer' method onto the socket. client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } @logger.debug("Accepted connection", :client => client_socket.peer, :server => "#{@host}:#{@port}") - client = Client.new(client_socket, @logger) + client = Client.new(client_socket, self) Thread.current[:client] = client + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@thread_no.increment}") @client_threads << Thread.current - client.run + client.run unless @closed.value end end end @codec.on_event do |event, payload| + @client_threads.select!(&:alive?) @client_threads.each do |client_thread| client_thread[:client].write(payload) end - @client_threads.reject! {|t| !t.alive? } end else client_socket = nil @@ -163,8 +173,7 @@ def register # Now send the payload client_socket.syswrite(payload) if w.any? rescue => e - @logger.warn("tcp output exception", :host => @host, :port => @port, - :exception => e, :backtrace => e.backtrace) + log_warn "client socket failed:", e, host: @host, port: @port, socket: client_socket&.to_s client_socket.close rescue nil client_socket = nil sleep @reconnect_interval @@ -174,6 +183,18 @@ def register end end # def register + # @overload Base#close + def close + @closed.make_true + @server_socket.close rescue nil if @server_socket + + return unless @client_threads + @client_threads.each do |thread| + client = thread[:client] + client.close rescue nil if client + end + end + private def connect begin @@ -183,7 +204,7 @@ def connect begin client_socket.connect rescue OpenSSL::SSL::SSLError => ssle - @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace) + log_error 'connect ssl failure:', ssle, backtrace: false # NOTE(mrichar1): Hack to prevent hammering peer sleep(5) raise @@ -193,7 +214,7 @@ def connect @logger.debug("Opened connection", :client => "#{client_socket.peer}") return client_socket rescue StandardError => e - @logger.error("Failed to connect: #{e.message}", :exception => e.class, :backtrace => e.backtrace) + log_error 'failed to connect:', e sleep @reconnect_interval retry end @@ -208,4 +229,20 @@ def server? def receive(event) @codec.encode(event) end # def receive + + def pipeline_id + execution_context.pipeline_id || 'main' + end + + def log_warn(msg, e, backtrace: @logger.debug?, **details) + details = details.merge message: e.message, exception: e.class + details[:backtrace] = e.backtrace if backtrace + @logger.warn(msg, details) + end + + def log_error(msg, e, backtrace: @logger.info?, **details) + details = details.merge message: e.message, exception: e.class + details[:backtrace] = e.backtrace if backtrace + @logger.error(msg, details) + end end # class LogStash::Outputs::Tcp From c8bf2c3dfc4dd2372ff132911d04c690754f9b5d Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 3 Nov 2022 17:40:25 +0000 Subject: [PATCH 02/10] backport overlarge-payload fixes from 6.1.1 --- CHANGELOG.md | 1 + lib/logstash/outputs/tcp.rb | 24 ++++++++++++++++-------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ae5e53b..563ff38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 6.0.3 - Pulled applicable back-ports from 6.1.0 - Fix: Ensure sockets are closed when this plugin is closed + - Fix: Fixes an issue where payloads larger than a connection's current TCP window could be silently truncated ## 6.0.2 - Fix: unable to start with password protected key [#45](https://github.com/logstash-plugins/logstash-output-tcp/pull/45) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 81705c1..ca422ba 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -141,7 +141,7 @@ def register break if @closed.value Thread.start(server_socket.accept) do |client_socket| # monkeypatch a 'peer' method onto the socket. - client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } + client_socket.extend(::LogStash::Util::SocketPeer) @logger.debug("Accepted connection", :client => client_socket.peer, :server => "#{@host}:#{@port}") client = Client.new(client_socket, self) @@ -164,14 +164,22 @@ def register @codec.on_event do |event, payload| begin client_socket = connect unless client_socket - r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil) - # don't expect any reads, but a readable socket might - # mean the remote end closed, so read it and throw it away. - # we'll get an EOFError if it happens. - client_socket.sysread(16384) if r.any? + + writable_io = nil + while writable_io.nil? || writable_io.any? == false + readable_io, writable_io, _ = IO.select([client_socket],[client_socket]) + + # don't expect any reads, but a readable socket might + # mean the remote end closed, so read it and throw it away. + # we'll get an EOFError if it happens. + readable_io.each { |readable| readable.sysread(16384) } + end # Now send the payload - client_socket.syswrite(payload) if w.any? + while payload && payload.bytesize > 0 + written_bytes_size = client_socket.syswrite(payload) + payload = payload.byteslice(written_bytes_size..-1) + end rescue => e log_warn "client socket failed:", e, host: @host, port: @port, socket: client_socket&.to_s client_socket.close rescue nil @@ -210,7 +218,7 @@ def connect raise end end - client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } + client_socket.extend(::LogStash::Util::SocketPeer) @logger.debug("Opened connection", :client => "#{client_socket.peer}") return client_socket rescue StandardError => e From 874beb8208530790e0630f32bc4b639a2f49662c Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 3 Nov 2022 22:08:46 +0000 Subject: [PATCH 03/10] add trace logging for overlarge payloads --- lib/logstash/outputs/tcp.rb | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index ca422ba..9fa6280 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -161,9 +161,14 @@ def register end else client_socket = nil + peer_info = nil @codec.on_event do |event, payload| begin - client_socket = connect unless client_socket + # not threadsafe; this is why we require `concurrency: single` + unless client_socket + client_socket = connect + peer_info = client_socket.peer + end writable_io = nil while writable_io.nil? || writable_io.any? == false @@ -176,12 +181,14 @@ def register end # Now send the payload + @logger.trace("transmitting #{payload.bytesize} bytes", socket: peer_info) if @logger.trace? && payload && !payload.empty? while payload && payload.bytesize > 0 written_bytes_size = client_socket.syswrite(payload) payload = payload.byteslice(written_bytes_size..-1) + @logger.trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: peer_info) if @logger.trace? end rescue => e - log_warn "client socket failed:", e, host: @host, port: @port, socket: client_socket&.to_s + log_warn "client socket failed:", e, host: @host, port: @port, socket: peer_info client_socket.close rescue nil client_socket = nil sleep @reconnect_interval From 29fb38e99a73329fbc12e2ac4c5895fe1475fd22 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 3 Nov 2022 22:43:30 +0000 Subject: [PATCH 04/10] fixup: add 100ms backoff for client-mode writes --- lib/logstash/outputs/tcp.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 9fa6280..a843e4b 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -186,6 +186,7 @@ def register written_bytes_size = client_socket.syswrite(payload) payload = payload.byteslice(written_bytes_size..-1) @logger.trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: peer_info) if @logger.trace? + sleep 0.1 unless payload.empty? end rescue => e log_warn "client socket failed:", e, host: @host, port: @port, socket: peer_info From 72d5072c73a94049023c9848f1d914362cd1249f Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 3 Nov 2022 23:07:40 +0000 Subject: [PATCH 05/10] cross-port previous client-mode TCP window fixes to server-mode --- lib/logstash/outputs/tcp.rb | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index a843e4b..0b4409f 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -53,10 +53,11 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base ## # @param socket [Socket] - # @param logger_context [#log_warn&#log_error] + # @param logger_context [#log_warn&#log_error&#logger] class Client def initialize(socket, logger_context) @socket = socket + @peer_info = socket.peer @logger_context = logger_context @queue = Queue.new end @@ -64,9 +65,17 @@ def initialize(socket, logger_context) def run loop do begin - @socket.write(@queue.pop) + payload = @queue.pop + + @logger_context.logger.trace("transmitting #{payload.bytesize} bytes", socket: @peer_info) if @logger_context.logger.trace? && payload && !payload.empty? + while payload && !payload.empty? + written_bytes_size = @socket.write(payload) + payload = payload.byteslice(written_bytes_size..-1) + @logger_context.logger.log_trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: @peer_info) if @logger_context.logger.trace? + sleep 0.1 unless payload.empty? + end rescue => e - @logger_context.log_warn("tcp output exception: socket write failed", e, :socket => @socket&.to_s) + @logger_context.log_warn("tcp output exception: socket write failed", e, :socket => @peer_info) break end end From cae62349e7f56ce7c304ce6584d042c46d16de43 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 3 Nov 2022 17:41:22 +0000 Subject: [PATCH 06/10] bump to 6.0.3 --- logstash-output-tcp.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-output-tcp.gemspec b/logstash-output-tcp.gemspec index b65fe8d..3b330c9 100644 --- a/logstash-output-tcp.gemspec +++ b/logstash-output-tcp.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-tcp' - s.version = '6.0.2' + s.version = '6.0.3' s.licenses = ['Apache License (2.0)'] s.summary = "Writes events over a TCP socket" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From 6c5aa472f5dbc4a9eb87c3aadec2143b6bb223d0 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 3 Nov 2022 22:07:53 +0000 Subject: [PATCH 07/10] fix server-mode shutdown bug, add baseline specs --- lib/logstash/outputs/tcp.rb | 33 +++++----- spec/outputs/tcp_spec.rb | 119 +++++++++++++++++++++++++++++++++++- 2 files changed, 137 insertions(+), 15 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 0b4409f..e768b1b 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -169,19 +169,19 @@ def register end end else - client_socket = nil + @client_socket = nil peer_info = nil @codec.on_event do |event, payload| begin # not threadsafe; this is why we require `concurrency: single` - unless client_socket - client_socket = connect - peer_info = client_socket.peer + unless @client_socket + @client_socket = connect + peer_info = @client_socket.peer end writable_io = nil while writable_io.nil? || writable_io.any? == false - readable_io, writable_io, _ = IO.select([client_socket],[client_socket]) + readable_io, writable_io, _ = IO.select([@client_socket],[@client_socket]) # don't expect any reads, but a readable socket might # mean the remote end closed, so read it and throw it away. @@ -192,15 +192,15 @@ def register # Now send the payload @logger.trace("transmitting #{payload.bytesize} bytes", socket: peer_info) if @logger.trace? && payload && !payload.empty? while payload && payload.bytesize > 0 - written_bytes_size = client_socket.syswrite(payload) + written_bytes_size = @client_socket.syswrite(payload) payload = payload.byteslice(written_bytes_size..-1) @logger.trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: peer_info) if @logger.trace? sleep 0.1 unless payload.empty? end rescue => e log_warn "client socket failed:", e, host: @host, port: @port, socket: peer_info - client_socket.close rescue nil - client_socket = nil + @client_socket.close rescue nil + @client_socket = nil sleep @reconnect_interval retry end @@ -210,13 +210,18 @@ def register # @overload Base#close def close - @closed.make_true - @server_socket.close rescue nil if @server_socket + if server? + # server-mode clean-up + @closed.make_true + @server_socket.shutdown rescue nil if @server_socket - return unless @client_threads - @client_threads.each do |thread| - client = thread[:client] - client.close rescue nil if client + @client_threads&.each do |thread| + client = thread[:client] + client.close rescue nil if client + end + else + # client-mode clean-up + @client_socket&.close end end diff --git a/spec/outputs/tcp_spec.rb b/spec/outputs/tcp_spec.rb index d5494b8..56e26b8 100644 --- a/spec/outputs/tcp_spec.rb +++ b/spec/outputs/tcp_spec.rb @@ -3,7 +3,7 @@ require "flores/pki" describe LogStash::Outputs::Tcp do - subject { described_class.new(config) } + subject(:instance) { described_class.new(config) } let(:config) { { "host" => "localhost", "port" => 2000 + rand(3000), @@ -73,4 +73,121 @@ end end end + + ## + # Reads `in_io` until EOF and writes the bytes + # it receives to `out_io`, tolerating partial writes. + def siphon_until_eof(in_io, out_io) + while (buffer = in_io.read(16*1024)) do + while (buffer && !buffer.empty?) do + bytes_written = out_io.write(buffer) + buffer = buffer.byteslice(bytes_written..-1) + end + end + end + + context 'client mode' do + context 'transmitting data' do + let!(:io) { StringIO.new } # somewhere for our server to stash the data it receives + + let(:server_host) { 'localhost' } + let(:server_port) { server.addr[1] } # get actual since we bind to port 0 + + let!(:server) { TCPServer.new(server_host, 0) } + + let(:config) do + { 'host' => server_host, 'port' => server_port, 'mode' => 'client' } + end + + let(:event) { LogStash::Event.new({"hello" => "world"})} + + subject(:instance) { described_class.new(config) } + + before(:each) do + # accepts ONE connection + @server_socket_thread = Thread.start do + client = server.accept + siphon_until_eof(client, io) + end + instance.register + end + + after(:each) do + @server_socket_thread&.join + end + + it 'encodes and transmits data' do + instance.receive(event) + instance.close # release the connection + sleep 1 + expect(io.string).to include('"hello"','"world"') + end + + context 'when payload is very large' do + let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 } + let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) } + + + it 'encodes and transmits data' do + instance.receive(event) + instance.close # release the connection + sleep 1 + expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}")) + end + end + end + end + + context 'server mode' do + context 'transmitting data' do + let(:server_host) { 'localhost' } + let(:server_port) { Random.rand(1024...5000) } + + let(:config) do + { 'host' => server_host, 'port' => server_port, 'mode' => 'server' } + end + + subject(:instance) { described_class.new(config) } + + before(:each) { instance.register } # start listener + after(:each) { instance.close } + + let(:event) { LogStash::Event.new({"hello" => "world"})} + + context 'when one client is connected' do + let(:io) { StringIO.new } + let(:client_socket) { TCPSocket.new(server_host, server_port) } + + before(:each) do + @client_socket_thread = Thread.start { siphon_until_eof(client_socket, io) } + sleep 1 # wait for it to actually connect + end + + it 'encodes and transmits data' do + instance.receive(event) + + sleep 1 # wait for the event to get sent... + instance.close # release the connection + + @client_socket_thread.join(30) || fail('client failed to join') + expect(io.string).to include('"hello"','"world"') + end + + context 'when payload is very large' do + let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 } + let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) } + + it 'encodes and transmits data' do + instance.receive(event) + + sleep 1 # wait for the event to get sent... + instance.close # release the connection + + @client_socket_thread.join(30) || fail('client failed to join') + expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}")) + end + end + end + end + end end From 3a6113da7f0dd3a598fdf70ec72b35674cb1cd0c Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 3 Nov 2022 23:41:15 +0000 Subject: [PATCH 08/10] more changelog --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 563ff38..cc2f5f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ ## 6.0.3 - - Pulled applicable back-ports from 6.1.0 + - Pulled applicable back-ports from 6.1.0 [#50](https://github.com/logstash-plugins/logstash-output-tcp/pull/50) - Fix: Ensure sockets are closed when this plugin is closed - - Fix: Fixes an issue where payloads larger than a connection's current TCP window could be silently truncated + - Fix: Fixes an issue in client mode where payloads larger than a connection's current TCP window could be silently truncated + - Fix: Fixes an issue in server mode where payloads larger than a connection's current TCP window could be silently truncated ## 6.0.2 - Fix: unable to start with password protected key [#45](https://github.com/logstash-plugins/logstash-output-tcp/pull/45) From 6c870a258dc844bb27efc28ed95d6c6878abae7d Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Fri, 4 Nov 2022 07:47:54 +0000 Subject: [PATCH 09/10] specs: avoid premature close in server-mode. because server-mode's TCP#receive does not block, we need to wait until the client thread has popped the payload from its queue and sent it over the socket before telling the TCP output to close, or else our close interrupts an in-flight event and our specs get a partial payload. --- spec/outputs/tcp_spec.rb | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/spec/outputs/tcp_spec.rb b/spec/outputs/tcp_spec.rb index 56e26b8..9eb2dc4 100644 --- a/spec/outputs/tcp_spec.rb +++ b/spec/outputs/tcp_spec.rb @@ -78,10 +78,13 @@ # Reads `in_io` until EOF and writes the bytes # it receives to `out_io`, tolerating partial writes. def siphon_until_eof(in_io, out_io) - while (buffer = in_io.read(16*1024)) do + buffer = "" + while (retval = in_io.read_nonblock(32*1024, buffer, exception:false)) do + (IO.select([in_io], nil, nil, 5); next) if retval == :wait_readable + while (buffer && !buffer.empty?) do bytes_written = out_io.write(buffer) - buffer = buffer.byteslice(bytes_written..-1) + buffer.replace buffer.byteslice(bytes_written..-1) end end end @@ -118,8 +121,9 @@ def siphon_until_eof(in_io, out_io) it 'encodes and transmits data' do instance.receive(event) - instance.close # release the connection sleep 1 + instance.close # release the connection + @server_socket_thread.join(30) || fail('server failed to join') expect(io.string).to include('"hello"','"world"') end @@ -130,8 +134,9 @@ def siphon_until_eof(in_io, out_io) it 'encodes and transmits data' do instance.receive(event) - instance.close # release the connection sleep 1 + instance.close # release the connection + @server_socket_thread.join(30) || fail('server failed to join') expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}")) end end @@ -139,6 +144,16 @@ def siphon_until_eof(in_io, out_io) end context 'server mode' do + + def wait_for_condition(total_time_in_seconds, &block) + deadline = Time.now + total_time_in_seconds + until Time.now > deadline + return if yield + sleep(1) + end + fail('condition not met!') + end + context 'transmitting data' do let(:server_host) { 'localhost' } let(:server_port) { Random.rand(1024...5000) } @@ -164,8 +179,10 @@ def siphon_until_eof(in_io, out_io) end it 'encodes and transmits data' do + sleep 1 instance.receive(event) + wait_for_condition(30) { !io.size.zero? } sleep 1 # wait for the event to get sent... instance.close # release the connection @@ -180,6 +197,7 @@ def siphon_until_eof(in_io, out_io) it 'encodes and transmits data' do instance.receive(event) + wait_for_condition(30) { io.size >= one_hundred_megabyte_message.size } sleep 1 # wait for the event to get sent... instance.close # release the connection From 672b6314bd81c125be580908b8623478eed3e5aa Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Fri, 4 Nov 2022 17:13:52 +0000 Subject: [PATCH 10/10] remove 10ms backoff, since IO#write and IO#syswrite block until writable --- lib/logstash/outputs/tcp.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index e768b1b..5a19d57 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -72,7 +72,6 @@ def run written_bytes_size = @socket.write(payload) payload = payload.byteslice(written_bytes_size..-1) @logger_context.logger.log_trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: @peer_info) if @logger_context.logger.trace? - sleep 0.1 unless payload.empty? end rescue => e @logger_context.log_warn("tcp output exception: socket write failed", e, :socket => @peer_info) @@ -195,7 +194,6 @@ def register written_bytes_size = @client_socket.syswrite(payload) payload = payload.byteslice(written_bytes_size..-1) @logger.trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: peer_info) if @logger.trace? - sleep 0.1 unless payload.empty? end rescue => e log_warn "client socket failed:", e, host: @host, port: @port, socket: peer_info