From 74e76bd6c425fd42b28f163f8820e4547309c783 Mon Sep 17 00:00:00 2001 From: Vladislav Rusin Date: Mon, 26 Nov 2018 10:46:09 +0300 Subject: [PATCH 1/6] Added zookeeper as sql_last_value storage --- .gitignore | 1 + lib/logstash/inputs/jdbc.rb | 41 ++++- lib/logstash/plugin_mixins/jdbc/jdbc.rb | 1 + .../jdbc/value_tracking_zookeeper.rb | 149 ++++++++++++++++++ logstash-input-jdbc.gemspec | 1 + 5 files changed, 190 insertions(+), 3 deletions(-) create mode 100644 lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb diff --git a/.gitignore b/.gitignore index 6f6ae30..20a4c25 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ Gemfile.lock .bundle vendor derby.log +.idea \ No newline at end of file diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index aa47c0f..1ef81a9 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -157,9 +157,21 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base # exactly once. config :schedule, :validate => :string + # last run time storage ('file', 'zookeeper') + config :last_run_storage, :validate => :string, :default => "file" + # Path to file with last run time config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run" + # Path to zookeeper node with last run time + config :last_run_zookeeper_path, :validate => :string, :default => "/logstash_input_jdbc_last_run" + + # Zookeeper ip list + config :zk_ip_list, :validate => :string, :default => "localhost:2181" + + # Znode we created is permanent or ephemeral. 
+ config :zk_ephemeral, :validate => :boolean, :default => false + # Use an incremental column value rather than a timestamp config :use_column_value, :validate => :boolean, :default => false @@ -213,7 +225,8 @@ def register end end - set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self)) + init_value_tracker + set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger)) @enable_encoding = !@charset.nil? || !@columns_charset.empty? @@ -242,6 +255,16 @@ def register end end # def register + def init_value_tracker + if @last_run_storage.downcase == "file" + set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self)) + else + if @last_run_storage.downcase == "zookeeper" + set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTrackingZookeeper.build_last_value_tracker(self)) + end + end + end + # test injection points def set_statement_logger(instance) @statement_logger = instance @@ -273,7 +296,13 @@ def stop def execute_query(queue) # update default parameters - @parameters['sql_last_value'] = @value_tracker.value + if @last_run_storage.downcase == "file" + @parameters['sql_last_value'] = @value_tracker.value + else + if @last_run_storage.downcase == "zookeeper" + @parameters['sql_last_value'] = @value_tracker.read_value + end + end execute_statement(@statement, @parameters) do |row| if enable_encoding? 
## do the necessary conversions to string elements @@ -283,7 +312,13 @@ def execute_query(queue) decorate(event) queue << event end - @value_tracker.write + begin + # save value if it's not the same as previous + @value_tracker.write if @parameters['sql_last_value'] != @value_tracker.value + rescue => e + @logger.error("Failed to write last value", :exception => e) + stop + end end private diff --git a/lib/logstash/plugin_mixins/jdbc/jdbc.rb b/lib/logstash/plugin_mixins/jdbc/jdbc.rb index da524ee..cfb4bb8 100644 --- a/lib/logstash/plugin_mixins/jdbc/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc/jdbc.rb @@ -4,6 +4,7 @@ require "time" require "date" require_relative "value_tracking" +require_relative "value_tracking_zookeeper" require_relative "checked_count_logger" java_import java.util.concurrent.locks.ReentrantLock diff --git a/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb b/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb new file mode 100644 index 0000000..eaa0d23 --- /dev/null +++ b/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb @@ -0,0 +1,149 @@ +# encoding: utf-8 +require "zk" + +module LogStash module PluginMixins module Jdbc + class ValueTrackingZookeeper + + def self.build_last_value_tracker(plugin) + if plugin.use_column_value && plugin.tracking_column_type == "numeric" + # use this irrespective of the jdbc_default_timezone setting + klass = NumericValueTrackerZK + else + if plugin.jdbc_default_timezone.nil? || plugin.jdbc_default_timezone.empty? 
+ # no TZ stuff for Sequel, use Time + klass = TimeValueTrackerZK + else + # Sequel does timezone handling on DateTime only + klass = DateTimeValueTrackerZK + end + end + + handler = NullNodeHandler.new(plugin.last_run_zookeeper_path) + if plugin.record_last_run + handler = NodeHandler.new(plugin) + end + if plugin.clean_run + handler.clean + end + instance = klass.new(handler) + return instance + end + + attr_reader :value + + def initialize(handler) + @node_handler = handler + set_value(read_value) + end + + def read_value + # override in subclass + end + + def set_value(value) + # override in subclass + end + + def write + @node_handler.write(@value) + end + end + + + class NumericValueTrackerZK < ValueTrackingZookeeper + def read_value + @val = @node_handler.read + return 0 if @val.nil? + @val.to_i + end + + def set_value(value) + return unless value.is_a?(Numeric) + @value = value + end + end + + class DateTimeValueTrackerZK < ValueTrackingZookeeper + def read_value + @node_handler.read || DateTime.new(1970) + end + + def set_value(value) + if value.respond_to?(:to_datetime) + @value = value.to_datetime + else + @value = DateTime.parse(value) + end + end + end + + class TimeValueTrackerZK < ValueTrackingZookeeper + def read_value + @node_handler.read || Time.at(0).utc + end + + def set_value(value) + if value.respond_to?(:to_time) + @value = value.to_time + else + @value = DateTime.parse(value).to_time + end + end + end + + class NodeHandler + def initialize(plugin) + @path = plugin.last_run_zookeeper_path + @zk_ip_list = plugin.zk_ip_list + @zk_ephemeral = plugin.zk_ephemeral + + @zk = ZK.new(@zk_ip_list) + @exists = @zk.exists?(@path) + create_node + end + + def clean + return unless @exists + @zk.delete(@path) + @exists = false + end + + def read + return unless @exists + @zk.get(@path).first + end + + def set_initial(initial) + @initial = initial + end + + def create_node + unless @exists + if @zk_ephemeral + @zk.create(@path, :ephemeral => true) + else + 
@zk.create(@path) + end + @exists = true + end + end + + def write(value) + @zk.set(@path, value.to_s) + end + end + + class NullNodeHandler + def initialize(path) + end + + def clean + end + + def read + end + + def write(value) + end + end +end end end diff --git a/logstash-input-jdbc.gemspec b/logstash-input-jdbc.gemspec index 6d3a13e..e544484 100755 --- a/logstash-input-jdbc.gemspec +++ b/logstash-input-jdbc.gemspec @@ -24,6 +24,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'tzinfo' s.add_runtime_dependency 'tzinfo-data' s.add_runtime_dependency 'rufus-scheduler' + s.add_runtime_dependency "zk", ">= 1.9.6" s.add_development_dependency 'logstash-devutils' s.add_development_dependency 'timecop' From c00ba66d44872abfa57a35c4d408c00fed8cbd88 Mon Sep 17 00:00:00 2001 From: Vladislav Rusin Date: Mon, 26 Nov 2018 10:52:40 +0300 Subject: [PATCH 2/6] Changed readme. Added zookeeper config to example --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index fefad1c..040c0fc 100755 --- a/README.md +++ b/README.md @@ -97,6 +97,13 @@ Reading data from MySQL: jdbc_password => "password" # or jdbc_password_filepath => "/path/to/my/password_file" statement => "SELECT ..." 
+ use_column_value => true + tracking_column => tracking_number + last_run_storage => "zookeeper" + # or last_run_storage => "file" + # last_run_metadata_path => "/path/to/last_run_metadata_path" + last_run_zookeeper_path => "/last_run_zookeeper_path" + zk_ip_list => "zookeeper_host:zookeeper_port" jdbc_paging_enabled => "true" jdbc_page_size => "50000" } From cc8e9d528102efdf387ea4d46226159b93eec661 Mon Sep 17 00:00:00 2001 From: Vladislav Rusin Date: Fri, 30 Nov 2018 12:16:51 +0300 Subject: [PATCH 3/6] Fixed number format errors --- lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb b/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb index eaa0d23..b4e317b 100644 --- a/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb +++ b/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb @@ -54,7 +54,7 @@ class NumericValueTrackerZK < ValueTrackingZookeeper def read_value @val = @node_handler.read return 0 if @val.nil? 
- @val.to_i + @val.to_f.round(0) end def set_value(value) From db0d9e33f0144dc22310e280972a9001af48cd40 Mon Sep 17 00:00:00 2001 From: Vladislav Rusin Date: Fri, 30 Nov 2018 12:55:26 +0300 Subject: [PATCH 4/6] Removed .idea from .gitignore --- .gitignore | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 20a4c25..37a849c 100644 --- a/.gitignore +++ b/.gitignore @@ -2,5 +2,4 @@ Gemfile.lock .bundle vendor -derby.log -.idea \ No newline at end of file +derby.log \ No newline at end of file From 5dd6d947f30391e0e8f46fe11519c8d1883cebc2 Mon Sep 17 00:00:00 2001 From: VladislavRusin Date: Fri, 30 Nov 2018 15:08:40 +0300 Subject: [PATCH 5/6] Added zookeeper as sql_last_value storage --- README.md | 7 + lib/logstash/inputs/jdbc.rb | 41 ++++- lib/logstash/plugin_mixins/jdbc/jdbc.rb | 1 + .../jdbc/value_tracking_zookeeper.rb | 149 ++++++++++++++++++ logstash-input-jdbc.gemspec | 1 + 5 files changed, 196 insertions(+), 3 deletions(-) create mode 100644 lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb diff --git a/README.md b/README.md index fefad1c..040c0fc 100755 --- a/README.md +++ b/README.md @@ -97,6 +97,13 @@ Reading data from MySQL: jdbc_password => "password" # or jdbc_password_filepath => "/path/to/my/password_file" statement => "SELECT ..." + use_column_value => true + tracking_column => tracking_number + last_run_storage => "zookeeper" + # or last_run_storage => "file" + # last_run_metadata_path => "/path/to/last_run_metadata_path" + last_run_zookeeper_path => "/last_run_zookeeper_path" + zk_ip_list => "zookeeper_host:zookeeper_port" jdbc_paging_enabled => "true" jdbc_page_size => "50000" } diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index aa47c0f..1ef81a9 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -157,9 +157,21 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base # exactly once. 
config :schedule, :validate => :string + # last run time storage ('file', 'zookeeper') + config :last_run_storage, :validate => :string, :default => "file" + # Path to file with last run time config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run" + # Path to zookeeper node with last run time + config :last_run_zookeeper_path, :validate => :string, :default => "/logstash_input_jdbc_last_run" + + # Zookeeper ip list + config :zk_ip_list, :validate => :string, :default => "localhost:2181" + + # Znode we created is permanent or ephemeral. + config :zk_ephemeral, :validate => :boolean, :default => false + # Use an incremental column value rather than a timestamp config :use_column_value, :validate => :boolean, :default => false @@ -213,7 +225,8 @@ def register end end - set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self)) + init_value_tracker + set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger)) @enable_encoding = !@charset.nil? || !@columns_charset.empty? 
@@ -242,6 +255,16 @@ def register end end # def register + def init_value_tracker + if @last_run_storage.downcase == "file" + set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self)) + else + if @last_run_storage.downcase == "zookeeper" + set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTrackingZookeeper.build_last_value_tracker(self)) + end + end + end + # test injection points def set_statement_logger(instance) @statement_logger = instance @@ -273,7 +296,13 @@ def stop def execute_query(queue) # update default parameters - @parameters['sql_last_value'] = @value_tracker.value + if @last_run_storage.downcase == "file" + @parameters['sql_last_value'] = @value_tracker.value + else + if @last_run_storage.downcase == "zookeeper" + @parameters['sql_last_value'] = @value_tracker.read_value + end + end execute_statement(@statement, @parameters) do |row| if enable_encoding? ## do the necessary conversions to string elements @@ -283,7 +312,13 @@ def execute_query(queue) decorate(event) queue << event end - @value_tracker.write + begin + # save value if it's not the same as previous + @value_tracker.write if @parameters['sql_last_value'] != @value_tracker.value + rescue => e + @logger.error("Failed to write last value", :exception => e) + stop + end end private diff --git a/lib/logstash/plugin_mixins/jdbc/jdbc.rb b/lib/logstash/plugin_mixins/jdbc/jdbc.rb index da524ee..cfb4bb8 100644 --- a/lib/logstash/plugin_mixins/jdbc/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc/jdbc.rb @@ -4,6 +4,7 @@ require "time" require "date" require_relative "value_tracking" +require_relative "value_tracking_zookeeper" require_relative "checked_count_logger" java_import java.util.concurrent.locks.ReentrantLock diff --git a/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb b/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb new file mode 100644 index 0000000..b4e317b --- /dev/null +++ 
b/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb @@ -0,0 +1,149 @@ +# encoding: utf-8 +require "zk" + +module LogStash module PluginMixins module Jdbc + class ValueTrackingZookeeper + + def self.build_last_value_tracker(plugin) + if plugin.use_column_value && plugin.tracking_column_type == "numeric" + # use this irrespective of the jdbc_default_timezone setting + klass = NumericValueTrackerZK + else + if plugin.jdbc_default_timezone.nil? || plugin.jdbc_default_timezone.empty? + # no TZ stuff for Sequel, use Time + klass = TimeValueTrackerZK + else + # Sequel does timezone handling on DateTime only + klass = DateTimeValueTrackerZK + end + end + + handler = NullNodeHandler.new(plugin.last_run_zookeeper_path) + if plugin.record_last_run + handler = NodeHandler.new(plugin) + end + if plugin.clean_run + handler.clean + end + instance = klass.new(handler) + return instance + end + + attr_reader :value + + def initialize(handler) + @node_handler = handler + set_value(read_value) + end + + def read_value + # override in subclass + end + + def set_value(value) + # override in subclass + end + + def write + @node_handler.write(@value) + end + end + + + class NumericValueTrackerZK < ValueTrackingZookeeper + def read_value + @val = @node_handler.read + return 0 if @val.nil? 
+ @val.to_f.round(0) + end + + def set_value(value) + return unless value.is_a?(Numeric) + @value = value + end + end + + class DateTimeValueTrackerZK < ValueTrackingZookeeper + def read_value + @node_handler.read || DateTime.new(1970) + end + + def set_value(value) + if value.respond_to?(:to_datetime) + @value = value.to_datetime + else + @value = DateTime.parse(value) + end + end + end + + class TimeValueTrackerZK < ValueTrackingZookeeper + def read_value + @node_handler.read || Time.at(0).utc + end + + def set_value(value) + if value.respond_to?(:to_time) + @value = value.to_time + else + @value = DateTime.parse(value).to_time + end + end + end + + class NodeHandler + def initialize(plugin) + @path = plugin.last_run_zookeeper_path + @zk_ip_list = plugin.zk_ip_list + @zk_ephemeral = plugin.zk_ephemeral + + @zk = ZK.new(@zk_ip_list) + @exists = @zk.exists?(@path) + create_node + end + + def clean + return unless @exists + @zk.delete(@path) + @exists = false + end + + def read + return unless @exists + @zk.get(@path).first + end + + def set_initial(initial) + @initial = initial + end + + def create_node + unless @exists + if @zk_ephemeral + @zk.create(@path, :ephemeral => true) + else + @zk.create(@path) + end + @exists = true + end + end + + def write(value) + @zk.set(@path, value.to_s) + end + end + + class NullNodeHandler + def initialize(path) + end + + def clean + end + + def read + end + + def write(value) + end + end +end end end diff --git a/logstash-input-jdbc.gemspec b/logstash-input-jdbc.gemspec index 6d3a13e..e544484 100755 --- a/logstash-input-jdbc.gemspec +++ b/logstash-input-jdbc.gemspec @@ -24,6 +24,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'tzinfo' s.add_runtime_dependency 'tzinfo-data' s.add_runtime_dependency 'rufus-scheduler' + s.add_runtime_dependency "zk", ">= 1.9.6" s.add_development_dependency 'logstash-devutils' s.add_development_dependency 'timecop' From 92a9388fdf27f4cfec3946b7394dc10d7df24709 Mon Sep 17 00:00:00 2001 
From: Vladislav Rusin Date: Tue, 29 Jan 2019 11:41:04 +0300 Subject: [PATCH 6/6] Don't change sql_last_value if no event was sent --- lib/logstash/inputs/jdbc.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index 1ef81a9..487b86d 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -303,6 +303,7 @@ def execute_query(queue) @parameters['sql_last_value'] = @value_tracker.read_value end end + @event_sent = false execute_statement(@statement, @parameters) do |row| if enable_encoding? ## do the necessary conversions to string elements @@ -311,10 +312,11 @@ def execute_query(queue) event = LogStash::Event.new(row) decorate(event) queue << event + @event_sent = true end begin # save value if it's not the same as previous - @value_tracker.write if @parameters['sql_last_value'] != @value_tracker.value + @value_tracker.write if @parameters['sql_last_value'] != @value_tracker.value && @event_sent rescue => e @logger.error("Failed to write last value", :exception => e) stop