Skip to content

Commit 379d6ca

Browse files
authored
Merge pull request #50 from yaauie/backport-tcp-window-support-to-6-0-x
Backport TCP window, client closing fixes to 6.0.x
2 parents 2f73981 + 672b631 commit 379d6ca

File tree

4 files changed

+235
-29
lines changed

4 files changed

+235
-29
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## 6.0.3
2+
- Pulled applicable back-ports from 6.1.0 [#50](https://github.com/logstash-plugins/logstash-output-tcp/pull/50)
3+
- Fix: Ensure sockets are closed when this plugin is closed
4+
- Fix: Fixes an issue in client mode where payloads larger than a connection's current TCP window could be silently truncated
5+
- Fix: Fixes an issue in server mode where payloads larger than a connection's current TCP window could be silently truncated
6+
17
## 6.0.2
28
- Fix: unable to start with password protected key [#45](https://github.com/logstash-plugins/logstash-output-tcp/pull/45)
39

lib/logstash/outputs/tcp.rb

Lines changed: 92 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -51,31 +51,44 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
5151
# SSL key passphrase
5252
config :ssl_key_passphrase, :validate => :password, :default => nil
5353

54+
##
55+
# @param socket [Socket]
56+
# @param logger_context [#log_warn&#log_error&#logger]
5457
class Client
55-
public
56-
def initialize(socket, logger)
58+
def initialize(socket, logger_context)
5759
@socket = socket
58-
@logger = logger
60+
@peer_info = socket.peer
61+
@logger_context = logger_context
5962
@queue = Queue.new
6063
end
6164

62-
public
6365
def run
6466
loop do
6567
begin
66-
@socket.write(@queue.pop)
68+
payload = @queue.pop
69+
70+
@logger_context.logger.trace("transmitting #{payload.bytesize} bytes", socket: @peer_info) if @logger_context.logger.trace? && payload && !payload.empty?
71+
while payload && !payload.empty?
72+
written_bytes_size = @socket.write(payload)
73+
payload = payload.byteslice(written_bytes_size..-1)
74+
@logger_context.logger.log_trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: @peer_info) if @logger_context.logger.trace?
75+
end
6776
rescue => e
68-
@logger.warn("tcp output exception", :socket => @socket,
69-
:exception => e)
77+
@logger_context.log_warn("tcp output exception: socket write failed", e, :socket => @peer_info)
7078
break
7179
end
7280
end
7381
end # def run
7482

75-
public
7683
def write(msg)
7784
@queue.push(msg)
7885
end # def write
86+
87+
def close
88+
@socket.close
89+
rescue => e
90+
@logger_context.log_warn 'socket close failed:', e, socket: @socket&.to_s
91+
end
7992
end # class Client
8093

8194
private
@@ -113,6 +126,8 @@ def register
113126
if @ssl_enable
114127
setup_ssl
115128
end # @ssl_enable
129+
@closed = Concurrent::AtomicBoolean.new(false)
130+
@thread_no = Concurrent::AtomicFixnum.new(0)
116131

117132
if server?
118133
@logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
@@ -129,51 +144,85 @@ def register
129144
@client_threads = []
130145

131146
@accept_thread = Thread.new(@server_socket) do |server_socket|
147+
LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept")
132148
loop do
149+
break if @closed.value
133150
Thread.start(server_socket.accept) do |client_socket|
134151
# monkeypatch a 'peer' method onto the socket.
135-
client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
152+
client_socket.extend(::LogStash::Util::SocketPeer)
136153
@logger.debug("Accepted connection", :client => client_socket.peer,
137154
:server => "#{@host}:#{@port}")
138-
client = Client.new(client_socket, @logger)
155+
client = Client.new(client_socket, self)
139156
Thread.current[:client] = client
157+
LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@thread_no.increment}")
140158
@client_threads << Thread.current
141-
client.run
159+
client.run unless @closed.value
142160
end
143161
end
144162
end
145163

