diff --git a/CHANGELOG.md b/CHANGELOG.md index 25f6ebb..cc2f5f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 6.0.3 + - Pulled applicable back-ports from 6.1.0 [#50](https://github.com/logstash-plugins/logstash-output-tcp/pull/50) + - Fix: Ensure sockets are closed when this plugin is closed + - Fix: Fixes an issue in client mode where payloads larger than a connection's current TCP window could be silently truncated + - Fix: Fixes an issue in server mode where payloads larger than a connection's current TCP window could be silently truncated + ## 6.0.2 - Fix: unable to start with password protected key [#45](https://github.com/logstash-plugins/logstash-output-tcp/pull/45) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 3b06465..5a19d57 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -51,31 +51,44 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base # SSL key passphrase config :ssl_key_passphrase, :validate => :password, :default => nil + ## + # @param socket [Socket] + # @param logger_context [#log_warn&#log_error&#logger] class Client - public - def initialize(socket, logger) + def initialize(socket, logger_context) @socket = socket - @logger = logger + @peer_info = socket.peer + @logger_context = logger_context @queue = Queue.new end - public def run loop do begin - @socket.write(@queue.pop) + payload = @queue.pop + + @logger_context.logger.trace("transmitting #{payload.bytesize} bytes", socket: @peer_info) if @logger_context.logger.trace? && payload && !payload.empty? + while payload && !payload.empty? + written_bytes_size = @socket.write(payload) + payload = payload.byteslice(written_bytes_size..-1) + @logger_context.logger.log_trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: @peer_info) if @logger_context.logger.trace? + end rescue => e - @logger.warn("tcp output exception", :socket => @socket, - :exception => e) + @logger_context.log_warn("tcp output exception: socket write failed", e, :socket => @peer_info) break end end end # def run - public def write(msg) @queue.push(msg) end # def write + + def close + @socket.close + rescue => e + @logger_context.log_warn 'socket close failed:', e, socket: @socket&.to_s + end end # class Client private @@ -113,6 +126,8 @@ def register if @ssl_enable setup_ssl end # @ssl_enable + @closed = Concurrent::AtomicBoolean.new(false) + @thread_no = Concurrent::AtomicFixnum.new(0) if server? @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}") @@ -129,44 +144,61 @@ def register @client_threads = [] @accept_thread = Thread.new(@server_socket) do |server_socket| + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept") loop do + break if @closed.value Thread.start(server_socket.accept) do |client_socket| # monkeypatch a 'peer' method onto the socket. - client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } + client_socket.extend(::LogStash::Util::SocketPeer) @logger.debug("Accepted connection", :client => client_socket.peer, :server => "#{@host}:#{@port}") - client = Client.new(client_socket, @logger) + client = Client.new(client_socket, self) Thread.current[:client] = client + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@thread_no.increment}") @client_threads << Thread.current - client.run + client.run unless @closed.value end end end @codec.on_event do |event, payload| + @client_threads.select!(&:alive?) @client_threads.each do |client_thread| client_thread[:client].write(payload) end - @client_threads.reject! {|t| !t.alive? } end else - client_socket = nil + @client_socket = nil + peer_info = nil @codec.on_event do |event, payload| begin - client_socket = connect unless client_socket - r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil) - # don't expect any reads, but a readable socket might - # mean the remote end closed, so read it and throw it away. - # we'll get an EOFError if it happens. - client_socket.sysread(16384) if r.any? + # not threadsafe; this is why we require `concurrency: single` + unless @client_socket + @client_socket = connect + peer_info = @client_socket.peer + end + + writable_io = nil + while writable_io.nil? || writable_io.any? == false + readable_io, writable_io, _ = IO.select([@client_socket],[@client_socket]) + + # don't expect any reads, but a readable socket might + # mean the remote end closed, so read it and throw it away. + # we'll get an EOFError if it happens. + readable_io.each { |readable| readable.sysread(16384) } + end # Now send the payload - client_socket.syswrite(payload) if w.any? + @logger.trace("transmitting #{payload.bytesize} bytes", socket: peer_info) if @logger.trace? && payload && !payload.empty? + while payload && payload.bytesize > 0 + written_bytes_size = @client_socket.syswrite(payload) + payload = payload.byteslice(written_bytes_size..-1) + @logger.trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: peer_info) if @logger.trace? + end rescue => e - @logger.warn("tcp output exception", :host => @host, :port => @port, - :exception => e, :backtrace => e.backtrace) - client_socket.close rescue nil - client_socket = nil + log_warn "client socket failed:", e, host: @host, port: @port, socket: peer_info + @client_socket.close rescue nil + @client_socket = nil sleep @reconnect_interval retry end @@ -174,6 +206,23 @@ def register end end # def register + # @overload Base#close + def close + if server? + # server-mode clean-up + @closed.make_true + @server_socket.shutdown rescue nil if @server_socket + + @client_threads&.each do |thread| + client = thread[:client] + client.close rescue nil if client + end + else + # client-mode clean-up + @client_socket&.close + end + end + private def connect begin @@ -183,17 +232,17 @@ def connect begin client_socket.connect rescue OpenSSL::SSL::SSLError => ssle - @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace) + log_error 'connect ssl failure:', ssle, backtrace: false # NOTE(mrichar1): Hack to prevent hammering peer sleep(5) raise end end - client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } + client_socket.extend(::LogStash::Util::SocketPeer) @logger.debug("Opened connection", :client => "#{client_socket.peer}") return client_socket rescue StandardError => e - @logger.error("Failed to connect: #{e.message}", :exception => e.class, :backtrace => e.backtrace) + log_error 'failed to connect:', e sleep @reconnect_interval retry end @@ -208,4 +257,20 @@ def server? def receive(event) @codec.encode(event) end # def receive + + def pipeline_id + execution_context.pipeline_id || 'main' + end + + def log_warn(msg, e, backtrace: @logger.debug?, **details) + details = details.merge message: e.message, exception: e.class + details[:backtrace] = e.backtrace if backtrace + @logger.warn(msg, details) + end + + def log_error(msg, e, backtrace: @logger.info?, **details) + details = details.merge message: e.message, exception: e.class + details[:backtrace] = e.backtrace if backtrace + @logger.error(msg, details) + end end # class LogStash::Outputs::Tcp diff --git a/logstash-output-tcp.gemspec b/logstash-output-tcp.gemspec index b65fe8d..3b330c9 100644 --- a/logstash-output-tcp.gemspec +++ b/logstash-output-tcp.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-tcp' - s.version = '6.0.2' + s.version = '6.0.3' s.licenses = ['Apache License (2.0)'] s.summary = "Writes events over a TCP socket" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/outputs/tcp_spec.rb b/spec/outputs/tcp_spec.rb index d5494b8..9eb2dc4 100644 --- a/spec/outputs/tcp_spec.rb +++ b/spec/outputs/tcp_spec.rb @@ -3,7 +3,7 @@ require "flores/pki" describe LogStash::Outputs::Tcp do - subject { described_class.new(config) } + subject(:instance) { described_class.new(config) } let(:config) { { "host" => "localhost", "port" => 2000 + rand(3000), @@ -73,4 +73,139 @@ end end end + + ## + # Reads `in_io` until EOF and writes the bytes + # it receives to `out_io`, tolerating partial writes. + def siphon_until_eof(in_io, out_io) + buffer = "" + while (retval = in_io.read_nonblock(32*1024, buffer, exception:false)) do + (IO.select([in_io], nil, nil, 5); next) if retval == :wait_readable + + while (buffer && !buffer.empty?) do + bytes_written = out_io.write(buffer) + buffer.replace buffer.byteslice(bytes_written..-1) + end + end + end + + context 'client mode' do + context 'transmitting data' do + let!(:io) { StringIO.new } # somewhere for our server to stash the data it receives + + let(:server_host) { 'localhost' } + let(:server_port) { server.addr[1] } # get actual since we bind to port 0 + + let!(:server) { TCPServer.new(server_host, 0) } + + let(:config) do + { 'host' => server_host, 'port' => server_port, 'mode' => 'client' } + end + + let(:event) { LogStash::Event.new({"hello" => "world"})} + + subject(:instance) { described_class.new(config) } + + before(:each) do + # accepts ONE connection + @server_socket_thread = Thread.start do + client = server.accept + siphon_until_eof(client, io) + end + instance.register + end + + after(:each) do + @server_socket_thread&.join + end + + it 'encodes and transmits data' do + instance.receive(event) + sleep 1 + instance.close # release the connection + @server_socket_thread.join(30) || fail('server failed to join') + expect(io.string).to include('"hello"','"world"') + end + + context 'when payload is very large' do + let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 } + let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) } + + + it 'encodes and transmits data' do + instance.receive(event) + sleep 1 + instance.close # release the connection + @server_socket_thread.join(30) || fail('server failed to join') + expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}")) + end + end + end + end + + context 'server mode' do + + def wait_for_condition(total_time_in_seconds, &block) + deadline = Time.now + total_time_in_seconds + until Time.now > deadline + return if yield + sleep(1) + end + fail('condition not met!') + end + + context 'transmitting data' do + let(:server_host) { 'localhost' } + let(:server_port) { Random.rand(1024...5000) } + + let(:config) do + { 'host' => server_host, 'port' => server_port, 'mode' => 'server' } + end + + subject(:instance) { described_class.new(config) } + + before(:each) { instance.register } # start listener + after(:each) { instance.close } + + let(:event) { LogStash::Event.new({"hello" => "world"})} + + context 'when one client is connected' do + let(:io) { StringIO.new } + let(:client_socket) { TCPSocket.new(server_host, server_port) } + + before(:each) do + @client_socket_thread = Thread.start { siphon_until_eof(client_socket, io) } + sleep 1 # wait for it to actually connect + end + + it 'encodes and transmits data' do + sleep 1 + instance.receive(event) + + wait_for_condition(30) { !io.size.zero? } + sleep 1 # wait for the event to get sent... + instance.close # release the connection + + @client_socket_thread.join(30) || fail('client failed to join') + expect(io.string).to include('"hello"','"world"') + end + + context 'when payload is very large' do + let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 } + let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) } + + it 'encodes and transmits data' do + instance.receive(event) + + wait_for_condition(30) { io.size >= one_hundred_megabyte_message.size } + sleep 1 # wait for the event to get sent... + instance.close # release the connection + + @client_socket_thread.join(30) || fail('client failed to join') + expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}")) + end + end + end + end + end end