Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions backend/app/infrastructure/downloaders/connectors/gss_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import requests
import logging
from pathlib import Path
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

from .token_manager import TokenManager
from ....settings import settings


class GSSConnector:
    """
    GSS Connector: interacts with the GSS APIs.

    Looks up product metadata via the GSS OData and STAC catalogs and
    downloads selected product files over HTTPS.
    """

    # Temporary prefix -> STAC collection mapping, until the GSS STAC
    # service learns to search by product ID.
    _collections_map = {
        "S1": "SENTINEL-1",
        "S2": "SENTINEL-2",
        "S3": "SENTINEL-3",
        "S5P": "SENTINEL-5P"
    }

    def __init__(self, feature_id: str, workdir: str):
        """
        Args:
            feature_id: GSS/OData product identifier
            workdir: directory where downloaded files will be stored
        """
        self._feature_id: str = feature_id
        self._workdir: Path = Path(workdir)
        self._logger = logging.getLogger(__name__)
        self._feature: dict | None = None  # cached OData feature metadata
        self._cached_files: list[str] | None = None  # cached STAC asset hrefs

        self._token_manager = TokenManager(
            token_url=settings.GSS_TOKEN_URL,
            username=settings.GSS_USERNAME,
            password=settings.GSS_PASSWORD,
            client_id=settings.GSS_CLIENT_ID)

        self._logger.debug(f"Initialized GSS connector for feature {self._feature_id}")

        # Retry transient rate limiting (HTTP 429) with exponential backoff.
        retry_strategy = Retry(
            total=3,
            status_forcelist=[429],
            backoff_factor=2
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self._http = requests.Session()
        self._http.mount("https://", adapter)

    def _auth_headers(self) -> dict[str, str]:
        """Build the Authorization header with a freshly validated bearer token."""
        return {"Authorization": f"Bearer {self._token_manager.get_token()}"}

    # -----------------------
    # Feature API
    # -----------------------
    def get_feature(self) -> dict | None:
        """
        Fetch (and cache) the OData metadata for this feature.

        Returns:
            dict | None: feature metadata, or None when the feature does not
            exist in GSS (HTTP 400/404) or the API call fails.
        """
        if self._feature is not None:
            return self._feature

        url = f"{settings.GSS_ODATA_CATALOG_ROOT.rstrip('/')}/Products({self._feature_id})"
        self._logger.debug(f"Querying GSS Odata for product {self._feature_id} from API: {url}")

        response = self._http.get(url, headers=self._auth_headers())

        if response.status_code in (400, 404):  # Feature not found in GSS
            self._logger.debug(f"Feature {self._feature_id} not found in GSS")
            return None
        if response.status_code != 200:
            self._logger.error(f"GSS API returned {response.status_code}: {response.text}")
            return None

        self._feature = response.json()
        self._logger.debug(f"Feature metadata fetched successfully for {self._feature_id}")
        return self._feature

    def get_available_files(self) -> list[str]:
        """
        List (and cache) the hrefs of all STAC assets of this feature.

        Returns:
            list[str]: asset download URLs.

        Raises:
            ValueError: if the feature does not exist in GSS or its collection
                cannot be determined from the product name.
            RuntimeError: if the GSS STAC API returns a non-200 response.
        """
        if self._cached_files is not None:
            return self._cached_files

        if self._feature is None:
            self.get_feature()
        if self._feature is None:
            # Without this guard, self._feature["Name"] below would raise
            # a confusing TypeError on a missing feature.
            raise ValueError(f"Feature {self._feature_id} not found in GSS")

        product_name = self._feature["Name"]
        collection_name = next(
            (name for prefix, name in self._collections_map.items() if product_name.startswith(prefix)),
            None
        )

        if collection_name is None:
            self._logger.error(f"Could not determine collection for feature {product_name}")
            raise ValueError(f"Could not determine collection for feature {product_name}")

        url = f"{settings.GSS_STAC_CATALOG_ROOT.rstrip('/')}/collections/{collection_name}/items/{product_name}"
        self._logger.debug(f"Fetching STAC metadata from API: {url}")
        response = self._http.get(url, headers=self._auth_headers())
        if response.status_code != 200:
            self._logger.error(f"GSS STAC API returned {response.status_code}: {response.text}")
            raise RuntimeError(f"GSS STAC API error: {response.status_code}, response: {response.text}")

        assets = response.json()["assets"]
        self._logger.debug(f"Fetched STAC metadata including {len(assets)} assets.")
        self._cached_files = [asset["href"] for asset in assets.values()]
        return self._cached_files

    def download_selected_files(self, files_to_download: list[str]) -> list[str]:
        """
        Download the given URLs into the working directory.

        Failed downloads are logged and skipped; the method never raises for
        an individual file.

        Args:
            files_to_download: HTTPS URLs to fetch.

        Returns:
            list[str]: local paths of the files that were downloaded successfully.
        """
        self._workdir.mkdir(parents=True, exist_ok=True)

        downloaded: list[str] = []
        self._logger.debug(f"Downloading {len(files_to_download)} files to {self._workdir}")

        for https_url in files_to_download:
            out_path = self._workdir / Path(https_url).name

            try:
                self._logger.debug(f"Downloading from {https_url}")

                # stream=True so large products are written in chunks instead
                # of being buffered fully in memory (without it, iter_content
                # just re-slices an already-downloaded body). The context
                # manager guarantees the connection is released.
                with self._http.get(
                    https_url,
                    headers=self._auth_headers(),
                    stream=True
                ) as response:
                    if response.status_code == 200:
                        with open(out_path, 'wb') as f:
                            for chunk in response.iter_content(chunk_size=8192):
                                if chunk:
                                    f.write(chunk)

                        downloaded.append(str(out_path))
                        self._logger.info(f"Downloaded {https_url} to {out_path}")
                    else:
                        self._logger.error(response.headers)
                        self._logger.error(f"Failed to download {https_url}: HTTP {response.status_code}")

            except Exception as e:
                self._logger.error(f"Failed to download {https_url}: {e}")

        return downloaded
105 changes: 105 additions & 0 deletions backend/app/infrastructure/downloaders/connectors/token_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import logging
import threading
import time
from datetime import datetime, timedelta
from typing import Optional

import httpx


class TokenManager:
    """
    Thread-safe token manager for OAuth2 authentication.

    Acquires a token via the resource-owner password grant and automatically
    refreshes it when it is expired or nearly expired.
    """

    def __init__(
        self,
        token_url: str,
        username: str,
        password: str,
        client_id: str,
        client_secret: Optional[str] = None,
        refresh_threshold_seconds: int = 300,
        request_timeout_seconds: float = 30.0
    ):
        """
        Initialize the GSS token manager.

        Args:
            token_url: OAuth2 token endpoint URL
            username: Username for authentication
            password: Password for authentication
            client_id: OAuth2 client ID
            client_secret: OAuth2 client secret (optional)
            refresh_threshold_seconds: Refresh token this many seconds before expiry (default: 300)
            request_timeout_seconds: Timeout for the token HTTP request in seconds (default: 30.0)
        """
        self._token_url = token_url
        self._username = username
        self._password = password
        self._client_id = client_id
        self._client_secret = client_secret
        self._refresh_threshold = refresh_threshold_seconds
        self._request_timeout = request_timeout_seconds

        self._access_token: Optional[str] = None
        # Expiry as a time.monotonic() timestamp: immune to wall-clock jumps
        # (NTP corrections, DST changes), which naive datetime.now() is not.
        self._token_expiry: Optional[float] = None
        self._lock = threading.Lock()
        self._logger = logging.getLogger(__name__)

    def _request_token(self) -> dict:
        """
        Request a new access token from the token endpoint.

        Returns:
            dict: Token response containing access_token, expires_in, etc.

        Raises:
            httpx.HTTPError: If token request fails
        """
        data = {
            "grant_type": "password",
            "username": self._username,
            "password": self._password,
            "client_id": self._client_id,
        }

        if self._client_secret:
            data["client_secret"] = self._client_secret

        self._logger.debug(f"Requesting token from {self._token_url}")
        # Explicit timeout so a hung IdP cannot block callers indefinitely.
        with httpx.Client(timeout=self._request_timeout) as client:
            response = client.post(self._token_url, data=data)
            response.raise_for_status()
            return response.json()

    def _is_token_expired(self) -> bool:
        """
        Check if the current token is expired or nearly expired.

        Returns:
            bool: True if token is None, expired, or nearly expired
        """
        if self._access_token is None or self._token_expiry is None:
            self._logger.debug(f"Token for client {self._client_id} is expired or never acquired.")
            return True

        # Treat the token as expired once we are within the refresh threshold.
        return time.monotonic() + self._refresh_threshold >= self._token_expiry

    def get_token(self) -> str:
        """
        Get a valid access token, refreshing if necessary.
        This method is thread-safe.

        Returns:
            str: Valid access token

        Raises:
            httpx.HTTPError: If token request fails
        """
        with self._lock:
            if self._is_token_expired():
                token_response = self._request_token()
                self._access_token = token_response["access_token"]
                # Fall back to one hour if the server omits expires_in.
                expires_in = token_response.get("expires_in", 3600)
                self._token_expiry = time.monotonic() + float(expires_in)

            return self._access_token
34 changes: 25 additions & 9 deletions backend/app/infrastructure/downloaders/providers/gss_provider.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from . import BaseProvider
from ..connectors.gss_connector import GSSConnector
from ....domain import Job
from ....settings import settings

Expand All @@ -9,25 +10,40 @@ class GSSProvider(BaseProvider):
def __init__(
self,
job: Job,
logger: logging.Logger | None = None
):
super().__init__(
job=job,
logger=logger
logger=logging.getLogger(__name__)
)
self._connector = GSSConnector(feature_id=self._job.product_id,
workdir=self._path_to_downloaded)

def has_product(self) -> bool:
if not settings.SENTINEL_ENABLE_GSS:
if not settings.ENABLE_GSS:
self._logger.warning("GSS datasource is not enabled!")
return False

# TODO implementovat lookup produktu na GSS
return False
if any(not value for value in settings.GSS_CREDENTIALS.values()):
self._logger.error("GSS credentials are not configured!")
return False

return self._connector.get_feature() is not None

def list_product_files(self) -> list[str]:
# TODO lookup produktů na GSS
raise NotImplementedError("GSS datasource lookup is not implemented!")
available_files: list[str] = self._connector.get_available_files()

if not available_files:
self._logger.warning(f"No files found for product {self._job.product_id}")
else:
self._logger.info(f"Found {len(available_files)} files for product {self._job.product_id}")

return available_files

def download_product_files(self, files_to_download: list[str]) -> list[str]:
# TODO download prodkutů z GSS
raise NotImplementedError("GSS datasource download is not implemented!")
self._logger.info(f"Starting GSS download for product {self._job.product_id}")

downloaded_files: list[str] = self._connector.download_selected_files(files_to_download=files_to_download)

self._logger.info(f"Finished downloading {len(downloaded_files)} files for product {self._job.product_id}")

return downloaded_files
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ def _filter_files(self, available_files: list[str] = None) -> list[str]:
)
)

