Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/elixir/lib/fluss/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ defmodule Fluss.Native do
do: :erlang.nif_error(:nif_not_loaded)

# Schema / TableDescriptor
def table_descriptor_new(_schema, _bucket_count, _properties),
def table_descriptor_new(_schema, _options),
do: :erlang.nif_error(:nif_not_loaded)

# Table
Expand Down
63 changes: 59 additions & 4 deletions bindings/elixir/lib/fluss/table_descriptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,80 @@ defmodule Fluss.TableDescriptor do
@moduledoc """
Descriptor for creating a Fluss table.

Options: `:bucket_count`, `:properties` (list of `{key, value}` string tuples).
Bundles a `Fluss.Schema` with optional table settings; pass the result to
`Fluss.Admin.create_table/5`.

## Options

`new!/2` takes a keyword list:

* `:bucket_count` - number of buckets to distribute the table into
(non-negative integer). Omitted means the server decides.
* `:bucket_keys` - column names to hash for bucketing. For primary-key
tables these must be a subset of the primary key (excluding partition
keys), defaulting to the primary key columns when omitted. For log
tables they are unconstrained.
* `:partition_keys` - column names to partition the table by.
* `:properties` - map of recognized Fluss table properties that the engine
interprets, e.g. `%{"table.log.format" => "ARROW"}`.
* `:custom_properties` - map of arbitrary string metadata (e.g. ownership,
team). Stored verbatim and never interpreted by Fluss.
* `:comment` - table comment.

## Examples

Fluss.TableDescriptor.new!(schema)
Fluss.TableDescriptor.new!(schema, bucket_count: 3)

Fluss.TableDescriptor.new!(schema,
bucket_count: 4,
bucket_keys: ["id"],
partition_keys: ["dt"],
custom_properties: %{"owner" => "data-platform"},
comment: "events table"
)

"""

defmodule Options do
  @moduledoc false
  # Internal boundary struct decoded by the `table_descriptor_new` NIF.
  # Users supply these as a keyword list to `Fluss.TableDescriptor.new!/2`;
  # `new!/2` normalizes that into this struct before crossing into Rust.
  #
  # The types below mirror what the Rust `NifTableOptions` struct decodes:
  # key lists are lists of strings and both property maps are
  # string-to-string maps. Keeping the spec this precise lets Dialyzer flag
  # values that would otherwise only fail when the NIF decodes its argument.

  @typedoc "String-keyed, string-valued property map."
  @type string_map :: %{optional(String.t()) => String.t()}

  @type t :: %__MODULE__{
          bucket_count: non_neg_integer() | nil,
          bucket_keys: [String.t()],
          partition_keys: [String.t()],
          properties: string_map(),
          custom_properties: string_map(),
          comment: String.t() | nil
        }

  # Defaults describe "nothing requested": no bucket count, no keys, no
  # properties and no comment.
  defstruct bucket_count: nil,
            bucket_keys: [],
            partition_keys: [],
            properties: %{},
            custom_properties: %{},
            comment: nil
end

alias Fluss.Native

@type t :: reference()

@doc """
Builds a table descriptor from a `Fluss.Schema` and a keyword list of options.

See the module documentation for the available options. Raises `Fluss.Error`
if the descriptor cannot be built (e.g. an invalid bucket-key/partition-key
combination), or `KeyError` if an unknown option key is given.
"""
@spec new!(Fluss.Schema.t(), keyword()) :: t()
def new!(%Fluss.Schema{} = schema, opts \\ []) do
bucket_count = Keyword.get(opts, :bucket_count)
properties = Keyword.get(opts, :properties, [])
opts_struct = struct!(Fluss.TableDescriptor.Options, opts)

case Native.table_descriptor_new(schema, bucket_count, properties) do
case Native.table_descriptor_new(schema, opts_struct) do
{:error, %Fluss.Error{} = err} -> raise err
ref -> ref
end
Expand Down
31 changes: 23 additions & 8 deletions bindings/elixir/native/fluss_nif/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::atoms::to_nif_err;
use fluss::error::Error;
use fluss::metadata::{self, DataTypes, Schema, TableDescriptor};
use rustler::{NifStruct, NifTaggedEnum, ResourceArc};
use std::collections::HashMap;

