diff --git a/backend/app/infrastructure/downloaders/connectors/gss_connector.py b/backend/app/infrastructure/downloaders/connectors/gss_connector.py
new file mode 100644
index 0000000..0ebbdaa
--- /dev/null
+++ b/backend/app/infrastructure/downloaders/connectors/gss_connector.py
@@ -0,0 +1,127 @@
+import requests
+import logging
+from pathlib import Path
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+from .token_manager import TokenManager
+from ....settings import settings
+
+
+class GSSConnector:
+    """
+    GSS Connector: interacts with the GSS APIs.
+    """
+
+    def __init__(self, feature_id: str, workdir: str):
+        self._feature_id: str = feature_id
+        self._workdir: Path = Path(workdir)
+        self._logger = logging.getLogger(__name__)
+        self._feature: dict | None = None
+        self._cached_files: list[str] | None = None  # reserved for caching file listings (currently unused)
+
+        self._token_manager = TokenManager(
+            token_url=settings.GSS_TOKEN_URL,
+            username=settings.GSS_USERNAME,
+            password=settings.GSS_PASSWORD,
+            client_id=settings.GSS_CLIENT_ID)
+
+        self._logger.debug(f"Initialized GSS connector for feature {self._feature_id}")
+
+        self._collections_map = {  # temporary mapping until GSS STAC supports searching by product ID
+            "S1": "SENTINEL-1",
+            "S2": "SENTINEL-2",
+            "S3": "SENTINEL-3",
+            "S5P": "SENTINEL-5P"
+        }
+
+        retry_strategy = Retry(
+            total=3,
+            status_forcelist=[429],
+            backoff_factor=2
+        )
+        adapter = HTTPAdapter(max_retries=retry_strategy)
+        self._http = requests.Session()
+        self._http.mount("https://", adapter)
+
+    # -----------------------
+    # Feature API
+    # -----------------------
+    def get_feature(self) -> dict | None:
+        if self._feature is None:
+            url = f"{settings.GSS_ODATA_CATALOG_ROOT.rstrip('/')}/Products({self._feature_id})"
+            self._logger.debug(f"Querying GSS Odata for product {self._feature_id} from API: {url}")
+
+            response = self._http.get(url,
+                                      headers={"Authorization": f"Bearer {self._token_manager.get_token()}"}
+                                      )
+
+            if response.status_code in (400, 404):  # Feature not found in GSS
+                self._logger.debug(f"Feature {self._feature_id} not found in GSS")
+                return None
+            elif response.status_code != 200:
+                self._logger.error(f"GSS API returned {response.status_code}: {response.text}")
+                return None
+
+            self._feature = response.json()
+            self._logger.debug(f"Feature metadata fetched successfully for {self._feature_id}")
+
+        return self._feature
+
+    def get_available_files(self) -> list[str]:
+        if not self._feature and not self.get_feature():
+            raise ValueError(f"Feature {self._feature_id} not found in GSS")
+
+        product_name = self._feature["Name"]
+        collection_name = next((self._collections_map[collection] for collection in self._collections_map.keys() if product_name.startswith(collection)), None)
+
+        if collection_name is None:
+            self._logger.error(f"Could not determine collection for feature {product_name}")
+            raise ValueError(f"Could not determine collection for feature {product_name}")
+
+        url = f"{settings.GSS_STAC_CATALOG_ROOT.rstrip('/')}/collections/{collection_name}/items/{product_name}"
+        self._logger.debug(f"Fetching STAC metadata from API: {url}")
+        response = self._http.get(url,
+                                  headers={"Authorization": f"Bearer {self._token_manager.get_token()}"}
+                                  )
+        if response.status_code != 200:
+            self._logger.error(f"GSS STAC API returned {response.status_code}: {response.text}")
+            raise RuntimeError(f"GSS STAC API error: {response.status_code}, response: {response.text}")
+
+        assets = response.json()["assets"]  # parse the response body once
+        self._logger.debug(f"Fetched STAC metadata including {len(assets)} assets.")
+        return [asset["href"] for asset in assets.values()]
+
+    def download_selected_files(self, files_to_download: list[str]) -> list[str]:
+        self._workdir.mkdir(parents=True, exist_ok=True)
+
+        downloaded: list[str] = []
+        self._logger.debug(f"Downloading {len(files_to_download)} files to {self._workdir}")
+
+        for https_url in files_to_download:
+            out_path = self._workdir / Path(https_url).name
+
+            try:
+                self._logger.debug(f"Downloading from {https_url}")
+
+                with self._http.get(
+                    https_url,
+                    headers={"Authorization": f"Bearer {self._token_manager.get_token()}"},
+                    stream=True  # stream to disk instead of buffering the whole file in memory
+                ) as response:
+                    if response.status_code == 200:
+                        with open(out_path, 'wb') as f:
+                            for chunk in response.iter_content(chunk_size=8192):
+                                if chunk:
+                                    f.write(chunk)
+
+                        downloaded.append(str(out_path))
+                        self._logger.info(f"Downloaded {https_url} to {out_path}")
+                    else:
+                        self._logger.error(response.headers)
+                        self._logger.error(f"Failed to download {https_url}: HTTP {response.status_code}")
+
+            except Exception as e:
+                self._logger.error(f"Failed to download {https_url}: {e}")
+
+        return downloaded
diff --git a/backend/app/infrastructure/downloaders/connectors/token_manager.py b/backend/app/infrastructure/downloaders/connectors/token_manager.py
new file mode 100644
index 0000000..0d766f5
--- /dev/null
+++ b/backend/app/infrastructure/downloaders/connectors/token_manager.py
@@ -0,0 +1,105 @@
+import threading
+from datetime import datetime, timedelta
+from typing import Optional
+import httpx
+
+import logging
+
+
+class TokenManager:
+    """
+    Thread-safe token manager for OAuth2 authentication.
+    Handles token acquisition and automatic refresh when expired or nearly expired.
+    """
+
+    def __init__(
+        self,
+        token_url: str,
+        username: str,
+        password: str,
+        client_id: str,
+        client_secret: Optional[str] = None,
+        refresh_threshold_seconds: int = 300
+    ):
+        """
+        Initialize the GSS token manager.
+
+        Args:
+            token_url: OAuth2 token endpoint URL
+            username: Username for authentication
+            password: Password for authentication
+            client_id: OAuth2 client ID
+            client_secret: OAuth2 client secret (optional)
+            refresh_threshold_seconds: Refresh token this many seconds before expiry (default: 300)
+        """
+        self._token_url = token_url
+        self._username = username
+        self._password = password
+        self._client_id = client_id
+        self._client_secret = client_secret
+        self._refresh_threshold = refresh_threshold_seconds
+
+        self._access_token: Optional[str] = None
+        self._token_expiry: Optional[datetime] = None
+        self._lock = threading.Lock()
+        self._logger = logging.getLogger(__name__)
+
+    def _request_token(self) -> dict:
+        """
+        Request a new access token from the token endpoint.
+
+        Returns:
+            dict: Token response containing access_token, expires_in, etc.
+
+        Raises:
+            httpx.HTTPError: If token request fails
+        """
+        data = {
+            "grant_type": "password",
+            "username": self._username,
+            "password": self._password,
+            "client_id": self._client_id,
+        }
+
+        if self._client_secret:
+            data["client_secret"] = self._client_secret
+
+        self._logger.debug(f"Requesting token from {self._token_url}")
+        with httpx.Client() as client:
+            response = client.post(self._token_url, data=data)
+            response.raise_for_status()
+            return response.json()
+
+    def _is_token_expired(self) -> bool:
+        """
+        Check if the current token is expired or nearly expired.
+
+        Returns:
+            bool: True if token is None, expired, or nearly expired
+        """
+        if self._access_token is None or self._token_expiry is None:
+            self._logger.debug(f"Token for client {self._client_id} is expired or never acquired.")
+            return True
+
+        threshold_time = datetime.now() + timedelta(seconds=self._refresh_threshold)
+        return threshold_time >= self._token_expiry
+
+    def get_token(self) -> str:
+        """
+        Get a valid access token, refreshing if necessary.
+        This method is thread-safe.
+
+        Returns:
+            str: Valid access token
+
+        Raises:
+            httpx.HTTPError: If token request fails
+        """
+        with self._lock:
+            if self._is_token_expired():
+                token_response = self._request_token()
+                self._access_token = token_response["access_token"]
+                expires_in = token_response.get("expires_in", 3600)
+                self._token_expiry = datetime.now() + timedelta(seconds=expires_in)
+
+            return self._access_token
diff --git a/backend/app/infrastructure/downloaders/providers/gss_provider.py b/backend/app/infrastructure/downloaders/providers/gss_provider.py
index c8dae57..d1c8ffa 100644
--- a/backend/app/infrastructure/downloaders/providers/gss_provider.py
+++ b/backend/app/infrastructure/downloaders/providers/gss_provider.py
@@ -1,6 +1,7 @@
 import logging
 
 from . import BaseProvider
