Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions paimon-python/pypaimon/api/api_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dataclasses import dataclass
from typing import Dict, Generic, List, Optional

from pypaimon.api.api_request import RESTRequest
from pypaimon.common.identifier import Identifier
from pypaimon.common.json_util import T, json_field
from pypaimon.common.options import Options
Expand Down Expand Up @@ -600,3 +601,14 @@ def to_dict(self) -> Dict:
result["functions"] = None
result["nextPageToken"] = self.next_page_token
return result


@dataclass
class AuthTableQueryRequest(RESTRequest):
    """Request payload for the table query-auth REST call."""

    # Serialized under the JSON key "select"; defaults to None.
    # NOTE(review): presumably the column names the query intends to read — confirm.
    select: Optional[List[str]] = json_field("select", default=None)


@dataclass
class AuthTableQueryResponse(RESTResponse):
    """Response payload of the table query-auth REST call."""

    # Read from the JSON key "filter"; defaults to None.
    # NOTE(review): presumably JSON-encoded row-filter predicates, one per entry — confirm.
    filter: Optional[List[str]] = json_field("filter", default=None)
    # Read from the JSON key "columnMasking"; defaults to None.
    # NOTE(review): presumably maps a column name to its masking rule — confirm.
    column_masking: Optional[Dict[str, str]] = json_field("columnMasking", default=None)
6 changes: 6 additions & 0 deletions paimon-python/pypaimon/api/resource_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,9 @@ def rename_branch(self, database_name: str, table_name: str, branch_name: str) -
def forward_branch(self, database_name: str, table_name: str, branch_name: str) -> str:
    """Return the branch resource path with the ``FORWARD`` segment appended."""
    branch_path = self.branch(database_name, table_name, branch_name)
    return "{}/{}".format(branch_path, self.FORWARD)

def auth_table(self, database_name: str, table_name: str) -> str:
    """Build the resource path of the ``auth`` endpoint for a table.

    Both names are passed through ``RESTUtil.encode_string`` before being
    placed into the path.
    """
    encoded_database = RESTUtil.encode_string(database_name)
    encoded_table = RESTUtil.encode_string(table_name)
    segments = (
        self.base_path,
        self.DATABASES,
        encoded_database,
        self.TABLES,
        encoded_table,
    )
    return "{}/{}/{}/{}/{}/auth".format(*segments)
14 changes: 13 additions & 1 deletion paimon-python/pypaimon/api/rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
ListTablesResponse, ListTagsResponse,
PagedList,
PagedResponse, GetTableSnapshotResponse,
Partition)
Partition,
AuthTableQueryRequest,
AuthTableQueryResponse)
from pypaimon.api.auth import AuthProviderFactory, RESTAuthFunction
from pypaimon.api.client import HttpClient
from pypaimon.api.resource_paths import ResourcePaths
Expand Down Expand Up @@ -688,6 +690,16 @@ def alter_function(self, identifier: Identifier, changes: List) -> None:
self.rest_auth_function,
)

def auth_table_query(self, identifier: Identifier, select: Optional[List[str]]) -> AuthTableQueryResponse:
    """POST a query-auth request for the given table and return the parsed response.

    Args:
        identifier: Table identifier; checked by ``__validate_identifier``,
            which yields the database and table names.
        select: Value sent as the request's ``select`` field (may be None).

    Returns:
        The response deserialized as ``AuthTableQueryResponse``.
    """
    database_name, table_name = self.__validate_identifier(identifier)
    payload = AuthTableQueryRequest(select=select)
    endpoint = self.resource_paths.auth_table(database_name, table_name)
    return self.client.post_with_response_type(
        endpoint,
        payload,
        AuthTableQueryResponse,
        self.rest_auth_function,
    )

@staticmethod
def __validate_identifier(identifier: Identifier):
if not identifier:
Expand Down
3 changes: 3 additions & 0 deletions paimon-python/pypaimon/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,6 @@ def list_tags_paged(
raise NotImplementedError(
"list_tags_paged is not supported by this catalog."
)

def auth_table_query(self, identifier: Identifier, select: Optional[List[str]]) -> 'TableQueryAuthResult':
    """Default implementation: always raises ``NotImplementedError``.

    Raises:
        NotImplementedError: unconditionally.
    """
    unsupported = "auth_table_query not supported by this catalog"
    raise NotImplementedError(unsupported)
9 changes: 9 additions & 0 deletions paimon-python/pypaimon/catalog/catalog_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,12 @@ def empty() -> 'CatalogEnvironment':
catalog_loader=None,
supports_version_management=False
)