pub struct TableDescriptorResource {
pub inner: TableDescriptor,
Expand Down Expand Up @@ -129,11 +130,21 @@ impl NifSchema {
}
}

// Table options received by the `table_descriptor_new` NIF.
//
// Bound to the Elixir struct `Fluss.TableDescriptor.Options` through the
// `module` attribute below; the Elixir side builds that struct from the
// keyword list given to `Fluss.TableDescriptor.new!/2`.
#[derive(NifStruct)]
#[module = "Fluss.TableDescriptor.Options"]
pub struct NifTableOptions {
// Requested bucket count; passed to `distributed_by` together with
// `bucket_keys`. Presumably `None` when the Elixir field is `nil` — confirm
// against rustler's `Option` decoding.
pub bucket_count: Option<i32>,
// Column names used for bucketing; passed to `distributed_by`.
pub bucket_keys: Vec<String>,
// Column names used for partitioning; passed to `partitioned_by`.
pub partition_keys: Vec<String>,
// Table properties forwarded to the descriptor builder via `properties`.
pub properties: HashMap<String, String>,
// Free-form properties forwarded to the builder via `custom_properties`.
pub custom_properties: HashMap<String, String>,
// Optional table comment; only applied to the builder when present.
pub comment: Option<String>,
}

#[rustler::nif]
fn table_descriptor_new(
schema: NifSchema,
bucket_count: Option<i32>,
properties: Vec<(String, String)>,
opts: NifTableOptions,
) -> Result<ResourceArc<TableDescriptorResource>, rustler::Error> {
let mut schema_builder = Schema::builder();
for (name, dt) in &schema.columns {
Expand All @@ -142,14 +153,18 @@ fn table_descriptor_new(
if !schema.primary_key.is_empty() {
schema_builder = schema_builder.primary_key(schema.primary_key);
}

let built_schema = schema_builder.build().map_err(to_nif_err)?;

let mut builder = TableDescriptor::builder().schema(built_schema);
if let Some(count) = bucket_count {
builder = builder.distributed_by(Some(count), vec![]);
}
for (key, value) in properties {
builder = builder.property(&key, &value);
let mut builder = TableDescriptor::builder()
.schema(built_schema)
.properties(opts.properties)
.custom_properties(opts.custom_properties)
.partitioned_by(opts.partition_keys)
.distributed_by(opts.bucket_count, opts.bucket_keys);

if let Some(comment) = opts.comment {
builder = builder.comment(comment);
}
let descriptor = builder.build().map_err(to_nif_err)?;
Ok(ResourceArc::new(TableDescriptorResource {
Expand Down
65 changes: 65 additions & 0 deletions bindings/elixir/test/integration/admin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -400,4 +400,69 @@ defmodule Fluss.Integration.AdminTest do
Fluss.Admin.get_table_schema(admin, @database, table)
end
end

describe "create_table/5 with descriptor options" do
  # Each test creates a uniquely named table from a descriptor, registers its
  # removal for when the test exits, and then reads the table info back to
  # check that the descriptor options reached the server.

  test "round-trips comment and custom_properties", %{admin: admin} do
    table_name = "fluss_table_#{:rand.uniform(100_000)}"

    empty_schema = Fluss.Schema.new()
    with_id = Fluss.Schema.column(empty_schema, "id", :int)
    schema = Fluss.Schema.column(with_id, "name", :string)

    options = [
      comment: "events table",
      custom_properties: %{"owner" => "data-platform"}
    ]

    descriptor = Fluss.TableDescriptor.new!(schema, options)

    :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, true)
    on_exit(fn -> Fluss.Admin.drop_table(admin, @database, table_name, true) end)

    assert {:ok, info} = Fluss.Admin.get_table_info(admin, @database, table_name)
    assert info.comment == "events table"
    assert info.custom_properties == %{"owner" => "data-platform"}
  end

  test "round-trips explicit bucket_keys on a primary-key table", %{admin: admin} do
    table_name = "fluss_table_#{:rand.uniform(100_000)}"

    empty_schema = Fluss.Schema.new()
    with_id = Fluss.Schema.column(empty_schema, "id", :int)
    with_region = Fluss.Schema.column(with_id, "region", :string)
    schema = Fluss.Schema.primary_key(with_region, ["id", "region"])

    options = [bucket_count: 3, bucket_keys: ["id"]]
    descriptor = Fluss.TableDescriptor.new!(schema, options)

    :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, true)
    on_exit(fn -> Fluss.Admin.drop_table(admin, @database, table_name, true) end)

    assert {:ok, info} = Fluss.Admin.get_table_info(admin, @database, table_name)
    assert info.num_buckets == 3
    assert info.bucket_keys == ["id"]
    # The bucket key was given explicitly and differs from the default,
    # which would be the full primary key.
    assert info.is_default_bucket_key == false
  end

  test "round-trips partition_keys and reports the table as partitioned", %{admin: admin} do
    table_name = "fluss_table_#{:rand.uniform(100_000)}"

    # On a primary-key table the partition keys have to come from the
    # primary key, so "dt" is part of it here.
    empty_schema = Fluss.Schema.new()
    with_id = Fluss.Schema.column(empty_schema, "id", :int)
    with_dt = Fluss.Schema.column(with_id, "dt", :string)
    schema = Fluss.Schema.primary_key(with_dt, ["id", "dt"])

    options = [partition_keys: ["dt"], bucket_count: 2]
    descriptor = Fluss.TableDescriptor.new!(schema, options)

    :ok = Fluss.Admin.create_table(admin, @database, table_name, descriptor, true)
    on_exit(fn -> Fluss.Admin.drop_table(admin, @database, table_name, true) end)

    assert {:ok, info} = Fluss.Admin.get_table_info(admin, @database, table_name)
    assert info.is_partitioned == true
    assert info.partition_keys == ["dt"]
  end
end
end
86 changes: 86 additions & 0 deletions bindings/elixir/test/table_descriptor_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

defmodule Fluss.TableDescriptorTest do
  # Unit tests for `Fluss.TableDescriptor.new!/2`: successful construction
  # returns a reference, invalid key combinations raise `Fluss.Error`, and an
  # unknown option key raises `KeyError`.
  use ExUnit.Case, async: true

  alias Fluss.Schema
  alias Fluss.TableDescriptor

  describe "new!/2" do
    test "returns a descriptor reference for a minimal schema" do
      descriptor =
        [{"id", :int}, {"name", :string}]
        |> schema_with_columns()
        |> TableDescriptor.new!()

      assert is_reference(descriptor)
    end

    test "returns a descriptor reference with all options set" do
      schema =
        [{"id", :int}, {"dt", :string}]
        |> schema_with_columns()
        |> Schema.primary_key(["id", "dt"])

      options = [
        bucket_count: 3,
        bucket_keys: ["id"],
        partition_keys: ["dt"],
        properties: %{"table.replication.factor" => "1"},
        custom_properties: %{"owner" => "data-platform"},
        comment: "events table"
      ]

      assert is_reference(TableDescriptor.new!(schema, options))
    end

    test "raises Fluss.Error when bucket keys are not a subset of the primary key" do
      schema =
        [{"id", :int}, {"region", :string}]
        |> schema_with_columns()
        |> Schema.primary_key(["id"])

      build = fn -> TableDescriptor.new!(schema, bucket_keys: ["region"]) end

      assert_raise Fluss.Error, build
    end

    test "raises Fluss.Error when bucket keys overlap partition keys" do
      schema = schema_with_columns([{"id", :int}, {"dt", :string}])

      build = fn ->
        TableDescriptor.new!(schema, partition_keys: ["dt"], bucket_keys: ["dt"])
      end

      assert_raise Fluss.Error, build
    end

    # Unknown option keys must be rejected by `new!/2`, not silently ignored.
    test "raises KeyError on an unknown option" do
      schema = schema_with_columns([{"id", :int}])

      build = fn -> TableDescriptor.new!(schema, bogus: 1) end

      assert_raise KeyError, build
    end
  end

  # Starts from an empty schema and adds each `{name, type}` column in order.
  defp schema_with_columns(columns) do
    Enum.reduce(columns, Schema.new(), fn {name, type}, schema ->
      Schema.column(schema, name, type)
    end)
  end
end