146164
@codec.on_event do |event, payload|
165+
@client_threads.select!(&:alive?)
147166
@client_threads.each do |client_thread|
148167
client_thread[:client].write(payload)
149168
end
150-
@client_threads.reject! {|t| !t.alive? }
151169
end
152170
else
153-
client_socket = nil
171+
@client_socket = nil
172+
peer_info = nil
154173
@codec.on_event do |event, payload|
155174
begin
156-
client_socket = connect unless client_socket
157-
r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil)
158-
# don't expect any reads, but a readable socket might
159-
# mean the remote end closed, so read it and throw it away.
160-
# we'll get an EOFError if it happens.
161-
client_socket.sysread(16384) if r.any?
175+
# not threadsafe; this is why we require `concurrency: single`
176+
unless @client_socket
177+
@client_socket = connect
178+
peer_info = @client_socket.peer
179+
end
180+
181+
writable_io = nil
182+
while writable_io.nil? || writable_io.any? == false
183+
readable_io, writable_io, _ = IO.select([@client_socket],[@client_socket])
184+
185+
# don't expect any reads, but a readable socket might
186+
# mean the remote end closed, so read it and throw it away.
187+
# we'll get an EOFError if it happens.
188+
readable_io.each { |readable| readable.sysread(16384) }
189+
end
162190

163191
# Now send the payload
164-
client_socket.syswrite(payload) if w.any?
192+
@logger.trace("transmitting #{payload.bytesize} bytes", socket: peer_info) if @logger.trace? && payload && !payload.empty?
193+
while payload && payload.bytesize > 0
194+
written_bytes_size = @client_socket.syswrite(payload)
195+
payload = payload.byteslice(written_bytes_size..-1)
196+
@logger.trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: peer_info) if @logger.trace?
197+
end
165198
rescue => e
166-
@logger.warn("tcp output exception", :host => @host, :port => @port,
167-
:exception => e, :backtrace => e.backtrace)
168-
client_socket.close rescue nil
169-
client_socket = nil
199+
log_warn "client socket failed:", e, host: @host, port: @port, socket: peer_info
200+
@client_socket.close rescue nil
201+
@client_socket = nil
170202
sleep @reconnect_interval
171203
retry
172204
end
173205
end
174206
end
175207
end # def register
176208

209+
# @overload Base#close
210+
def close
211+
if server?
212+
# server-mode clean-up
213+
@closed.make_true
214+
@server_socket.shutdown rescue nil if @server_socket
215+
216+
@client_threads&.each do |thread|
217+
client = thread[:client]
218+
client.close rescue nil if client
219+
end
220+
else
221+
# client-mode clean-up
222+
@client_socket&.close
223+
end
224+
end
225+
177226
private
178227
def connect
179228
begin
@@ -183,17 +232,17 @@ def connect
183232
begin
184233
client_socket.connect
185234
rescue OpenSSL::SSL::SSLError => ssle
186-
@logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
235+
log_error 'connect ssl failure:', ssle, backtrace: false
187236
# NOTE(mrichar1): Hack to prevent hammering peer
188237
sleep(5)
189238
raise
190239
end
191240
end
192-
client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
241+
client_socket.extend(::LogStash::Util::SocketPeer)
193242
@logger.debug("Opened connection", :client => "#{client_socket.peer}")
194243
return client_socket
195244
rescue StandardError => e
196-
@logger.error("Failed to connect: #{e.message}", :exception => e.class, :backtrace => e.backtrace)
245+
log_error 'failed to connect:', e
197246
sleep @reconnect_interval
198247
retry
199248
end
@@ -208,4 +257,20 @@ def server?
208257
def receive(event)
209258
@codec.encode(event)
210259
end # def receive
260+
261+
def pipeline_id
262+
execution_context.pipeline_id || 'main'
263+
end
264+
265+
def log_warn(msg, e, backtrace: @logger.debug?, **details)
266+
details = details.merge message: e.message, exception: e.class
267+
details[:backtrace] = e.backtrace if backtrace
268+
@logger.warn(msg, details)
269+
end
270+
271+
def log_error(msg, e, backtrace: @logger.info?, **details)
272+
details = details.merge message: e.message, exception: e.class
273+
details[:backtrace] = e.backtrace if backtrace
274+
@logger.error(msg, details)
275+
end
211276
end # class LogStash::Outputs::Tcp

logstash-output-tcp.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-output-tcp'
4-
s.version = '6.0.2'
4+
s.version = '6.0.3'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Writes events over a TCP socket"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/outputs/tcp_spec.rb

Lines changed: 136 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
require "flores/pki"
44