+from ..connectors.gss_connector import GSSConnector
 from ....domain import Job
 from ....settings import settings
 
@@ -9,25 +10,40 @@ class GSSProvider(BaseProvider):
     def __init__(
         self,
         job: Job,
-        logger: logging.Logger | None = None
     ):
         super().__init__(
             job=job,
-            logger=logger
+            logger=logging.getLogger(__name__)
         )
+        self._connector = GSSConnector(feature_id=self._job.product_id,
+                                       workdir=self._path_to_downloaded)
 
     def has_product(self) -> bool:
-        if not settings.SENTINEL_ENABLE_GSS:
+        if not settings.ENABLE_GSS:
             self._logger.warning("GSS datasource is not enabled!")
             return False
 
-        # TODO implementovat lookup produktu na GSS
-        return False
+        if any(not value for value in settings.GSS_CREDENTIALS.values()):
+            self._logger.error("GSS credentials are not configured!")
+            return False
+
+        return self._connector.get_feature() is not None
 
     def list_product_files(self) -> list[str]:
-        # TODO lookup produktů na GSS
-        raise NotImplementedError("GSS datasource lookup is not implemented!")
+        available_files: list[str] = self._connector.get_available_files()
+
+        if not available_files:
+            self._logger.warning(f"No files found for product {self._job.product_id}")
+        else:
+            self._logger.info(f"Found {len(available_files)} files for product {self._job.product_id}")
+
+        return available_files
 
     def download_product_files(self, files_to_download: list[str]) -> list[str]:
