Skip to content

Dynamic queryables mapping for properties #375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- Added dynamic queryables mapping for search and aggregations [#375](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/375)
- Added configurable landing page ID `STAC_FASTAPI_LANDING_PAGE_ID` [#352](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/352)
- Added support for `S_CONTAINS`, `S_WITHIN`, `S_DISJOINT` spatial filter operations [#371](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/371)
- Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370)
Expand Down
2 changes: 1 addition & 1 deletion stac_fastapi/core/stac_fastapi/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ async def post_search(
if hasattr(search_request, "filter_expr"):
cql2_filter = getattr(search_request, "filter_expr", None)
try:
search = self.database.apply_cql2_filter(search, cql2_filter)
search = await self.database.apply_cql2_filter(search, cql2_filter)
except Exception as e:
raise HTTPException(
status_code=400, detail=f"Error with cql2_json filter: {e}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ async def aggregate(

if aggregate_request.filter_expr:
try:
search = self.database.apply_cql2_filter(
search = await self.database.apply_cql2_filter(
search, aggregate_request.filter_expr
)
except Exception as e:
Expand Down
37 changes: 15 additions & 22 deletions stac_fastapi/core/stac_fastapi/core/extensions/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,7 @@ class SpatialOp(str, Enum):
S_DISJOINT = "s_disjoint"


queryables_mapping = {
"id": "id",
"collection": "collection",
"geometry": "geometry",
"datetime": "properties.datetime",
"created": "properties.created",
"updated": "properties.updated",
"cloud_cover": "properties.eo:cloud_cover",
"cloud_shadow_percentage": "properties.s2:cloud_shadow_percentage",
"nodata_pixel_percentage": "properties.s2:nodata_pixel_percentage",
}


def to_es_field(field: str) -> str:
def to_es_field(queryables_mapping: Dict[str, Any], field: str) -> str:
"""
Map a given field to its corresponding Elasticsearch field according to a predefined mapping.

Expand All @@ -117,7 +104,7 @@ def to_es_field(field: str) -> str:
return queryables_mapping.get(field, field)


def to_es(query: Dict[str, Any]) -> Dict[str, Any]:
def to_es(queryables_mapping: Dict[str, Any], query: Dict[str, Any]) -> Dict[str, Any]:
"""
Transform a simplified CQL2 query structure to an Elasticsearch compatible query DSL.

Expand All @@ -133,7 +120,13 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]:
LogicalOp.OR: "should",
LogicalOp.NOT: "must_not",
}[query["op"]]
return {"bool": {bool_type: [to_es(sub_query) for sub_query in query["args"]]}}
return {
"bool": {
bool_type: [
to_es(queryables_mapping, sub_query) for sub_query in query["args"]
]
}
}

elif query["op"] in [
ComparisonOp.EQ,
Expand All @@ -150,7 +143,7 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]:
ComparisonOp.GTE: "gte",
}

field = to_es_field(query["args"][0]["property"])
field = to_es_field(queryables_mapping, query["args"][0]["property"])
value = query["args"][1]
if isinstance(value, dict) and "timestamp" in value:
value = value["timestamp"]
Expand All @@ -173,11 +166,11 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]:
return {"range": {field: {range_op[query["op"]]: value}}}

elif query["op"] == ComparisonOp.IS_NULL:
field = to_es_field(query["args"][0]["property"])
field = to_es_field(queryables_mapping, query["args"][0]["property"])
return {"bool": {"must_not": {"exists": {"field": field}}}}

elif query["op"] == AdvancedComparisonOp.BETWEEN:
field = to_es_field(query["args"][0]["property"])
field = to_es_field(queryables_mapping, query["args"][0]["property"])
gte, lte = query["args"][1], query["args"][2]
if isinstance(gte, dict) and "timestamp" in gte:
gte = gte["timestamp"]
Expand All @@ -186,14 +179,14 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]:
return {"range": {field: {"gte": gte, "lte": lte}}}

elif query["op"] == AdvancedComparisonOp.IN:
field = to_es_field(query["args"][0]["property"])
field = to_es_field(queryables_mapping, query["args"][0]["property"])
values = query["args"][1]
if not isinstance(values, list):
raise ValueError(f"Arg {values} is not a list")
return {"terms": {field: values}}

elif query["op"] == AdvancedComparisonOp.LIKE:
field = to_es_field(query["args"][0]["property"])
field = to_es_field(queryables_mapping, query["args"][0]["property"])
pattern = cql2_like_to_es(query["args"][1])
return {"wildcard": {field: {"value": pattern, "case_insensitive": True}}}

