From 49b1c2f80607b2affa9461b95c8e32fc76c2a6e4 Mon Sep 17 00:00:00 2001 From: Maciej Palinski Date: Wed, 19 Feb 2020 14:49:18 +0000 Subject: [PATCH] Fix saving pipeline state using record_last_run setting --- lib/logstash/plugin_mixins/jdbc/jdbc.rb | 2 +- spec/inputs/jdbc_spec.rb | 44 +++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/jdbc/jdbc.rb b/lib/logstash/plugin_mixins/jdbc/jdbc.rb index 341cd58..923359a 100644 --- a/lib/logstash/plugin_mixins/jdbc/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc/jdbc.rb @@ -252,7 +252,7 @@ def execute_statement rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError, Java::JavaSql::SQLException => e @logger.warn("Exception when executing JDBC query", :exception => e) else - @value_tracker.set_value(sql_last_value) + @value_tracker.set_value(sql_last_value) if @record_last_run ensure close_jdbc_connection @connection_lock.unlock diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index 02597e0..df6d5fb 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -898,6 +898,50 @@ end end + context "when state is not to be persisted iteratively" do + let(:mixin_settings) do + { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", + "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true" + } + end + + let(:settings) do + { + "statement" => "SELECT num, created_at, custom_time FROM test_table WHERE custom_time > :sql_last_value", + "record_last_run" => false, + "clean_run" => true + } + end + + let(:nums) { [10, 20, 30, 40, 50] } + let(:times) {["2015-05-06 13:14:15","2015-05-07 13:14:15","2015-05-08 13:14:15","2015-05-09 13:14:15","2015-05-10 13:14:15"]} + + before do + plugin.register + end + + after do + plugin.stop + end + + it "should successfully always start from the very beginning" do + test_table = db[:test_table] + + plugin.run(queue) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(Time.parse("1970-01-01 00:00:00.000000000 +0000")) + test_table.insert(:num => nums[0], :created_at => Time.now.utc, :custom_time => times[0]) + test_table.insert(:num => nums[1], :created_at => Time.now.utc, :custom_time => times[1]) + + plugin.run(queue) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(Time.parse("1970-01-01 00:00:00.000000000 +0000")) + test_table.insert(:num => nums[2], :created_at => Time.now.utc, :custom_time => times[2]) + test_table.insert(:num => nums[3], :created_at => Time.now.utc, :custom_time => times[3]) + test_table.insert(:num => nums[4], :created_at => Time.now.utc, :custom_time => times[4]) + + plugin.run(queue) + expect(plugin.instance_variable_get("@value_tracker").value).to eq(Time.parse("1970-01-01 00:00:00.000000000 +0000")) + end + end context "when state is not to be persisted" do let(:settings) do