From cbc7c6fdd7fba3e02a9217af77daf4e4b73fd2b1 Mon Sep 17 00:00:00 2001 From: Rodos Date: Thu, 2 Jul 2026 08:32:46 +1000 Subject: [PATCH 1/3] feat(circuit_breaker): support composite primary key in DynamoDB persistence Add optional `sort_key_attr` and `static_pk_value` parameters to CircuitBreakerDynamoDBPersistence so circuit state can be stored in a single-table (composite key) design, mirroring the Idempotency utility. When `sort_key_attr` is set, the circuit name is written to the sort key and `static_pk_value` to the partition key (default `circuit_breaker#`); omit it for the current partition-key-only behavior. The composite key is threaded through the get, conditional put (half-open probe election) and update paths. Includes Stubber-based tests for both modes, docs, and an example. Closes #8315 --- .../persistence/dynamodb.py | 39 +++++- docs/utilities/circuit_breaker.md | 24 +++- .../src/working_with_composite_key.py | 31 +++++ .../test_dynamodb_persistence.py | 116 ++++++++++++++++++ 4 files changed, 204 insertions(+), 6 deletions(-) create mode 100644 examples/circuit_breaker_alpha/src/working_with_composite_key.py diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py index 386de753839..b0f00548a13 100644 --- a/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py +++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py @@ -6,13 +6,14 @@ import datetime import logging +import os from typing import TYPE_CHECKING import boto3 from boto3.dynamodb.types import TypeDeserializer from botocore.exceptions import ClientError -from aws_lambda_powertools.shared import user_agent +from aws_lambda_powertools.shared import constants, user_agent from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.base import ( CircuitBreakerExistingLockError, CircuitBreakerPersistenceLayer, @@ -41,6 +42,13 @@ class CircuitBreakerDynamoDBPersistence(CircuitBreakerPersistenceLayer): Name of the DynamoDB table that stores circuit state. key_attr : str Partition key attribute holding the circuit name. Defaults to ``"id"``. + static_pk_value : str, optional + Partition key value used when ``sort_key_attr`` is set, so the circuit name + moves to the sort key. Defaults to ``"circuit_breaker#"``. + sort_key_attr : str, optional + Sort key attribute holding the circuit name. When set, the table is treated as + composite: ``static_pk_value`` is written to the partition key and the circuit + name to the sort key. Omit it for the default partition-key-only behavior. state_attr : str Attribute holding the circuit state. Defaults to ``"state"``. failure_count_attr : str @@ -75,6 +83,8 @@ def __init__( self, table_name: str, key_attr: str = "id", + static_pk_value: str | None = None, + sort_key_attr: str | None = None, state_attr: str = "state", failure_count_attr: str = "failure_count", opened_at_attr: str = "opened_at", @@ -92,8 +102,16 @@ def __init__( user_agent.register_feature_to_client(client=self.client, feature="circuit_breaker") + if sort_key_attr == key_attr: + raise ValueError(f"key_attr [{key_attr}] and sort_key_attr [{sort_key_attr}] cannot be the same!") + + if static_pk_value is None: + static_pk_value = f"circuit_breaker#{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, '')}" + self.table_name = table_name self.key_attr = key_attr + self.static_pk_value = static_pk_value + self.sort_key_attr = sort_key_attr self.state_attr = state_attr self.failure_count_attr = failure_count_attr self.opened_at_attr = opened_at_attr @@ -111,7 +129,7 @@ def _item_to_record(self, item: dict) -> CircuitStateRecord: opened_at = data.get(self.opened_at_attr) probe_lease_expiry = data.get(self.probe_lease_expiry_attr) return CircuitStateRecord( - name=data[self.key_attr], + name=data[self.sort_key_attr] if self.sort_key_attr else data[self.key_attr], state=CircuitState(data[self.state_attr]), failure_count=int(data.get(self.failure_count_attr, 0)), opened_at=int(opened_at) if opened_at is not None else None, @@ -130,10 +148,21 @@ def _s(value: str) -> dict: """Wrap a value as a DynamoDB string attribute.""" return {"S": value} + def _get_key(self, name: str) -> dict: + """Build the simple or composite primary key for a circuit. + + When ``sort_key_attr`` is set the key is composite: ``static_pk_value`` in the + partition and the circuit name in the sort key. Otherwise the name is the + partition key. + """ + if self.sort_key_attr: + return {self.key_attr: self._s(self.static_pk_value), self.sort_key_attr: self._s(name)} + return {self.key_attr: self._s(name)} + def _record_to_item(self, record: CircuitStateRecord) -> dict: """Translate a :class:`CircuitStateRecord` into a DynamoDB item.""" item: dict = { - self.key_attr: self._s(record.name), + **self._get_key(record.name), self.state_attr: self._s(str(record.state)), self.failure_count_attr: self._n(record.failure_count), } @@ -152,7 +181,7 @@ def _get_record(self, name: str) -> CircuitStateRecord | None: # and halves the read cost on the hot path. response = self.client.get_item( TableName=self.table_name, - Key={self.key_attr: self._s(name)}, + Key=self._get_key(name), ConsistentRead=False, ) item = response.get("Item") @@ -242,7 +271,7 @@ def _build_update_query(self, record: CircuitStateRecord) -> dict: return { "TableName": self.table_name, - "Key": {self.key_attr: self._s(record.name)}, + "Key": self._get_key(record.name), "UpdateExpression": update_expression, "ExpressionAttributeNames": expression_attr_names, "ExpressionAttributeValues": expression_attr_values, diff --git a/docs/utilities/circuit_breaker.md b/docs/utilities/circuit_breaker.md index 5087e24310b..0774238284f 100644 --- a/docs/utilities/circuit_breaker.md +++ b/docs/utilities/circuit_breaker.md @@ -91,7 +91,7 @@ Unless you're looking to [customize each attribute](#customizing-the-dynamodb-ta | Partition key | `id` | Holds the circuit name | | TTL attribute name | `expiration` | Using AWS Console? This is configurable after table creation | -You **can** use a single DynamoDB table for all your circuits. +You **can** use a single DynamoDB table for all your circuits. Already have a single-table (composite key) design you want to reuse? See [Using a composite primary key](#using-a-composite-primary-key). ##### DynamoDB IaC example @@ -218,6 +218,28 @@ Set **`POWERTOOLS_CIRCUIT_BREAKER_DISABLED`**{: .copyMe} to a truthy value to by `CircuitBreakerDynamoDBPersistence` accepts attribute-name overrides (`key_attr`, `state_attr`, `failure_count_attr`, `opened_at_attr`, `half_open_owner_attr`, `expiry_attr`) and the usual boto3 escape hatches (`boto3_session`, `boto3_client`, `boto_config`) for reusing an existing table layout or client. +#### Using a composite primary key + +Use the `sort_key_attr` parameter when your table is configured with a composite primary key _(hash+range key)_, as in a single-table design. + +When enabled, the circuit name is saved in the sort key instead, and the partition key defaults to `circuit_breaker#{LAMBDA_FUNCTION_NAME}` — this **namespaces circuits per function** so two functions can use the same circuit name without sharing state. (Without `sort_key_attr`, the default remains partition-key-only with the circuit name as the partition key.) + +Set `static_pk_value` to a fixed value (as below) when you instead want **multiple functions to share the same circuit** — for example, every function guarding the same downstream dependency should trip together. + +```python hl_lines="12-14" +--8<-- "examples/circuit_breaker_alpha/src/working_with_composite_key.py" +``` + +??? note "Click to expand and learn how table items would look like" + + The circuit name is stored in the sort key, so several circuits share one partition. Attributes that don't apply to a state are simply absent. + + | PK | SK | state | failure_count | opened_at | half_open_owner | probe_lease_expiry | expiration | + | --------------- | --------------- | --------- | ------------- | ---------- | --------------- | ------------------ | ---------- | + | CIRCUIT_BREAKER | payment-backend | OPEN | 5 | 1699999999 | | | 1700003599 | + | CIRCUIT_BREAKER | inventory-api | HALF_OPEN | 5 | 1699999999 | 9f3c1a2b-env | 1700000030 | 1700003599 | + | CIRCUIT_BREAKER | email-service | CLOSED | 0 | | | | | + ## Testing your code When unit testing the function a circuit protects, set `POWERTOOLS_CIRCUIT_BREAKER_DISABLED=true` to bypass the circuit and persistence layer entirely, so your tests exercise the business logic without needing DynamoDB. diff --git a/examples/circuit_breaker_alpha/src/working_with_composite_key.py b/examples/circuit_breaker_alpha/src/working_with_composite_key.py new file mode 100644 index 00000000000..d2ef0ae9a17 --- /dev/null +++ b/examples/circuit_breaker_alpha/src/working_with_composite_key.py @@ -0,0 +1,31 @@ +import os + +from aws_lambda_powertools.utilities.circuit_breaker_alpha import circuit_breaker +from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import ( + CircuitBreakerDynamoDBPersistence, +) +from aws_lambda_powertools.utilities.typing import LambdaContext + +table = os.getenv("CIRCUIT_BREAKER_TABLE", "") +persistence = CircuitBreakerDynamoDBPersistence( + table_name=table, + key_attr="PK", + sort_key_attr="SK", + static_pk_value="CIRCUIT_BREAKER", +) + + +class PaymentBackend: + def charge(self, order: dict): ... + + +payment_api = PaymentBackend() + + +@circuit_breaker(name="payment-backend", persistence_store=persistence) +def charge(order: dict) -> dict: + return payment_api.charge(order) # the protected downstream call + + +def lambda_handler(event: dict, context: LambdaContext): + return charge(event) diff --git a/tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py b/tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py index d1617f26617..a2a62540876 100644 --- a/tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py +++ b/tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py @@ -268,3 +268,119 @@ def test_build_half_open_condition_with_expected_opened_at(persistence): assert "#opened_at = :expected_opened_at" in result["ConditionExpression"] assert result["ExpressionAttributeNames"]["#opened_at"] == "opened_at" assert result["ExpressionAttributeValues"][":expected_opened_at"] == {"N": "5000"} + + +# --------------------------------------------------------------------------- composite (single-table) key + + +COMPOSITE_KEY = {"PK": {"S": "CIRCUIT_BREAKER"}, "SK": {"S": "payment"}} + + +@pytest.fixture +def composite_persistence(): + client = boto3.client("dynamodb", config=Config(region_name="us-east-1")) + layer = CircuitBreakerDynamoDBPersistence( + table_name=TABLE_NAME, + boto3_client=client, + key_attr="PK", + sort_key_attr="SK", + static_pk_value="CIRCUIT_BREAKER", + ) + layer.configure(CircuitBreakerConfig(local_cache_max_age=0), circuit_name="payment") + return layer + + +def test_composite_get_state_reads_composite_key(composite_persistence): + stubber = Stubber(composite_persistence.client) + stubber.add_response( + "get_item", + {}, + {"TableName": TABLE_NAME, "Key": COMPOSITE_KEY, "ConsistentRead": False}, + ) + with stubber: + record = composite_persistence.get_state("payment") + assert record.state == CircuitState.CLOSED + + +def test_composite_save_open_writes_composite_key(composite_persistence): + captured, restore = _capture_put_item(composite_persistence) + try: + composite_persistence.save_open("payment", failure_count=5, opened_at=1000) + finally: + restore() + + item = captured["Item"] + assert item["PK"] == {"S": "CIRCUIT_BREAKER"} + assert item["SK"] == {"S": "payment"} + assert "id" not in item + assert item["state"] == {"S": "OPEN"} + + +def test_composite_try_acquire_half_open_writes_composite_key(composite_persistence): + captured, restore = _capture_put_item(composite_persistence) + try: + assert composite_persistence.try_acquire_half_open("payment", "env-a", 1000) is True + finally: + restore() + + item = captured["Item"] + assert item["PK"] == {"S": "CIRCUIT_BREAKER"} + assert item["SK"] == {"S": "payment"} + assert item["state"] == {"S": "HALF_OPEN"} + # The conditional election is still emitted in composite mode (condition is key-independent). + assert "#state = :open AND attribute_not_exists(#half_open_owner)" in captured["ConditionExpression"] + assert "#probe_lease_expiry <= :now" in captured["ConditionExpression"] + + +def test_composite_try_acquire_half_open_loses_on_conditional_failure(composite_persistence): + stubber = Stubber(composite_persistence.client) + stubber.add_client_error("put_item", service_error_code="ConditionalCheckFailedException") + with stubber: + assert composite_persistence.try_acquire_half_open("payment", "env-b", 1000) is False + + +def test_composite_save_closed_updates_composite_key(composite_persistence): + captured = {} + original_update = composite_persistence.client.update_item + + def capturing_update(**kwargs): + captured.update(kwargs) + return {} + + composite_persistence.client.update_item = capturing_update + try: + composite_persistence.save_closed("payment") + finally: + composite_persistence.client.update_item = original_update + + assert captured["Key"] == COMPOSITE_KEY + + +def test_composite_item_to_record_reads_name_from_sort_key(composite_persistence): + item = { + "PK": {"S": "CIRCUIT_BREAKER"}, + "SK": {"S": "payment"}, + "state": {"S": "OPEN"}, + "failure_count": {"N": "2"}, + } + record = composite_persistence._item_to_record(item) + assert record.name == "payment" + assert record.state == CircuitState.OPEN + + +def test_sort_key_equal_to_key_attr_raises(): + client = boto3.client("dynamodb", config=Config(region_name="us-east-1")) + with pytest.raises(ValueError, match="cannot be the same"): + CircuitBreakerDynamoDBPersistence( + table_name=TABLE_NAME, + boto3_client=client, + key_attr="PK", + sort_key_attr="PK", + ) + + +def test_default_static_pk_value_namespaces_function_name(monkeypatch): + monkeypatch.setenv("AWS_LAMBDA_FUNCTION_NAME", "orders-fn") + client = boto3.client("dynamodb", config=Config(region_name="us-east-1")) + layer = CircuitBreakerDynamoDBPersistence(table_name=TABLE_NAME, boto3_client=client, sort_key_attr="SK") + assert layer.static_pk_value == "circuit_breaker#orders-fn" From 49b9453cb0a0f5f34c46098ba5d517591c425e46 Mon Sep 17 00:00:00 2001 From: Rodos Date: Thu, 2 Jul 2026 17:11:36 +1000 Subject: [PATCH 2/3] fix(circuit_breaker): rename the composite-key example to a unique basename so mypy no longer clashes with the idempotency example. --- docs/utilities/circuit_breaker.md | 2 +- ...h_composite_key.py => working_with_composite_primary_key.py} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename examples/circuit_breaker_alpha/src/{working_with_composite_key.py => working_with_composite_primary_key.py} (100%) diff --git a/docs/utilities/circuit_breaker.md b/docs/utilities/circuit_breaker.md index 0774238284f..f83685862f0 100644 --- a/docs/utilities/circuit_breaker.md +++ b/docs/utilities/circuit_breaker.md @@ -227,7 +227,7 @@ When enabled, the circuit name is saved in the sort key instead, and the partiti Set `static_pk_value` to a fixed value (as below) when you instead want **multiple functions to share the same circuit** — for example, every function guarding the same downstream dependency should trip together. ```python hl_lines="12-14" ---8<-- "examples/circuit_breaker_alpha/src/working_with_composite_key.py" +--8<-- "examples/circuit_breaker_alpha/src/working_with_composite_primary_key.py" ``` ??? note "Click to expand and learn how table items would look like" diff --git a/examples/circuit_breaker_alpha/src/working_with_composite_key.py b/examples/circuit_breaker_alpha/src/working_with_composite_primary_key.py similarity index 100% rename from examples/circuit_breaker_alpha/src/working_with_composite_key.py rename to examples/circuit_breaker_alpha/src/working_with_composite_primary_key.py From 159cd83696011b0a0250e4ed5580c61739a46299 Mon Sep 17 00:00:00 2001 From: Rodos Date: Thu, 2 Jul 2026 17:26:09 +1000 Subject: [PATCH 3/3] feat(circuit_breaker): warn when static_pk_value is passed without sort_key_attr since it is otherwise silently ignored. --- .../circuit_breaker_alpha/persistence/dynamodb.py | 9 +++++++++ .../test_dynamodb_persistence.py | 11 +++++++++++ 2 files changed, 20 insertions(+) diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py index b0f00548a13..dc557ef1218 100644 --- a/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py +++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py @@ -7,6 +7,7 @@ import datetime import logging import os +import warnings from typing import TYPE_CHECKING import boto3 @@ -20,6 +21,7 @@ ) from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.record import CircuitStateRecord from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState +from aws_lambda_powertools.warnings import PowertoolsUserWarning if TYPE_CHECKING: from botocore.config import Config @@ -105,6 +107,13 @@ def __init__( if sort_key_attr == key_attr: raise ValueError(f"key_attr [{key_attr}] and sort_key_attr [{sort_key_attr}] cannot be the same!") + if static_pk_value is not None and sort_key_attr is None: + warnings.warn( + "static_pk_value is ignored unless sort_key_attr is also set.", + category=PowertoolsUserWarning, + stacklevel=2, + ) + if static_pk_value is None: static_pk_value = f"circuit_breaker#{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, '')}" diff --git a/tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py b/tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py index a2a62540876..4555c088e36 100644 --- a/tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py +++ b/tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py @@ -10,6 +10,7 @@ CircuitBreakerDynamoDBPersistence, ) from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState +from aws_lambda_powertools.warnings import PowertoolsUserWarning TABLE_NAME = "CircuitBreakerState" @@ -384,3 +385,13 @@ def test_default_static_pk_value_namespaces_function_name(monkeypatch): client = boto3.client("dynamodb", config=Config(region_name="us-east-1")) layer = CircuitBreakerDynamoDBPersistence(table_name=TABLE_NAME, boto3_client=client, sort_key_attr="SK") assert layer.static_pk_value == "circuit_breaker#orders-fn" + + +def test_static_pk_value_without_sort_key_attr_warns(): + client = boto3.client("dynamodb", config=Config(region_name="us-east-1")) + with pytest.warns(PowertoolsUserWarning, match="static_pk_value is ignored unless sort_key_attr"): + CircuitBreakerDynamoDBPersistence( + table_name=TABLE_NAME, + boto3_client=client, + static_pk_value="CIRCUIT_BREAKER", + )