def table_query_auth(self, options, identifier):
    """Return a callable ``select -> auth result`` for the given table.

    When ``options.query_auth_enabled`` is falsy, or no catalog loader is
    configured, the returned callable ignores its argument and yields None.
    Otherwise each invocation loads a catalog through ``self.catalog_loader``
    and forwards to its ``auth_table_query(identifier, select)``.
    """
    if options.query_auth_enabled and self.catalog_loader is not None:
        def auth(select):
            # The loader is read at call time, so a catalog is loaded per invocation.
            loaded_catalog = self.catalog_loader.load()
            return loaded_catalog.auth_table_query(identifier, select)

        return auth

    return lambda select: None
19 changes: 18 additions & 1 deletion paimon-python/pypaimon/catalog/rest/rest_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from pypaimon.api.rest_api import RESTApi
from pypaimon.catalog.catalog_exception import IllegalArgumentError
from pypaimon.api.rest_exception import (NoSuchResourceException, AlreadyExistsException,
ForbiddenException, BadRequestException)
ForbiddenException, BadRequestException,
ServiceFailureException, NotImplementedException)
from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.catalog_context import CatalogContext
from pypaimon.catalog.catalog_environment import CatalogEnvironment
Expand Down Expand Up @@ -756,3 +757,19 @@ def create(file_io: FileIO,
) -> FileStoreTable:
"""Create FileStoreTable with dynamic options and catalog environment"""
return FileStoreTable(file_io, catalog_environment.identifier, table_path, table_schema, catalog_environment)

def auth_table_query(self, identifier, select=None):
    """Ask the REST API for the query-auth result of a table.

    Args:
        identifier: Identifier of the table being queried.
        select: Forwarded unchanged to ``rest_api.auth_table_query``.

    Returns:
        A ``TableQueryAuthResult`` built from the response's ``filter`` and
        ``column_masking``.

    Raises:
        TableNotExistException: on ``NoSuchResourceException``.
        TableNoPermissionException: on ``ForbiddenException``.
        RuntimeError: on ``ServiceFailureException`` or ``BadRequestException``.
        NotImplementedError: on ``NotImplementedException``.
    """
    from pypaimon.catalog.table_query_auth import TableQueryAuthResult, TableNoPermissionException

    def _first_arg_or_text(error):
        # Prefer the raw first argument of the exception; fall back to its string form.
        return error.args[0] if error.args else str(error)

    # The handler order below is kept exactly as written: it matters if any of
    # these exception types derive from one another.
    try:
        auth_response = self.rest_api.auth_table_query(identifier, select)
        return TableQueryAuthResult(auth_response.filter, auth_response.column_masking)
    except NoSuchResourceException as missing:
        raise TableNotExistException(identifier) from missing
    except ForbiddenException as forbidden:
        raise TableNoPermissionException(identifier, forbidden) from forbidden
    except ServiceFailureException as failure:
        raise RuntimeError(_first_arg_or_text(failure)) from failure
    except NotImplementedException as unsupported:
        raise NotImplementedError(_first_arg_or_text(unsupported)) from unsupported
    except BadRequestException as bad_request:
        raise RuntimeError(str(bad_request)) from bad_request
87 changes: 87 additions & 0 deletions paimon-python/pypaimon/catalog/table_query_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from typing import Callable, Dict, List, Optional

import pyarrow as pa
import pyarrow.compute as pc

from pypaimon.common.predicate_json_parser import (
extract_referenced_fields,
parse_predicate_to_batch_filter,
)
from pypaimon.schema.data_types import DataField


