diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index f81966c4..847b3a9d 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -28,6 +28,7 @@ get_stats_date_range, get_stats_por, get_time_series_metadata, + get_waterdata, ) from .filters import FILTER_LANG from .nearest import get_nearest_continuous @@ -64,4 +65,5 @@ "get_stats_date_range", "get_stats_por", "get_time_series_metadata", + "get_waterdata", ] diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index ad268194..e4002c68 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -24,12 +24,20 @@ METADATA_COLLECTIONS, PROFILES, SERVICES, + WATERDATA_SERVICES, ) from dataretrieval.waterdata.utils import ( + _OUTPUT_ID_BY_SERVICE, + GEOPANDAS, SAMPLES_URL, _check_profiles, + _construct_cql_request, _default_headers, + _finalize_ogc_frame, _get_args, + _normalize_str_iterable, + _switch_properties_id, + _walk_pages, get_ogc_data, get_stats_data, ) @@ -2832,3 +2840,140 @@ def get_channel( args = _get_args(locals()) return get_ogc_data(args, output_id, service) + + +def get_waterdata( + service: WATERDATA_SERVICES, + cql: str | dict, + *, + properties: str | Iterable[str] | None = None, + bbox: list[float] | None = None, + limit: int | None = None, + skip_geometry: bool | None = None, + convert_type: bool = True, +) -> tuple[pd.DataFrame, BaseMetadata]: + """Generalized OGC API CQL2 query. + + Python analogue of R ``dataRetrieval::read_waterdata``. Use this + when you need a predicate the typed wrappers (``get_daily``, + ``get_continuous``, …) can't express — top-level ``or``, ``like`` + with ``%`` wildcards, comparison operators, nested boolean trees, + geometry-based predicates beyond a bbox, and so on. The typed + wrappers are nicer when they cover the case; reach for this when + they don't. + + The CQL2 grammar is documented at + https://api.waterdata.usgs.gov/docs/ogcapi/complex-queries/. + + Parameters + ---------- + service : str + OGC collection name. Must be one of + :data:`dataretrieval.waterdata.types.WATERDATA_SERVICES` + (e.g. ``"daily"``, ``"monitoring-locations"``). + cql : str or dict + CQL2 query. A ``dict`` is JSON-serialized for transport; a + ``str`` is sent through unchanged. The query goes into the + HTTP POST body with ``Content-Type: + application/query-cql-json``. + properties : str or iterable of str, optional + Server-side property whitelist (passed as ``properties=`` on + the URL). Reduces payload size and bypasses the response-shape + post-processing for any column not listed. ``"id"`` resolves + to the service's ``output_id`` (e.g. ``daily_id``) the same way + it does in the typed wrappers. + bbox : list of float, optional + Bounding box ``[xmin, ymin, xmax, ymax]`` in CRS 4326. Combines + with the CQL filter as an additional spatial predicate. + limit : int, optional + Page size, clamped server-side to 50,000. + skip_geometry : bool, optional + If True, the server omits geometry from each feature + (``skipGeometry=true``). + convert_type : bool, default True + Coerce date/datetime/numeric columns to typed dtypes after the + DataFrame is built. + + Returns + ------- + df : pandas.DataFrame or geopandas.GeoDataFrame + Result of the query. GeoDataFrame when ``geopandas`` is + installed and geometry is present. + md : :class:`dataretrieval.utils.BaseMetadata` + Request metadata (URL, query time, response headers). + + Examples + -------- + .. code:: + + >>> # Daily values for two parameter codes at two sites + >>> # (compound AND-of-INs). + >>> from dataretrieval import waterdata + >>> cql = { + ... "op": "and", + ... "args": [ + ... { + ... "op": "in", + ... "args": [ + ... {"property": "parameter_code"}, + ... ["00060", "00065"], + ... ], + ... }, + ... { + ... "op": "in", + ... "args": [ + ... {"property": "monitoring_location_id"}, + ... ["USGS-07367300", "USGS-03277200"], + ... ], + ... }, + ... ], + ... } + >>> df, md = waterdata.get_waterdata(service="daily", cql=cql) + + >>> # Monitoring locations whose HUC starts with "02070010" + >>> # (LIKE with the CQL2 ``%`` wildcard). + >>> df, md = waterdata.get_waterdata( + ... service="monitoring-locations", + ... cql='{"op": "like", "args": [' + ... '{"property": "hydrologic_unit_code"},' + ... ' "02070010%"]}', + ... ) + """ + if service not in _OUTPUT_ID_BY_SERVICE: + raise ValueError( + f"Unknown service {service!r}. Valid services: " + f"{sorted(_OUTPUT_ID_BY_SERVICE)}." + ) + output_id = _OUTPUT_ID_BY_SERVICE[service] + + # ``dict`` is the pythonic input — serialize on the way out. ``str`` + # is sent verbatim so callers who already have a CQL2 doc (e.g. + # imported from a config file) don't need to re-parse it. + body = json.dumps(cql, separators=(",", ":")) if isinstance(cql, dict) else cql + + if properties is None: + properties_list = None + elif isinstance(properties, str): + properties_list = [properties] + else: + properties_list = _normalize_str_iterable(properties, "properties") + + # Translate user-facing names (``daily_id``) to the wire-format + # ``id`` the OGC API expects, matching the typed wrappers. + wire_properties = _switch_properties_id( + properties_list, id_name=output_id, service=service + ) + + req = _construct_cql_request( + service=service, + cql_body=body, + properties=wire_properties or None, + bbox=bbox, + limit=limit, + skip_geometry=skip_geometry, + ) + + df, response = _walk_pages(geopd=GEOPANDAS, req=req) + return _finalize_ogc_frame( + df, response, properties_list, service, output_id, convert_type + ) diff --git a/dataretrieval/waterdata/types.py b/dataretrieval/waterdata/types.py index f5e1496b..c22930e4 100644 --- a/dataretrieval/waterdata/types.py +++ b/dataretrieval/waterdata/types.py @@ -40,6 +40,26 @@ "results", ] +# OGC API collection names that the typed waterdata getters cover. +# Used as the ``service`` arg type of :func:`get_waterdata`. Keep in +# sync with ``_OUTPUT_ID_BY_SERVICE`` in +# :mod:`dataretrieval.waterdata.utils` — the dict is the source of +# truth at runtime; this Literal exists so editors can offer +# completion and type-checkers can catch typos at call sites. +WATERDATA_SERVICES = Literal[ + "channel-measurements", + "combined-metadata", + "continuous", + "daily", + "field-measurements", + "field-measurements-metadata", + "latest-continuous", + "latest-daily", + "monitoring-locations", + "peaks", + "time-series-metadata", +] + PROFILES = Literal[ "actgroup", "actmetric", diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 91228357..b4f90fc5 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -157,6 +157,26 @@ def _switch_properties_id(properties: list[str] | None, id_name: str, service: s # parameters and require POST with CQL2 JSON instead. _CQL2_REQUIRED_SERVICES = frozenset({"monitoring-locations"}) +# Service name → the column the rest of the package exposes the +# per-record OGC ``id`` under. Used by the generalized +# :func:`get_waterdata` entry point, which doesn't have a per-service +# wrapper to hard-code this in. Values here match what each typed +# ``get_*`` function in :mod:`dataretrieval.waterdata.api` uses for +# its own ``output_id``. +_OUTPUT_ID_BY_SERVICE: dict[str, str] = { + "daily": "daily_id", + "continuous": "continuous_id", + "latest-continuous": "latest_continuous_id", + "latest-daily": "latest_daily_id", + "field-measurements": "field_measurement_id", + "field-measurements-metadata": "field_series_id", + "monitoring-locations": "monitoring_location_id", + "time-series-metadata": "time_series_id", + "combined-metadata": "combined_meta_id", + "peaks": "peak_id", + "channel-measurements": "channel_measurements_id", +} + def _parse_datetime(value: str) -> datetime | None: """Parse a single datetime string against the supported formats. @@ -492,14 +512,7 @@ def _construct_api_requests( for k, v in kwargs.items() } - params["skipGeometry"] = skip_geometry - params["limit"] = 50000 if limit is None or limit > 50000 else limit - - # `len()` instead of truthiness: a numpy ndarray would raise on `if bbox:`. - if bbox is not None and len(bbox) > 0: - params["bbox"] = ",".join(map(str, bbox)) - if properties: - params["properties"] = ",".join(properties) + params.update(_ogc_query_params(properties, bbox, limit, skip_geometry)) # Translate CQL filter Python names to the hyphenated URL parameter that # the OGC API expects. The Python kwarg is `filter_lang` because hyphens @@ -528,6 +541,55 @@ def _construct_api_requests( return request.prepare() +def _ogc_query_params( + properties: list[str] | None = None, + bbox: list[float] | None = None, + limit: int | None = None, + skip_geometry: bool | None = None, +) -> dict[str, Any]: + """The ``skipGeometry``/``limit``/``bbox``/``properties`` block of + an OGC URL, used by both :func:`_construct_api_requests` and + :func:`_construct_cql_request`.""" + params: dict[str, Any] = { + "skipGeometry": skip_geometry, + "limit": 50000 if limit is None or limit > 50000 else limit, + } + if bbox is not None and len(bbox) > 0: + params["bbox"] = ",".join(map(str, bbox)) + if properties: + params["properties"] = ",".join(properties) + return params + + +def _construct_cql_request( + service: str, + cql_body: str, + properties: list[str] | None = None, + bbox: list[float] | None = None, + limit: int | None = None, + skip_geometry: bool | None = None, +) -> requests.PreparedRequest: + """Build a POST/CQL2 request for the generalized ``get_waterdata`` path. + + Distinct from :func:`_construct_api_requests`: that function derives + the CQL2 body from typed kwargs and also handles the GET-with-comma- + separated-values path. Here the body is passed through verbatim so + a caller can express predicates the typed wrappers can't (top-level + ``or``, ``like`` with ``%`` wildcards, comparison operators, …). + """ + service_url = f"{OGC_API_URL}/collections/{service}/items" + headers = _default_headers() + headers["Content-Type"] = "application/query-cql-json" + request = requests.Request( + method="POST", + url=service_url, + headers=headers, + data=cql_body, + params=_ogc_query_params(properties, bbox, limit, skip_geometry), + ) + return request.prepare() + + def _next_req_url(resp: requests.Response) -> str | None: """ Extracts the URL for the next page of results from an HTTP response from a @@ -868,6 +930,29 @@ def _sort_rows(df: pd.DataFrame) -> pd.DataFrame: return df +def _finalize_ogc_frame( + df: pd.DataFrame, + response: requests.Response, + properties: list[str] | None, + service: str, + output_id: str, + convert_type: bool, +) -> tuple[pd.DataFrame, BaseMetadata]: + """Apply the standard OGC post-processing tail to a raw frame and + wrap the response in :class:`BaseMetadata`. + + Shared by :func:`get_ogc_data` (typed-kwargs path) and + :func:`dataretrieval.waterdata.api.get_waterdata` (raw-CQL2 path) + so both surfaces produce identically-shaped DataFrames. + """ + df = _deal_with_empty(df, properties, service) + if convert_type: + df = _type_cols(df) + df = _arrange_cols(df, properties, output_id) + df = _sort_rows(df) + return df, BaseMetadata(response) + + def get_ogc_data( args: dict[str, Any], output_id: str, service: str ) -> tuple[pd.DataFrame, BaseMetadata]: @@ -914,13 +999,9 @@ def get_ogc_data( args = {k: v for k, v in args.items() if v is not None} return_list, response = _fetch_once(args) - return_list = _deal_with_empty(return_list, properties, service) - if convert_type: - return_list = _type_cols(return_list) - return_list = _arrange_cols(return_list, properties, output_id) - return_list = _sort_rows(return_list) - - return return_list, BaseMetadata(response) + return _finalize_ogc_frame( + return_list, response, properties, service, output_id, convert_type + ) @filters.chunked(build_request=_construct_api_requests)