Skip to content

Commit 3f257c1

Browse files
committed
Supportt fluent 1.x
1 parent 3979df1 commit 3f257c1

File tree

2 files changed

+79
-79
lines changed

2 files changed

+79
-79
lines changed

fluent-plugin-dynamodb-add.gemspec

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ Gem::Specification.new do |spec|
1717
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
1818
spec.require_paths = ["lib"]
1919

20-
spec.add_development_dependency "bundler", "~> 1.7"
21-
spec.add_development_dependency "rake", "~> 10.0"
22-
spec.add_dependency "fluentd", "~> 0.10.0"
20+
spec.add_development_dependency "bundler"
21+
spec.add_development_dependency "rake"
22+
23+
spec.add_dependency "fluentd", ">= 1", "< 2"
2324
spec.add_dependency "aws-sdk-dynamodb"
2425
end

lib/fluent/plugin/out_dynamodb_add.rb

Lines changed: 75 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,101 +1,100 @@
11
require 'aws-sdk-dynamodb'
2+
require 'fluent/plugin/output'
3+
4+
class Fluent::Plugin::DynamodbAdd < Fluent::Plugin::Output
5+
Fluent::Plugin.register_output('dynamodb_add', self)
6+
7+
helpers :event_emitter
8+
helpers :compat_parameters
9+
10+
config_param :count_key, :string
11+
config_param :dynamo_count_key, :string
12+
config_param :table_name, :string
13+
config_param :use_iam_role, :bool, :default => false
14+
config_param :aws_key_id, :string, :default => nil
15+
config_param :aws_sec_key, :string, :default => nil
16+
config_param :region, :string, :default => nil
17+
config_param :endpoint, :string, :default => nil
18+
config_param :hash_key, :string, :default => nil
19+
config_param :hash_key_delimiter, :string, :default => ":"
20+
config_param :add_hash_key_prefix, :string, :default => nil
21+
config_param :range_key, :string, :default => nil
22+
config_param :set_timestamp, :string, :default => nil
23+
24+
def initialize
25+
super
26+
end
227

3-
module Fluent
4-
class DynamodbAdd < Fluent::Output
5-
Fluent::Plugin.register_output('dynamodb_add', self)
6-
7-
unless method_defined?(:log)
8-
define_method(:log) { $log }
9-
end
10-
11-
config_param :count_key, :string
12-
config_param :dynamo_count_key, :string
13-
config_param :table_name, :string
14-
config_param :use_iam_role, :bool, :default => false
15-
config_param :aws_key_id, :string, :default => nil
16-
config_param :aws_sec_key, :string, :default => nil
17-
config_param :region, :string, :default => nil
18-
config_param :endpoint, :string, :default => nil
19-
config_param :hash_key, :string, :default => nil
20-
config_param :hash_key_delimiter, :string, :default => ":"
21-
config_param :add_hash_key_prefix, :string, :default => nil
22-
config_param :range_key, :string, :default => nil
23-
config_param :set_timestamp, :string, :default => nil
24-
25-
def initialize
26-
super
27-
end
28+
def configure(conf)
29+
compat_parameters_convert(conf)
2830

29-
def configure(conf)
30-
super
31+
super
3132

32-
unless use_iam_role
33-
[:aws_key_id, :aws_sec_key].each do |name|
34-
unless self.instance_variable_get("@#{name}")
35-
raise ConfigError, "'#{name}' is required"
36-
end
33+
unless use_iam_role
34+
[:aws_key_id, :aws_sec_key].each do |name|
35+
unless self.instance_variable_get("@#{name}")
36+
raise ConfigError, "'#{name}' is required"
3737
end
3838
end
39-
@hash_key = hash_key.split(/\s*,\s*/)
4039
end
40+
@hash_key = hash_key.split(/\s*,\s*/)
41+
end
4142

42-
def start
43-
super
43+
def start
44+
super
4445

45-
options = {}
46+
options = {}
4647

47-
unless use_iam_role
48-
options[:access_key_id] = @aws_key_id
49-
options[:secret_access_key] = @aws_sec_key
50-
end
48+
unless use_iam_role
49+
options[:access_key_id] = @aws_key_id
50+
options[:secret_access_key] = @aws_sec_key
51+
end
5152

52-
options[:region] = region
53-
options[:endpoint] = endpoint
53+
options[:region] = region
54+
options[:endpoint] = endpoint
5455

55-
client = Aws::DynamoDB::Client.new(options)
56+
client = Aws::DynamoDB::Client.new(options)
5657

57-
resource = Aws::DynamoDB::Resource.new(client: client)
58-
@table = resource.table(table_name)
58+
resource = Aws::DynamoDB::Resource.new(client: client)
59+
@table = resource.table(table_name)
5960

60-
@dynamo_hash_key = @table.key_schema.find{|e| e.key_type == "HASH" }.attribute_name
61-
@dynamo_range_key = @table.key_schema.find{|e| e.key_type == "RANGE" }&.attribute_name
62-
end
61+
@dynamo_hash_key = @table.key_schema.find{|e| e.key_type == "HASH" }.attribute_name
62+
@dynamo_range_key = @table.key_schema.find{|e| e.key_type == "RANGE" }&.attribute_name
63+
end
6364

64-
def emit(tag, es, chain)
65-
chain.next
66-
es.each do |time, record|
67-
hash_key = create_key(record)
68-
next unless hash_key || record[@count_key]
65+
def process(tag, es)
66+
es.each do |time, record|
67+
hash_key = create_key(record)
68+
next unless hash_key || record[@count_key]
6969

70-
key = { @dynamo_hash_key => hash_key }
70+
key = { @dynamo_hash_key => hash_key }
7171

72-
if @range_key
73-
next unless record[@range_key]
74-
key[@dynamo_range_key] = record[@range_key]
75-
end
72+
if @range_key
73+
next unless record[@range_key]
74+
key[@dynamo_range_key] = record[@range_key]
75+
end
7676

77-
@table.update_item({
78-
key: key,
79-
attribute_updates: {
80-
@dynamo_count_key => {
81-
value: record[@count_key],
82-
action: "ADD"
83-
},
77+
@table.update_item({
78+
key: key,
79+
attribute_updates: {
80+
@dynamo_count_key => {
81+
value: record[@count_key],
82+
action: "ADD"
8483
},
85-
})
86-
end
84+
},
85+
})
8786
end
87+
end
8888

89-
private
89+
private
9090

91-
def create_key(record)
92-
key_array = []
93-
key_array << @add_hash_key_prefix if @add_hash_key_prefix
94-
@hash_key.each do |h|
95-
return nil unless record[h]
96-
key_array << record[h]
97-
end
98-
key_array.join(@hash_key_delimiter)
91+
def create_key(record)
92+
key_array = []
93+
key_array << @add_hash_key_prefix if @add_hash_key_prefix
94+
@hash_key.each do |h|
95+
return nil unless record[h]
96+
key_array << record[h]
9997
end
98+
key_array.join(@hash_key_delimiter)
10099
end
101100
end

0 commit comments

Comments
 (0)