self._logger.info(f"Available files: {available_files}")
for file in available_files:
file_strip_lower = file.strip().lower()

# only measurement TIFFs
if "/measurement/" not in file_strip_lower or not re.search(r"\.(tif|tiff)$", file_strip_lower):
# only TIFFs
if not re.search(r"\.(tif|tiff)$", file_strip_lower):
continue

# loose filter
Expand Down
22 changes: 21 additions & 1 deletion backend/app/settings/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,21 @@ def ENABLED_DATASETS(self) -> List[str]:
return datasets

# Sentinel credentials
SENTINEL_ENABLE_GSS: bool = False
SENTINEL_CDSE_CATALOG_ROOT: str = "https://catalogue.dataspace.copernicus.eu/odata/v1/"
SENTINEL_CDSE_S3_ENDPOINT_URL: str = "https://eodata.dataspace.copernicus.eu/"
SENTINEL_CDSE_S3_REGION_NAME: str = "default"
SENTINEL_CDSE_S3_ACCESS_KEY: Optional[str] = None
SENTINEL_CDSE_S3_SECRET_KEY: Optional[str] = None

ENABLE_GSS: bool = False
GSS_ODATA_CATALOG_ROOT: Optional[str] = None
GSS_STAC_CATALOG_ROOT: Optional[str] = None
GSS_CLIENT_ID: Optional[str] = None
GSS_CLIENT_SECRET: Optional[str] = None
GSS_TOKEN_URL: Optional[str] = None
GSS_USERNAME: Optional[str] = None
GSS_PASSWORD: Optional[str] = None

@computed_field
@property
def SENTINEL_CDSE_S3_CREDENTIALS(self) -> dict[str, Optional[str]]:
Expand All @@ -62,6 +70,18 @@ def SENTINEL_CDSE_S3_CREDENTIALS(self) -> dict[str, Optional[str]]:
"endpoint_url": self.SENTINEL_CDSE_S3_ENDPOINT_URL
}

@computed_field
@property
def GSS_CREDENTIALS(self) -> dict[str, Optional[str]]:
return {
"odata_catalog_root": self.GSS_ODATA_CATALOG_ROOT,
"stac_catalog_root": self.GSS_STAC_CATALOG_ROOT,
"client_id": self.GSS_CLIENT_ID,
"token_url": self.GSS_TOKEN_URL,
"username": self.GSS_USERNAME,
"password": self.GSS_PASSWORD
}

# ------------------------------------------------------------------
# PATHS
# ------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ pydantic-settings~=2.13.1
httpx~=0.28.1
boto3~=1.42.63
botocore~=1.42.63
requests~=2.32.4