55
describe LogStash::Outputs::Tcp do
6-
subject { described_class.new(config) }
6+
subject(:instance) { described_class.new(config) }
77
let(:config) { {
88
"host" => "localhost",
99
"port" => 2000 + rand(3000),
@@ -73,4 +73,139 @@
7373
end
7474
end
7575
end
76+
77+
##
78+
# Reads `in_io` until EOF and writes the bytes
79+
# it receives to `out_io`, tolerating partial writes.
80+
def siphon_until_eof(in_io, out_io)
81+
buffer = ""
82+
while (retval = in_io.read_nonblock(32*1024, buffer, exception:false)) do
83+
(IO.select([in_io], nil, nil, 5); next) if retval == :wait_readable
84+
85+
while (buffer && !buffer.empty?) do
86+
bytes_written = out_io.write(buffer)
87+
buffer.replace buffer.byteslice(bytes_written..-1)
88+
end
89+
end
90+
end
91+
92+
context 'client mode' do
93+
context 'transmitting data' do
94+
let!(:io) { StringIO.new } # somewhere for our server to stash the data it receives
95+
96+
let(:server_host) { 'localhost' }
97+
let(:server_port) { server.addr[1] } # get actual since we bind to port 0
98+
99+
let!(:server) { TCPServer.new(server_host, 0) }
100+
101+
let(:config) do
102+
{ 'host' => server_host, 'port' => server_port, 'mode' => 'client' }
103+
end
104+
105+
let(:event) { LogStash::Event.new({"hello" => "world"})}
106+
107+
subject(:instance) { described_class.new(config) }
108+
109+
before(:each) do
110+
# accepts ONE connection
111+
@server_socket_thread = Thread.start do
112+
client = server.accept
113+
siphon_until_eof(client, io)
114+
end
115+
instance.register
116+
end
117+
118+
after(:each) do
119+
@server_socket_thread&.join
120+
end
121+
122+
it 'encodes and transmits data' do
123+
instance.receive(event)
124+
sleep 1
125+
instance.close # release the connection
126+
@server_socket_thread.join(30) || fail('server failed to join')
127+
expect(io.string).to include('"hello"','"world"')
128+
end
129+
130+
context 'when payload is very large' do
131+
let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 }
132+
let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) }
133+
134+
135+
it 'encodes and transmits data' do
136+
instance.receive(event)
137+
sleep 1
138+
instance.close # release the connection
139+
@server_socket_thread.join(30) || fail('server failed to join')
140+
expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}"))
141+
end
142+
end
143+
end
144+
end
145+
146+
context 'server mode' do
147+
148+
def wait_for_condition(total_time_in_seconds, &block)
149+
deadline = Time.now + total_time_in_seconds
150+
until Time.now > deadline
151+
return if yield
152+
sleep(1)
153+
end
154+
fail('condition not met!')
155+
end
156+
157+
context 'transmitting data' do
158+
let(:server_host) { 'localhost' }
159+
let(:server_port) { Random.rand(1024...5000) }
160+
161+
let(:config) do
162+
{ 'host' => server_host, 'port' => server_port, 'mode' => 'server' }
163+
end
164+
165+
subject(:instance) { described_class.new(config) }
166+
167+
before(:each) { instance.register } # start listener
168+
after(:each) { instance.close }
169+
170+
let(:event) { LogStash::Event.new({"hello" => "world"})}
171+
172+
context 'when one client is connected' do
173+
let(:io) { StringIO.new }
174+
let(:client_socket) { TCPSocket.new(server_host, server_port) }
175+
176+
before(:each) do
177+
@client_socket_thread = Thread.start { siphon_until_eof(client_socket, io) }
178+
sleep 1 # wait for it to actually connect
179+
end
180+
181+
it 'encodes and transmits data' do
182+
sleep 1
183+
instance.receive(event)
184+
185+
wait_for_condition(30) { !io.size.zero? }
186+
sleep 1 # wait for the event to get sent...
187+
instance.close # release the connection
188+
189+
@client_socket_thread.join(30) || fail('client failed to join')
190+
expect(io.string).to include('"hello"','"world"')
191+
end
192+
193+
context 'when payload is very large' do
194+
let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 }
195+
let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) }
196+
197+
it 'encodes and transmits data' do
198+
instance.receive(event)
199+
200+
wait_for_condition(30) { io.size >= one_hundred_megabyte_message.size }
201+
sleep 1 # wait for the event to get sent...
202+
instance.close # release the connection
203+
204+
@client_socket_thread.join(30) || fail('client failed to join')
205+
expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}"))
206+
end
207+
end
208+
end
209+
end
210+
end
76211
end

0 commit comments

Comments
 (0)