Expand All @@ -203,7 +196,7 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]:
SpatialOp.S_WITHIN,
SpatialOp.S_DISJOINT,
]:
field = to_es_field(query["args"][0]["property"])
field = to_es_field(queryables_mapping, query["args"][0]["property"])
geometry = query["args"][1]

relation_mapping = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,34 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict:
)
return item["_source"]

async def get_queryables_mapping(self, collection_id: str = "*") -> dict:
"""Retrieve mapping of Queryables for search.

Args:
collection_id (str, optional): The id of the Collection the Queryables
belongs to. Defaults to "*".

Returns:
dict: A dictionary containing the Queryables mappings.
"""
queryables_mapping = {}

mappings = await self.client.indices.get_mapping(
index=f"{ITEMS_INDEX_PREFIX}{collection_id}",
)

for mapping in mappings.values():
fields = mapping["mappings"].get("properties", {})
properties = fields.pop("properties", {}).get("properties", {}).keys()

for field_key in fields:
queryables_mapping[field_key] = field_key

for property_key in properties:
queryables_mapping[property_key] = f"properties.{property_key}"

return queryables_mapping

@staticmethod
def make_search():
"""Database logic to create a Search instance."""
Expand Down Expand Up @@ -518,8 +546,9 @@ def apply_free_text_filter(search: Search, free_text_queries: Optional[List[str]

return search

@staticmethod
def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]):
async def apply_cql2_filter(
self, search: Search, _filter: Optional[Dict[str, Any]]
):
"""
Apply a CQL2 filter to an Elasticsearch Search object.

Expand All @@ -539,7 +568,7 @@ def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]):
otherwise the original Search object.
"""
if _filter is not None:
es_query = filter.to_es(_filter)
es_query = filter.to_es(await self.get_queryables_mapping(), _filter)
search = search.query(es_query)

return search
Expand Down
35 changes: 32 additions & 3 deletions stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,34 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict:
)
return item["_source"]

async def get_queryables_mapping(self, collection_id: str = "*") -> dict:
"""Retrieve mapping of Queryables for search.

Args:
collection_id (str, optional): The id of the Collection the Queryables
belongs to. Defaults to "*".

Returns:
dict: A dictionary containing the Queryables mappings.
"""
queryables_mapping = {}

mappings = await self.client.indices.get_mapping(
index=f"{ITEMS_INDEX_PREFIX}{collection_id}",
)

for mapping in mappings.values():
fields = mapping["mappings"].get("properties", {})
properties = fields.pop("properties", {}).get("properties", {}).keys()

for field_key in fields:
queryables_mapping[field_key] = field_key

for property_key in properties:
queryables_mapping[property_key] = f"properties.{property_key}"

return queryables_mapping

@staticmethod
def make_search():
"""Database logic to create a Search instance."""
Expand Down Expand Up @@ -535,8 +563,9 @@ def apply_stacql_filter(search: Search, op: str, field: str, value: float):

return search

@staticmethod
def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]):
async def apply_cql2_filter(
self, search: Search, _filter: Optional[Dict[str, Any]]
):
"""
Apply a CQL2 filter to an Opensearch Search object.

Expand All @@ -556,7 +585,7 @@ def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]):
otherwise the original Search object.
"""
if _filter is not None:
es_query = filter.to_es(_filter)
es_query = filter.to_es(await self.get_queryables_mapping(), _filter)
search = search.filter(es_query)

return search
Expand Down
4 changes: 2 additions & 2 deletions stac_fastapi/tests/extensions/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ async def test_search_filter_ext_and_get_cql2text_id(app_client, ctx):
async def test_search_filter_ext_and_get_cql2text_cloud_cover(app_client, ctx):
collection = ctx.item["collection"]
cloud_cover = ctx.item["properties"]["eo:cloud_cover"]
filter = f"cloud_cover={cloud_cover} AND collection='{collection}'"
filter = f"eo:cloud_cover={cloud_cover} AND collection='{collection}'"
resp = await app_client.get(f"/search?filter-lang=cql2-text&filter={filter}")

assert resp.status_code == 200
Expand All @@ -176,7 +176,7 @@ async def test_search_filter_ext_and_get_cql2text_cloud_cover_no_results(
):
collection = ctx.item["collection"]
cloud_cover = ctx.item["properties"]["eo:cloud_cover"] + 1
filter = f"cloud_cover={cloud_cover} AND collection='{collection}'"
filter = f"eo:cloud_cover={cloud_cover} AND collection='{collection}'"
resp = await app_client.get(f"/search?filter-lang=cql2-text&filter={filter}")

assert resp.status_code == 200
Expand Down