Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ GIT
PATH
remote: .
specs:
umbrellio-utils (1.13.2)
umbrellio-utils (1.14.0)
memery (~> 1)

GEM
Expand Down
1 change: 1 addition & 0 deletions lib/umbrellio_utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def synchronize(&)
require_relative "umbrellio_utils/formatting"
require_relative "umbrellio_utils/http_client"
require_relative "umbrellio_utils/jobs"
require_relative "umbrellio_utils/large_object" if defined?(Sequel)
require_relative "umbrellio_utils/migrations"
require_relative "umbrellio_utils/misc"
require_relative "umbrellio_utils/parsing"
Expand Down
32 changes: 24 additions & 8 deletions lib/umbrellio_utils/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

module UmbrellioUtils
module Database
DEFAULT_ORDER = :desc

extend self

class HandledConstaintError < StandardError
Expand All @@ -28,13 +30,14 @@ def get_violated_constraint_name(exception)
def each_record(dataset, primary_key: nil, **options, &block)
primary_key = primary_key_from(dataset, primary_key:)
eager_tables = Array(options.delete(:eager_load))
order = options.fetch(:order, DEFAULT_ORDER)

with_temp_table(dataset, primary_key:, **options) do |ids|
rows = ids.map { |id| row(id.is_a?(Hash) ? id.values : [id]) }
records = dataset.model
.eager(eager_tables)
.where(row(primary_key) => rows)
.reverse(row(primary_key)).all
records = ordered(
dataset.model.eager(eager_tables).where(row(primary_key) => rows),
row(primary_key), order
).all
records.each(&block)
end
end
Expand All @@ -48,15 +51,20 @@ def each_record(dataset, primary_key: nil, **options, &block)
# @option [Array] primary_key custom primary key to use for dataset
# @option [Symbol, String] temp_table_name custom name for temporary table,
# table is reused if already exists
# @option [Symbol] order primary key order to yield batches in, :asc or :desc
# rubocop:disable Metrics/ParameterLists
def with_temp_table(
dataset,
page_size: 1_000,
sleep: nil,
primary_key: nil,
temp_table_name: nil,
transaction: true
transaction: true,
order: DEFAULT_ORDER
)
raise ArgumentError, "invalid order: #{order.inspect} (expected :asc or :desc)" \
unless %i[asc desc].include?(order)

primary_key = primary_key_from(dataset, primary_key:)
sleep_interval = sleep_interval_from(sleep)

Expand All @@ -68,7 +76,7 @@ def with_temp_table(

loop do
conditional_transaction(transaction) do
pk_set = pop_next_pk_batch(temp_table_name, primary_key, page_size)
pk_set = pop_next_pk_batch(temp_table_name, primary_key, page_size, order)
yield(pk_set) if pk_set.any?
end

Expand Down Expand Up @@ -122,6 +130,14 @@ def row(values)
Sequel.function(:row, *values)
end

def ordered(dataset, expr, order)
case order
when :asc then dataset.order(expr)
when :desc then dataset.reverse(expr)
else raise ArgumentError, "invalid order: #{order.inspect} (expected :asc or :desc)"
end
end

def extract_primary_key(dataset)
dataset.db.schema(dataset.first_source).select { |x| x[1][:primary_key] }.map(&:first)
end
Expand All @@ -147,9 +163,9 @@ def sleep_interval_from(sleep)
end
end

def pop_next_pk_batch(temp_table_name, primary_key, batch_size)
def pop_next_pk_batch(temp_table_name, primary_key, batch_size, order)
row = row(primary_key)
pk_expr = DB[temp_table_name].select(*primary_key).reverse(row).limit(batch_size)
pk_expr = ordered(DB[temp_table_name].select(*primary_key), row, order).limit(batch_size)
deleted_items = DB[temp_table_name].where(row => pk_expr).returning.delete
deleted_items.map do |item|
next item if primary_key.size > 1
Expand Down
61 changes: 61 additions & 0 deletions lib/umbrellio_utils/large_object.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# frozen_string_literal: true

module UmbrellioUtils
class LargeObject
FILE_END = 2
PG_PAGE_SIZE = 8 * 1024

class AlreadyExists < StandardError
end

autoload :Writer, "umbrellio_utils/large_object/writer"

attr_reader :oid
attr_accessor :str_offset

def initialize(oid = -1)
self.oid = oid
self.str_offset = 0
end

def create!
run(:lo_create, oid)
rescue Sequel::UniqueConstraintViolation
raise AlreadyExists.new
end

def open_to_write!
DB.transaction do
fd = run(:lo_open, oid, 0x60000)
self.str_offset = run(:lo_lseek, fd, 0, FILE_END)
end
end

def append!(str)
run(:lo_put, oid, str_offset, Sequel.blob(str))
self.str_offset += str.bytesize
end

def read(*)
run(:lo_get, oid, *)
end

def delete!
run(:lo_unlink, oid)
rescue PG::UndefinedObject
# Ignored
end

def exists?
DB[:pg_largeobject_metadata].first(oid:).present?
end

private

attr_writer :oid

def run(method_name, *)
DB.get(Sequel.function(method_name, *))
end
end
end
72 changes: 72 additions & 0 deletions lib/umbrellio_utils/large_object/writer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# frozen_string_literal: true

module UmbrellioUtils
class LargeObject
# :nocov:
class Writer
MAX_BUFFER_SIZE = PG_PAGE_SIZE * 10

attr_reader :large_object

def initialize(large_object)
@large_object = large_object
self.sio = StringIO.new.binmode
super()

if block_given?
yield(self)
flush
end
end

def flush
sio.flush
large_object.append!(sio.string)
sio.truncate(0)
sio.rewind
self
end

def tell
sio.tell + large_object.str_offset
end
alias pos tell

def write(*)
sio.write(*)
flush if sio.string.size >= MAX_BUFFER_SIZE

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write returns the value of flush if …, which is nil when the buffer is below MAX_BUFFER_SIZE. Since << aliases write, IO-style chaining writer << a << b becomes nil << bNoMethodError. StringIO#<< returns self; returning self here would match that contract.

self
end
alias << write

def reopen(other, *)
sio.reopen(other.sio.string, *)
self
end

def pos=(new_pos)
case
when new_pos < large_object.str_offset
flush
relative_pos = new_pos - large_object.str_offset
large_object.str_offset += relative_pos
when new_pos > sio.string.length
flush
large_object.str_offset = new_pos
else
sio.pos = new_pos - large_object.str_offset
end
end

def rewind
large_object.str_offset = 0
sio.rewind
end

protected

attr_accessor :sio
end
# :nocov:
end
end
2 changes: 1 addition & 1 deletion lib/umbrellio_utils/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module UmbrellioUtils
VERSION = "1.13.2"
VERSION = "1.14.0"
end
17 changes: 17 additions & 0 deletions spec/umbrellio_utils/database_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,23 @@
end
end

context "with order: :asc" do
let(:options) { Hash[order: :asc] }

it "yields each record in ascending order" do
expect(result_emails).to eq(users_data.pluck(:email))
expect(sleep_calls).to eq([])
end
end

context "with invalid order option" do
let(:options) { Hash[order: :invalid] }

it "raises ArgumentError" do
expect { result_emails }.to raise_error(ArgumentError, /invalid order/)
end
end

context "with eager_load" do
let(:options) { Hash[eager_load: [:user]] }

Expand Down
Loading