class TableNoPermissionException(Exception):
    """Raised when access to a table is denied.

    The message is ``MSG`` formatted with the identifier and the string form
    of ``cause`` (empty when ``cause`` is falsy). The identifier is kept on
    the instance and ``cause`` is stored as ``__cause__``.
    """

    MSG = "Table %s has no permission. Cause by %s."

    def __init__(self, identifier, cause=None):
        if cause:
            cause_text = str(cause)
        else:
            cause_text = ""
        message = self.MSG % (identifier, cause_text)
        super().__init__(message)
        self.identifier = identifier
        self.__cause__ = cause


class TableQueryAuthResult:
    """Holds the ``filter`` and ``column_masking`` values of a query-auth response.

    NOTE(review): each ``filter`` entry is handed to
    ``parse_predicate_to_batch_filter`` / ``extract_referenced_fields`` as a
    JSON string; the exact predicate schema is defined by those helpers.
    """

    def __init__(self, filter: Optional[List[str]], column_masking: Optional[Dict[str, str]]):
        self.column_masking = column_masking
        self.filter = filter

    def convert_plan(self, plan):
        """Wrap every split of ``plan`` in a ``QueryAuthSplit`` bound to this result.

        The plan is returned untouched when both ``filter`` and
        ``column_masking`` are empty or None.
        """
        from pypaimon.read.query_auth_split import QueryAuthSplit
        from pypaimon.read.plan import Plan

        if not (self.filter or self.column_masking):
            return plan
        wrapped_splits = []
        for split in plan.splits():
            wrapped_splits.append(QueryAuthSplit(split, self))
        return Plan(wrapped_splits)

    def extract_row_filter(self) -> Optional[Callable[[pa.RecordBatch], pa.Array]]:
        """Build a batch-level filter function from ``filter``.

        Returns None when there is no filter. A single predicate is returned
        as parsed; several predicates are combined with ``pc.and_``.
        """
        if not self.filter:
            return None
        predicates = [parse_predicate_to_batch_filter(predicate_json) for predicate_json in self.filter]
        if len(predicates) == 1:
            return predicates[0]

        def combined(batch: pa.RecordBatch) -> pa.Array:
            # Fold the per-predicate results together, left to right.
            mask = predicates[0](batch)
            for predicate in predicates[1:]:
                mask = pc.and_(mask, predicate(batch))
            return mask

        return combined

    def get_extra_fields_for_filter(
        self,
        read_fields: List[DataField],
        table_fields: List[DataField],
    ) -> List[DataField]:
        """List table fields referenced by the filter but absent from ``read_fields``.

        Each missing field is returned once, in the order it is first
        referenced. Referenced names that match no table field are ignored.
        """
        if not self.filter:
            return []
        known_names = {field.name for field in read_fields}
        missing_fields = []
        for predicate_json in self.filter:
            for column in extract_referenced_fields(predicate_json):
                if column in known_names:
                    continue
                match = next(
                    (candidate for candidate in table_fields if candidate.name == column),
                    None,
                )
                if not match:
                    continue
                missing_fields.append(match)
                # Record the name so a later reference does not add it twice.
                known_names.add(column)
        return missing_fields
11 changes: 11 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,13 @@ class CoreOptions:
)
)

# Boolean option keyed "query-auth.enabled"; its default value is False.
QUERY_AUTH_ENABLED: ConfigOption[bool] = (
    ConfigOptions.key("query-auth.enabled")
    .boolean_type()
    .default_value(False)
    .with_description("Whether to enable query auth.")
)

PARTITION_DEFAULT_NAME: ConfigOption[str] = (
ConfigOptions.key("partition.default-name")
.string_type()
Expand Down Expand Up @@ -1080,3 +1087,7 @@ def add_column_before_partition(self) -> bool:

def dynamic_partition_overwrite(self) -> bool:
    """Return the value of the ``DYNAMIC_PARTITION_OVERWRITE`` option."""
    overwrite_enabled = self.options.get(CoreOptions.DYNAMIC_PARTITION_OVERWRITE)
    return overwrite_enabled

@property
def query_auth_enabled(self) -> bool:
    """Return the value of the ``QUERY_AUTH_ENABLED`` option.

    Exposed as a property, so it is read as an attribute rather than called.
    """
    auth_enabled = self.options.get(CoreOptions.QUERY_AUTH_ENABLED)
    return auth_enabled
Loading
Loading