feat: Add range partitioning support #174

Status: Open — wants to merge 4 commits into `master`
24 changes: 24 additions & 0 deletions README.md
@@ -110,6 +110,12 @@ Following options are same as [bq command-line tools](https://cloud.google.com/b
| time_partitioning.type | string | required | nil | The only type supported is DAY, which will generate one partition per day based on data loading time. |
| time_partitioning.expiration_ms | int | optional | nil | Number of milliseconds for which to keep the storage for a partition. |
| time_partitioning.field | string | optional | nil | `DATE` or `TIMESTAMP` column used for partitioning |
| range_partitioning | hash | optional | nil | See [Range Partitioning](#range-partitioning) |
| range_partitioning.field | string | required | nil | `INT64` column used for partitioning |
| range_partitioning.range | hash | required | nil | Defines the range for range partitioning |
| range_partitioning.range.start | int | required | nil | The start of range partitioning, inclusive. |
| range_partitioning.range.end | int | required | nil | The end of range partitioning, exclusive. |
| range_partitioning.range.interval| int | required | nil | The width of each interval. |
| clustering | hash | optional | nil | Currently, clustering is supported for partitioned tables, so must be used with `time_partitioning` option. See [clustered tables](https://cloud.google.com/bigquery/docs/clustered-tables) |
| clustering.fields | array | required | nil | One or more fields on which data should be clustered. The order of the specified columns determines the sort order of the data. |
| schema_update_options | array | optional | nil | (Experimental) List of `ALLOW_FIELD_ADDITION` or `ALLOW_FIELD_RELAXATION` or both. See [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions). NOTE for the current status: `schema_update_options` does not work for `copy` job, that is, is not effective for most of modes such as `append`, `replace` and `replace_backup`. `delete_in_advance` deletes origin table so does not need to update schema. Only `append_direct` can utilize schema update. |
@@ -448,6 +454,24 @@ MEMO: [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions)
to update the schema of the destination table as a side effect of the load job, but it is not available for copy job.
Thus, it was not suitable for embulk-output-bigquery idempotence modes, `append`, `replace`, and `replace_backup`, sigh.

### Range Partitioning

See also [Creating and Updating Range-Partitioned Tables](https://cloud.google.com/bigquery/docs/creating-partitioned-tables).

To configure range partitioning, specify the `range_partitioning` option as below. Note that partition decorators (e.g. `table_name$1`) are not supported together with `range_partitioning`:

```yaml
out:
  type: bigquery
  table: table_name
  range_partitioning:
    field: customer_id
    range:
      start: 1
      end: 99999
      interval: 1
```
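As a quick sanity check (illustrative only, not part of the plugin), the number of partitions BigQuery creates for a given range can be estimated as follows. BigQuery caps the number of partitions per table, so keep this count within the documented limit:

```ruby
# Hypothetical helper (not part of embulk-output-bigquery): estimates how
# many partitions a range spec generates. Values outside [start, end) go
# to implicit __UNPARTITIONED__ / __NULL__ partitions on the BigQuery side.
def partition_count(start_val, end_val, interval)
  ((end_val - start_val).to_f / interval).ceil
end

puts partition_count(1, 99999, 1)  # => 99998
puts partition_count(90, 100, 1)   # => 10
```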

## Development

### Run example:
36 changes: 36 additions & 0 deletions example/config_replace_field_range_partitioned_table.yml
@@ -0,0 +1,36 @@
in:
  type: file
  path_prefix: example/example.csv
  parser:
    type: csv
    charset: UTF-8
    newline: CRLF
    null_string: 'NULL'
    skip_header_lines: 1
    comment_line_marker: '#'
    columns:
      - {name: date, type: string}
      - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
      - {name: "null", type: string}
      - {name: long, type: long}
      - {name: string, type: string}
      - {name: double, type: double}
      - {name: boolean, type: boolean}
out:
  type: bigquery
  mode: replace
  auth_method: service_account
  json_keyfile: example/your-project-000.json
  dataset: your_dataset_name
  table: your_field_partitioned_table_name
  source_format: NEWLINE_DELIMITED_JSON
  compression: NONE
  auto_create_dataset: true
  auto_create_table: true
  schema_file: example/schema.json
  range_partitioning:
    field: 'long'
    range:
      start: 90
      end: 100
      interval: 1
44 changes: 43 additions & 1 deletion lib/embulk/output/bigquery.rb
@@ -89,6 +89,7 @@ def self.configure(config, schema, task_count)
        'ignore_unknown_values' => config.param('ignore_unknown_values', :bool, :default => false),
        'allow_quoted_newlines' => config.param('allow_quoted_newlines', :bool, :default => false),
        'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
        'range_partitioning' => config.param('range_partitioning', :hash, :default => nil),
        'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
        'schema_update_options' => config.param('schema_update_options', :array, :default => nil),

@@ -227,14 +228,55 @@ def self.configure(config, schema, task_count)
        task['abort_on_error'] = (task['max_bad_records'] == 0)
      end

      if task['time_partitioning'] && task['range_partitioning']
        raise ConfigError.new "`time_partitioning` and `range_partitioning` cannot be used at the same time"
      end

      if task['time_partitioning']
        unless task['time_partitioning']['type']
          raise ConfigError.new "`time_partitioning` must have `type` key"
        end
      end

      if Helper.has_partition_decorator?(task['table'])
        if task['range_partitioning']
          raise ConfigError.new "Partition decorators (`#{task['table']}`) don't support `range_partitioning`"
        end
        task['time_partitioning'] = {'type' => 'DAY'}
      end
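For context, `Helper.has_partition_decorator?` recognizes a `$suffix` on the table name. A minimal sketch of such a check (the real helper's implementation may differ):

```ruby
# Hypothetical sketch of a partition-decorator check; the actual
# Helper.has_partition_decorator? in the plugin may be implemented differently.
def has_partition_decorator?(table)
  # A "$" followed by a partition id at the end marks a decorator,
  # e.g. "table_name$20160912".
  !!(table =~ /\$.+\z/)
end

has_partition_decorator?('table_name$20160912')  # => true
has_partition_decorator?('table_name')           # => false
```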

      if task['range_partitioning']
        unless task['range_partitioning']['field']
          raise ConfigError.new "`range_partitioning` must have `field` key"
        end
        unless task['range_partitioning']['range']
          raise ConfigError.new "`range_partitioning` must have `range` key"
        end

        range = task['range_partitioning']['range']
        unless range['start']
          raise ConfigError.new "`range_partitioning` must have `range.start` key"
        end
        unless range['start'].is_a?(Integer)
          raise ConfigError.new "`range_partitioning.range.start` must be an integer"
        end
        unless range['end']
          raise ConfigError.new "`range_partitioning` must have `range.end` key"
        end
        unless range['end'].is_a?(Integer)
          raise ConfigError.new "`range_partitioning.range.end` must be an integer"
        end
        unless range['interval']
          raise ConfigError.new "`range_partitioning` must have `range.interval` key"
        end
        unless range['interval'].is_a?(Integer)
          raise ConfigError.new "`range_partitioning.range.interval` must be an integer"
        end
        if range['start'] + range['interval'] >= range['end']
          raise ConfigError.new "`range_partitioning.range.start` + `range_partitioning.range.interval` must be less than `range_partitioning.range.end`"
        end
      end
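The final check enforces that the range holds at least two intervals. A standalone sketch of that check's behavior, using `ArgumentError` in place of the plugin's `ConfigError`:

```ruby
# Standalone sketch of the final range check above; ArgumentError stands
# in for the plugin's ConfigError.
def check_range!(range)
  if range['start'] + range['interval'] >= range['end']
    raise ArgumentError, '`range.start` + `range.interval` must be less than `range.end`'
  end
  true
end

check_range!('start' => 1, 'end' => 99999, 'interval' => 1)  # passes
# check_range!('start' => 1, 'end' => 2, 'interval' => 2)    # raises ArgumentError
```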

      if task['clustering']
        unless task['clustering']['fields']
          raise ConfigError.new "`clustering` must have `fields` key"
12 changes: 12 additions & 0 deletions lib/embulk/output/bigquery/bigquery_client.rb
@@ -435,6 +435,18 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
            }
          end

          options['range_partitioning'] ||= @task['range_partitioning']
          if options['range_partitioning']
            body[:range_partitioning] = {
              field: options['range_partitioning']['field'],
              range: {
                start: options['range_partitioning']['range']['start'].to_s,
                end: options['range_partitioning']['range']['end'].to_s,
                interval: options['range_partitioning']['range']['interval'].to_s,
              },
            }
          end
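The `to_s` calls are there because the BigQuery tables API represents range bounds as strings. A sketch of the stanza produced for the README example values (`customer_id`, 1...99999, interval 1):

```ruby
# Sketch of the rangePartitioning stanza built above, using the README
# example values; BigQuery's tables API takes range bounds as strings.
range = { 'start' => 1, 'end' => 99999, 'interval' => 1 }
body = {
  range_partitioning: {
    field: 'customer_id',
    range: {
      start: range['start'].to_s,       # "1"
      end: range['end'].to_s,           # "99999"
      interval: range['interval'].to_s, # "1"
    },
  },
}
```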

          options['clustering'] ||= @task['clustering']
          if options['clustering']
            body[:clustering] = {
38 changes: 38 additions & 0 deletions test/test_configure.rb
@@ -270,6 +270,44 @@ def test_time_partitioning
    assert_equal 'DAY', task['time_partitioning']['type']
  end

  def test_range_partitioning
    config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 3, 'interval' => 1 }})
    assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }

    # field is required
    config = least_config.merge('range_partitioning' => {'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
    assert_raise { Bigquery.configure(config, schema, processor_count) }

    # range is required
    config = least_config.merge('range_partitioning' => {'field' => 'foo'})
    assert_raise { Bigquery.configure(config, schema, processor_count) }

    # range.start is required
    config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'end' => 2, 'interval' => 1 }})
    assert_raise { Bigquery.configure(config, schema, processor_count) }

    # range.end is required
    config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'interval' => 1 }})
    assert_raise { Bigquery.configure(config, schema, processor_count) }

    # range.interval is required
    config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2 }})
    assert_raise { Bigquery.configure(config, schema, processor_count) }

    # range.start + range.interval should be less than range.end
    config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 2 }})
    assert_raise { Bigquery.configure(config, schema, processor_count) }
  end

  def test_time_and_range_partitioning_error
    config = least_config.merge('time_partitioning' => {'type' => 'DAY'}, 'range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
    assert_raise { Bigquery.configure(config, schema, processor_count) }

    config = least_config.merge('table' => 'table_name$20160912', 'range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
    assert_raise { Bigquery.configure(config, schema, processor_count) }
  end

  def test_clustering
    config = least_config.merge('clustering' => {'fields' => ['field_a']})
    assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }