Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions docker/Dockerfile → Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ RUN useradd -ms /bin/bash pysus \

COPY pyproject.toml poetry.lock LICENSE README.md /usr/src/
COPY pysus /usr/src/pysus
COPY docker/scripts/entrypoint.sh /entrypoint.sh
COPY docker/notebooks/ /home/pysus/Notebooks/
COPY entrypoint.sh /entrypoint.sh

RUN pip install poetry \
&& cd /usr/src && poetry config virtualenvs.create false && poetry install --with docs \
&& pip install 'httpx<0.28' \
&& chown -R pysus:pysus /home/pysus

USER pysus
Expand Down
4 changes: 2 additions & 2 deletions docker/docker-compose.yaml → docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
services:
jupyter:
build:
context: ".."
dockerfile: docker/Dockerfile
context: "."
dockerfile: Dockerfile
hostname: pysus-jupyter
container_name: pysus-jupyter
ports:
Expand Down
89 changes: 0 additions & 89 deletions docker/notebooks/Welcome.ipynb

This file was deleted.

6 changes: 0 additions & 6 deletions docker/scripts/poetry-install.sh

This file was deleted.

File renamed without changes.
16 changes: 5 additions & 11 deletions pysus/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,9 @@ async def download(

if timeout is not None:
with anyio.fail_after(timeout):
await client._download_file(file, local_path, callback)
await client.download(file, local_path, callback)
else:
await client._download_file(file, local_path, callback)
await client.download(file, local_path, callback)

await self._update_state(
local_path=local_path,
Expand Down Expand Up @@ -517,9 +517,7 @@ async def query(
all_datasets = await self._ducklake.datasets()

if dataset:
matching = [
d for d in all_datasets if d.name.lower() == dataset.lower()
]
matching = [d for d in all_datasets if d.name.lower() == dataset.lower()]
if not matching:
return []
target = matching[0]
Expand Down Expand Up @@ -618,9 +616,7 @@ def get_columns(path: Path) -> set[tuple[str, str]]:

else:
paths_str = ", ".join(f"'{p}'" for p in paths)
query = (
f"SELECT * FROM read_parquet([{paths_str}], union_by_name=True)"
)
query = f"SELECT * FROM read_parquet([{paths_str}], union_by_name=True)"

if sql:
if sql.upper().startswith("SELECT"):
Expand All @@ -633,9 +629,7 @@ def get_columns(path: Path) -> set[tuple[str, str]]:
if not add_dv:
return base

geocode_cols = [
col[0] for col in base.description if is_geocode_column(col[0])
]
geocode_cols = [col[0] for col in base.description if is_geocode_column(col[0])]
if not geocode_cols:
return base

Expand Down
18 changes: 5 additions & 13 deletions pysus/api/dadosgov/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,9 @@ def _dedup_entries(
if m:
stem = filename[: m.start()]
fmt = m.group(1).lower()
grouped.setdefault(stem, []).append(
(fmt, filename, recurso, metadata)
)
grouped.setdefault(stem, []).append((fmt, filename, recurso, metadata))
else:
grouped.setdefault(filename, []).append(
("", filename, recurso, metadata)
)
grouped.setdefault(filename, []).append(("", filename, recurso, metadata))

result: list[tuple[str, Any, dict]] = []
for _, items in grouped.items():
Expand Down Expand Up @@ -210,7 +206,7 @@ async def _download(
"""Download the file to a local path."""
if not output:
output = CACHEPATH / self.name
return await self.client._download_file(self, output, callback=callback)
return await self.client.download(self, output, callback=callback)

async def fetch_size(self) -> int:
"""Fetch the remote file size and update the local record.
Expand Down Expand Up @@ -249,9 +245,7 @@ class Group(BaseRemoteGroup):
"""A group of files within a dataset."""

record: ConjuntoDados
_formatter: Callable[[str], dict[str, Any]] | None = PrivateAttr(
default=None
)
_formatter: Callable[[str], dict[str, Any]] | None = PrivateAttr(default=None)

def __init__(
self,
Expand Down Expand Up @@ -319,9 +313,7 @@ async def _fetch_files(self) -> list[BaseRemoteFile]:
"""Build File objects from the underlying resources."""
entries: list[tuple[str, Any, dict]] = []
for recurso in self.record.resources:
filename = (
recurso.file_name or recurso.url.split("/")[-1].split("?")[0]
)
filename = recurso.file_name or recurso.url.split("/")[-1].split("?")[0]
if filename.lower().endswith(".pdf") or filename.startswith("get_"):
continue
metadata = {}
Expand Down
176 changes: 176 additions & 0 deletions pysus/api/ducklake/catalog/adapters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
from abc import ABC
from pathlib import Path

import httpx
from anyio import to_thread
from pydantic import BaseModel, SecretStr
from sqlalchemy.engine import Engine
from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool
from sqlalchemy.orm import sessionmaker, Session

from pysus import CACHEPATH
from pysus.api import types
from pysus.api.ducklake.functional import download_s3, upload_s3


class DuckLakeCredentials(BaseModel):
access_key: SecretStr
secret_key: SecretStr


class BaseAdapter(ABC):
cache_dir: Path = Path(CACHEPATH) / "ducklake"
db_local: Path
db_remote: Path

def __init__(
self, engine=None, credentials: DuckLakeCredentials | None = None, **data
) -> None:
self._engine = engine
self._session_factory = None
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.credentials = credentials

@property
def remote_url(self) -> str:
return f"https://{types.S3_ENDPOINT}/{types.S3_BUCKET}/{self.db_remote}"

def get_session(self) -> Session:
if not self._session_factory:
raise RuntimeError("Database engine not initialized. Call connect() first.")
return self._session_factory()

async def connect(self, force: bool = False) -> None:
if self._engine and not force:
if not self._session_factory:
self._session_factory = sessionmaker(bind=self._engine)
return

await self._download_catalog(
self.db_local,
str(self.db_remote),
)
self._engine = await to_thread.run_sync(self.setup_engine)
self._session_factory = sessionmaker(bind=self._engine)

def setup_engine(
self, access_key: str | None = None, secret_key: str | None = None
) -> Engine:
engine: Engine = create_engine(
f"duckdb:///{self.db_local}",
poolclass=StaticPool,
)

with engine.connect() as conn:
conn.exec_driver_sql("INSTALL ducklake; LOAD ducklake;")

has_pysus = conn.exec_driver_sql(
"SELECT 1 FROM information_schema.schemata WHERE schema_name = 'pysus'"
).fetchone()

if has_pysus:
conn.exec_driver_sql("SET search_path='pysus,main';")
else:
conn.exec_driver_sql("SET search_path='main';")

s3_cfg = {
"s3_endpoint": types.S3_ENDPOINT,
"s3_region": types.S3_REGION,
"s3_url_style": "path",
"s3_use_ssl": "true",
}

if access_key and secret_key:
s3_cfg["s3_access_key_id"] = access_key
s3_cfg["s3_secret_access_key"] = secret_key

for key, value in s3_cfg.items():
conn.exec_driver_sql(f"SET {key}='{value}'")

conn.commit()

return engine

async def _download_catalog(self, local_path: Path, remote_path: str) -> None:
url = f"https://{types.S3_ENDPOINT}/{types.S3_BUCKET}/{remote_path}"

if local_path.exists():
try:
local_size = local_path.stat().st_size
except OSError:
local_size = -1
else:
local_size = -1

async with httpx.AsyncClient(follow_redirects=True) as client:
try:
head = await client.head(url)
head.raise_for_status()
remote_size = int(head.headers.get("content-length", 0))
except Exception:
remote_size = 0

if remote_size == local_size:
return

access_key = (
self.credentials.access_key.get_secret_value() if self.credentials else None
)
secret_key = (
self.credentials.secret_key.get_secret_value() if self.credentials else None
)

await download_s3(
remote_path=remote_path,
local_path=local_path,
access_key=access_key,
secret_key=secret_key,
)

async def _upload_catalog(self) -> None:
if not self.credentials:
raise PermissionError(
"Admin credentials required to upload catalog.",
)

if not self.db_local.exists():
raise FileNotFoundError("catalog file not found")

await upload_s3(
local_path=self.db_local,
remote_path=str(self.db_remote),
access_key=self.credentials.access_key.get_secret_value(),
secret_key=self.credentials.secret_key.get_secret_value(),
)

async def close(self, update: bool = False) -> None:
if update:
await self._upload_catalog()

if self._engine:
await to_thread.run_sync(self._engine.dispose)
self._engine = None
self._session_factory = None


class CatalogAdapter(BaseAdapter):
def __init__(self, engine=None, **data) -> None:
super().__init__(engine=engine, **data)
self.db_local: Path = self.cache_dir / "catalog.duckdb"
self.db_remote: str = "public/catalog.duckdb"


class DatasetAdapter(BaseAdapter):
def __init__(self, name: str, engine=None, **data) -> None:
super().__init__(engine=engine, **data)
self.dataset_name: str = name
self.db_local: Path = self.cache_dir / f"catalog_{name}.duckdb"
self.db_remote: str = f"datasets/catalog_{name}.duckdb"


class ColumnsAdapter(BaseAdapter):
def __init__(self, engine=None, **data) -> None:
super().__init__(engine=engine, **data)
self.db_local: Path = self.cache_dir / "columns.duckdb"
self.db_remote: str = "public/columns.duckdb"
Loading
Loading