From e7943badde91fb59a8b683c989523febb6ba5a87 Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Mon, 12 May 2025 11:43:24 +0100 Subject: [PATCH 1/6] Dynamic queryables mapping for properties. --- .../stac_fastapi/core/extensions/filter.py | 50 +++++++++++-------- .../elasticsearch/database_logic.py | 33 ++++++++++-- 2 files changed, 58 insertions(+), 25 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/extensions/filter.py b/stac_fastapi/core/stac_fastapi/core/extensions/filter.py index a74eff99..2ad6c602 100644 --- a/stac_fastapi/core/stac_fastapi/core/extensions/filter.py +++ b/stac_fastapi/core/stac_fastapi/core/extensions/filter.py @@ -91,20 +91,20 @@ class SpatialOp(str, Enum): S_DISJOINT = "s_disjoint" -queryables_mapping = { - "id": "id", - "collection": "collection", - "geometry": "geometry", - "datetime": "properties.datetime", - "created": "properties.created", - "updated": "properties.updated", - "cloud_cover": "properties.eo:cloud_cover", - "cloud_shadow_percentage": "properties.s2:cloud_shadow_percentage", - "nodata_pixel_percentage": "properties.s2:nodata_pixel_percentage", -} - - -def to_es_field(field: str) -> str: +# queryables_mapping = { +# "id": "id", +# "collection": "collection", +# "geometry": "geometry", +# "datetime": "properties.datetime", +# "created": "properties.created", +# "updated": "properties.updated", +# "cloud_cover": "properties.eo:cloud_cover", +# "cloud_shadow_percentage": "properties.s2:cloud_shadow_percentage", +# "nodata_pixel_percentage": "properties.s2:nodata_pixel_percentage", +# } + + +def to_es_field(queryables_mapping: Dict[str, Any], field: str) -> str: """ Map a given field to its corresponding Elasticsearch field according to a predefined mapping. @@ -117,7 +117,7 @@ def to_es_field(field: str) -> str: return queryables_mapping.get(field, field) -def to_es(query: Dict[str, Any]) -> Dict[str, Any]: +def to_es(queryables_mapping: Dict[str, Any], query: Dict[str, Any]) -> Dict[str, Any]: """ Transform a simplified CQL2 query structure to an Elasticsearch compatible query DSL. @@ -133,7 +133,13 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]: LogicalOp.OR: "should", LogicalOp.NOT: "must_not", }[query["op"]] - return {"bool": {bool_type: [to_es(sub_query) for sub_query in query["args"]]}} + return { + "bool": { + bool_type: [ + to_es(queryables_mapping, sub_query) for sub_query in query["args"] + ] + } + } elif query["op"] in [ ComparisonOp.EQ, @@ -150,7 +156,7 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]: ComparisonOp.GTE: "gte", } - field = to_es_field(query["args"][0]["property"]) + field = to_es_field(queryables_mapping, query["args"][0]["property"]) value = query["args"][1] if isinstance(value, dict) and "timestamp" in value: value = value["timestamp"] @@ -173,11 +179,11 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]: return {"range": {field: {range_op[query["op"]]: value}}} elif query["op"] == ComparisonOp.IS_NULL: - field = to_es_field(query["args"][0]["property"]) + field = to_es_field(queryables_mapping, query["args"][0]["property"]) return {"bool": {"must_not": {"exists": {"field": field}}}} elif query["op"] == AdvancedComparisonOp.BETWEEN: - field = to_es_field(query["args"][0]["property"]) + field = to_es_field(queryables_mapping, query["args"][0]["property"]) gte, lte = query["args"][1], query["args"][2] if isinstance(gte, dict) and "timestamp" in gte: gte = gte["timestamp"] @@ -186,14 +192,14 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]: return {"range": {field: {"gte": gte, "lte": lte}}} elif query["op"] == AdvancedComparisonOp.IN: - field = to_es_field(query["args"][0]["property"]) + field = to_es_field(queryables_mapping, query["args"][0]["property"]) values = query["args"][1] if not isinstance(values, list): raise ValueError(f"Arg {values} is not a list") return {"terms": {field: values}} elif query["op"] == AdvancedComparisonOp.LIKE: - field = to_es_field(query["args"][0]["property"]) + field = to_es_field(queryables_mapping, query["args"][0]["property"]) pattern = cql2_like_to_es(query["args"][1]) return {"wildcard": {field: {"value": pattern, "case_insensitive": True}}} @@ -203,7 +209,7 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]: SpatialOp.S_WITHIN, SpatialOp.S_DISJOINT, ]: - field = to_es_field(query["args"][0]["property"]) + field = to_es_field(queryables_mapping, query["args"][0]["property"]) geometry = query["args"][1] relation_mapping = { diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 7afbb58d..486eabf7 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -290,6 +290,34 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict: ) return item["_source"] + async def get_queryables_mapping(self, collection_id: str = "*") -> dict: + """Retrieve mapping of Queryables for search. + + Args: + collection_id (str, optional): The id of the Collection the Queryables + belongs to. Defaults to "*". + + Returns: + dict: A dictionary containing the Queryables mappings. + """ + queryables_mapping = {} + + mappings = await self.client.indices.get_mapping( + index=f"{ITEMS_INDEX_PREFIX}{collection_id}", + ) + + for mapping in mappings.values(): + fields = mapping["mappings"]["properties"] + properties = fields.pop("properties") + + for field_key in fields: + queryables_mapping[field_key] = field_key + + for property_key in properties["properties"]: + queryables_mapping[property_key] = f"properties.{property_key}" + + return queryables_mapping + @staticmethod def make_search(): """Database logic to create a Search instance.""" @@ -518,8 +546,7 @@ def apply_free_text_filter(search: Search, free_text_queries: Optional[List[str] return search - @staticmethod - def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]): + def apply_cql2_filter(self, search: Search, _filter: Optional[Dict[str, Any]]): """ Apply a CQL2 filter to an Elasticsearch Search object. @@ -539,7 +566,7 @@ def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]): otherwise the original Search object. """ if _filter is not None: - es_query = filter.to_es(_filter) + es_query = filter.to_es(self.get_queryables_mapping(), _filter) search = search.query(es_query) return search From c8123be1a43ec83d337f34ae72ebbe88177215ff Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Tue, 13 May 2025 11:05:17 +0100 Subject: [PATCH 2/6] Adding missing async-ness. --- stac_fastapi/core/stac_fastapi/core/core.py | 2 +- .../stac_fastapi/elasticsearch/database_logic.py | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 987acdf6..05212f5b 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -607,7 +607,7 @@ async def post_search( if hasattr(search_request, "filter_expr"): cql2_filter = getattr(search_request, "filter_expr", None) try: - search = self.database.apply_cql2_filter(search, cql2_filter) + search = await self.database.apply_cql2_filter(search, cql2_filter) except Exception as e: raise HTTPException( status_code=400, detail=f"Error with cql2_json filter: {e}" diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 486eabf7..958ee597 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -307,13 +307,13 @@ async def get_queryables_mapping(self, collection_id: str = "*") -> dict: ) for mapping in mappings.values(): - fields = mapping["mappings"]["properties"] - properties = fields.pop("properties") + fields = mapping["mappings"].get("properties", {}) + properties = fields.pop("properties", {}).get("properties", {}).keys() for field_key in fields: queryables_mapping[field_key] = field_key - for property_key in properties["properties"]: + for property_key in properties: queryables_mapping[property_key] = f"properties.{property_key}" return queryables_mapping @@ -546,7 +546,9 @@ def apply_free_text_filter(search: Search, free_text_queries: Optional[List[str] return search - def apply_cql2_filter(self, search: Search, _filter: Optional[Dict[str, Any]]): + async def apply_cql2_filter( + self, search: Search, _filter: Optional[Dict[str, Any]] + ): """ Apply a CQL2 filter to an Elasticsearch Search object. @@ -566,7 +568,7 @@ def apply_cql2_filter(self, search: Search, _filter: Optional[Dict[str, Any]]): otherwise the original Search object. """ if _filter is not None: - es_query = filter.to_es(self.get_queryables_mapping(), _filter) + es_query = filter.to_es(await self.get_queryables_mapping(), _filter) search = search.query(es_query) return search From c92a234091ee1c62c9996b25a9017d2b123e5d03 Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Tue, 13 May 2025 11:08:46 +0100 Subject: [PATCH 3/6] Adding to opensearch. --- .../stac_fastapi/opensearch/database_logic.py | 35 +++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 5b9510f3..71ab9275 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -307,6 +307,34 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict: ) return item["_source"] + async def get_queryables_mapping(self, collection_id: str = "*") -> dict: + """Retrieve mapping of Queryables for search. + + Args: + collection_id (str, optional): The id of the Collection the Queryables + belongs to. Defaults to "*". + + Returns: + dict: A dictionary containing the Queryables mappings. + """ + queryables_mapping = {} + + mappings = await self.client.indices.get_mapping( + index=f"{ITEMS_INDEX_PREFIX}{collection_id}", + ) + + for mapping in mappings.values(): + fields = mapping["mappings"].get("properties", {}) + properties = fields.pop("properties", {}).get("properties", {}).keys() + + for field_key in fields: + queryables_mapping[field_key] = field_key + + for property_key in properties: + queryables_mapping[property_key] = f"properties.{property_key}" + + return queryables_mapping + @staticmethod def make_search(): """Database logic to create a Search instance.""" @@ -535,8 +563,9 @@ def apply_stacql_filter(search: Search, op: str, field: str, value: float): return search - @staticmethod - def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]): + async def apply_cql2_filter( + self, search: Search, _filter: Optional[Dict[str, Any]] + ): """ Apply a CQL2 filter to an Opensearch Search object. @@ -556,7 +585,7 @@ def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]): otherwise the original Search object. """ if _filter is not None: - es_query = filter.to_es(_filter) + es_query = filter.to_es(await self.get_queryables_mapping(), _filter) search = search.filter(es_query) return search From 8f88a82f295cc925b23fd4bf7cb18f92ff890571 Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Tue, 13 May 2025 11:31:28 +0100 Subject: [PATCH 4/6] missing await for aggregations. --- stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py b/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py index 43bd543c..d41d763c 100644 --- a/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py +++ b/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py @@ -467,7 +467,7 @@ async def aggregate( if aggregate_request.filter_expr: try: - search = self.database.apply_cql2_filter( + search = await self.database.apply_cql2_filter( search, aggregate_request.filter_expr ) except Exception as e: From 0a9d855e762dbbaef57982894838f95bc107220d Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Tue, 13 May 2025 11:32:30 +0100 Subject: [PATCH 5/6] CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c521dfd..2d550c8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Added dynamic queryables mapping for search and aggregations [#375](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/375) - Added configurable landing page ID `STAC_FASTAPI_LANDING_PAGE_ID` [#352](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/352) - Added support for `S_CONTAINS`, `S_WITHIN`, `S_DISJOINT` spatial filter operations [#371](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/371) - Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370) From 7efa98ff4b9c160709f6cd765f4b643acf9838a0 Mon Sep 17 00:00:00 2001 From: rhysrevans3 Date: Tue, 13 May 2025 11:37:33 +0100 Subject: [PATCH 6/6] Update broken test. --- .../core/stac_fastapi/core/extensions/filter.py | 13 ------------- stac_fastapi/tests/extensions/test_filter.py | 4 ++-- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/stac_fastapi/core/stac_fastapi/core/extensions/filter.py b/stac_fastapi/core/stac_fastapi/core/extensions/filter.py index 2ad6c602..078e7fbf 100644 --- a/stac_fastapi/core/stac_fastapi/core/extensions/filter.py +++ b/stac_fastapi/core/stac_fastapi/core/extensions/filter.py @@ -91,19 +91,6 @@ class SpatialOp(str, Enum): S_DISJOINT = "s_disjoint" -# queryables_mapping = { -# "id": "id", -# "collection": "collection", -# "geometry": "geometry", -# "datetime": "properties.datetime", -# "created": "properties.created", -# "updated": "properties.updated", -# "cloud_cover": "properties.eo:cloud_cover", -# "cloud_shadow_percentage": "properties.s2:cloud_shadow_percentage", -# "nodata_pixel_percentage": "properties.s2:nodata_pixel_percentage", -# } - - def to_es_field(queryables_mapping: Dict[str, Any], field: str) -> str: """ Map a given field to its corresponding Elasticsearch field according to a predefined mapping. diff --git a/stac_fastapi/tests/extensions/test_filter.py b/stac_fastapi/tests/extensions/test_filter.py index ae355c3a..fb6bc850 100644 --- a/stac_fastapi/tests/extensions/test_filter.py +++ b/stac_fastapi/tests/extensions/test_filter.py @@ -163,7 +163,7 @@ async def test_search_filter_ext_and_get_cql2text_id(app_client, ctx): async def test_search_filter_ext_and_get_cql2text_cloud_cover(app_client, ctx): collection = ctx.item["collection"] cloud_cover = ctx.item["properties"]["eo:cloud_cover"] - filter = f"cloud_cover={cloud_cover} AND collection='{collection}'" + filter = f"eo:cloud_cover={cloud_cover} AND collection='{collection}'" resp = await app_client.get(f"/search?filter-lang=cql2-text&filter={filter}") assert resp.status_code == 200 @@ -176,7 +176,7 @@ async def test_search_filter_ext_and_get_cql2text_cloud_cover_no_results( ): collection = ctx.item["collection"] cloud_cover = ctx.item["properties"]["eo:cloud_cover"] + 1 - filter = f"cloud_cover={cloud_cover} AND collection='{collection}'" + filter = f"eo:cloud_cover={cloud_cover} AND collection='{collection}'" resp = await app_client.get(f"/search?filter-lang=cql2-text&filter={filter}") assert resp.status_code == 200