-        # TODO download prodkutů z GSS
-        raise NotImplementedError("GSS datasource download is not implemented!")
+        self._logger.info(f"Starting GSS download for product {self._job.product_id}")
+
+        downloaded_files: list[str] = self._connector.download_selected_files(files_to_download=files_to_download)
+
+        self._logger.info(f"Finished downloading {len(downloaded_files)} files for product {self._job.product_id}")
+
+        return downloaded_files
diff --git a/backend/app/infrastructure/downloaders/sentinel_1_downloader.py b/backend/app/infrastructure/downloaders/sentinel_1_downloader.py
index d2bcb4b..c1241e1 100644
--- a/backend/app/infrastructure/downloaders/sentinel_1_downloader.py
+++ b/backend/app/infrastructure/downloaders/sentinel_1_downloader.py
@@ -20,11 +20,12 @@ def _filter_files(self, available_files: list[str] = None) -> list[str]:
             )
         )
 
+        self._logger.info(f"Available files: {available_files}")
         for file in available_files:
             file_strip_lower = file.strip().lower()
 
-            # only measurement TIFFs
-            if "/measurement/" not in file_strip_lower or not re.search(r"\.(tif|tiff)$", file_strip_lower):
+            # only TIFFs
+            if not re.search(r"\.(tif|tiff)$", file_strip_lower):
                 continue
 
             # loose filter
diff --git a/backend/app/settings/settings.py b/backend/app/settings/settings.py
index 76702ae..d08d430 100644
--- a/backend/app/settings/settings.py
+++ b/backend/app/settings/settings.py
@@ -45,13 +45,21 @@ def ENABLED_DATASETS(self) -> List[str]:
         return datasets
 
     # Sentinel credentials
-    SENTINEL_ENABLE_GSS: bool = False
     SENTINEL_CDSE_CATALOG_ROOT: str = "https://catalogue.dataspace.copernicus.eu/odata/v1/"
     SENTINEL_CDSE_S3_ENDPOINT_URL: str = "https://eodata.dataspace.copernicus.eu/"
    SENTINEL_CDSE_S3_REGION_NAME: str = "default"
     SENTINEL_CDSE_S3_ACCESS_KEY: Optional[str] = None
     SENTINEL_CDSE_S3_SECRET_KEY: Optional[str] = None
+    ENABLE_GSS: bool = False
+    GSS_ODATA_CATALOG_ROOT: Optional[str] = None
+    GSS_STAC_CATALOG_ROOT: Optional[str] = None
+    GSS_CLIENT_ID: Optional[str] = None
+    GSS_CLIENT_SECRET: Optional[str] = None
+    GSS_TOKEN_URL: Optional[str] = None
+    GSS_USERNAME: Optional[str] = None
+    GSS_PASSWORD: Optional[str] = None
+
 
     @computed_field
     @property
     def SENTINEL_CDSE_S3_CREDENTIALS(self) -> dict[str, Optional[str]]:
@@ -62,6 +70,18 @@ def SENTINEL_CDSE_S3_CREDENTIALS(self) -> dict[str, Optional[str]]:
             "endpoint_url": self.SENTINEL_CDSE_S3_ENDPOINT_URL
         }
 
+    @computed_field
+    @property
+    def GSS_CREDENTIALS(self) -> dict[str, Optional[str]]:
+        return {
+            "odata_catalog_root": self.GSS_ODATA_CATALOG_ROOT,
+            "stac_catalog_root": self.GSS_STAC_CATALOG_ROOT,
+            "client_id": self.GSS_CLIENT_ID,
+            "token_url": self.GSS_TOKEN_URL,
+            "username": self.GSS_USERNAME,
+            "password": self.GSS_PASSWORD
+        }
+
     # ------------------------------------------------------------------
     # PATHS
     # ------------------------------------------------------------------
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 0aac07e..8a2f27a 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -9,3 +9,4 @@ pydantic-settings~=2.13.1
 httpx~=0.28.1
 boto3~=1.42.63
 botocore~=1.42.63
+requests~=2.32.4