Skip to content

Commit cb6f906

Browse files
feat(circuit_breaker): support composite primary key in DynamoDB persistence (#8316)
* feat(circuit_breaker): support composite primary key in DynamoDB persistence Add optional `sort_key_attr` and `static_pk_value` parameters to CircuitBreakerDynamoDBPersistence so circuit state can be stored in a single-table (composite key) design, mirroring the Idempotency utility. When `sort_key_attr` is set, the circuit name is written to the sort key and `static_pk_value` to the partition key (default `circuit_breaker#<function-name>`); omit it for the current partition-key-only behavior. The composite key is threaded through the get, conditional put (half-open probe election) and update paths. Includes Stubber-based tests for both modes, docs, and an example. Closes #8315 * fix(circuit_breaker): rename the composite-key example to a unique basename so mypy no longer clashes with the idempotency example. * feat(circuit_breaker): warn when static_pk_value is passed without sort_key_attr since it is otherwise silently ignored. --------- Co-authored-by: Leandro Damascena <lcdama@amazon.pt>
1 parent 5535469 commit cb6f906

4 files changed

Lines changed: 224 additions & 6 deletions

File tree

aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,22 @@
66

77
import datetime
88
import logging
9+
import os
10+
import warnings
911
from typing import TYPE_CHECKING
1012

1113
import boto3
1214
from boto3.dynamodb.types import TypeDeserializer
1315
from botocore.exceptions import ClientError
1416

15-
from aws_lambda_powertools.shared import user_agent
17+
from aws_lambda_powertools.shared import constants, user_agent
1618
from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.base import (
1719
CircuitBreakerExistingLockError,
1820
CircuitBreakerPersistenceLayer,
1921
)
2022
from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.record import CircuitStateRecord
2123
from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState
24+
from aws_lambda_powertools.warnings import PowertoolsUserWarning
2225

2326
if TYPE_CHECKING:
2427
from botocore.config import Config
@@ -41,6 +44,13 @@ class CircuitBreakerDynamoDBPersistence(CircuitBreakerPersistenceLayer):
4144
Name of the DynamoDB table that stores circuit state.
4245
key_attr : str
4346
Partition key attribute holding the circuit name. Defaults to ``"id"``.
47+
static_pk_value : str, optional
48+
Partition key value used when ``sort_key_attr`` is set, so the circuit name
49+
moves to the sort key. Defaults to ``"circuit_breaker#<function-name>"``.
50+
sort_key_attr : str, optional
51+
Sort key attribute holding the circuit name. When set, the table is treated as
52+
composite: ``static_pk_value`` is written to the partition key and the circuit
53+
name to the sort key. Omit it for the default partition-key-only behavior.
4454
state_attr : str
4555
Attribute holding the circuit state. Defaults to ``"state"``.
4656
failure_count_attr : str
@@ -75,6 +85,8 @@ def __init__(
7585
self,
7686
table_name: str,
7787
key_attr: str = "id",
88+
static_pk_value: str | None = None,
89+
sort_key_attr: str | None = None,
7890
state_attr: str = "state",
7991
failure_count_attr: str = "failure_count",
8092
opened_at_attr: str = "opened_at",
@@ -92,8 +104,23 @@ def __init__(
92104

93105
user_agent.register_feature_to_client(client=self.client, feature="circuit_breaker")
94106

107+
if sort_key_attr == key_attr:
108+
raise ValueError(f"key_attr [{key_attr}] and sort_key_attr [{sort_key_attr}] cannot be the same!")
109+
110+
if static_pk_value is not None and sort_key_attr is None:
111+
warnings.warn(
112+
"static_pk_value is ignored unless sort_key_attr is also set.",
113+
category=PowertoolsUserWarning,
114+
stacklevel=2,
115+
)
116+
117+
if static_pk_value is None:
118+
static_pk_value = f"circuit_breaker#{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, '')}"
119+
95120
self.table_name = table_name
96121
self.key_attr = key_attr
122+
self.static_pk_value = static_pk_value
123+
self.sort_key_attr = sort_key_attr
97124
self.state_attr = state_attr
98125
self.failure_count_attr = failure_count_attr
99126
self.opened_at_attr = opened_at_attr
@@ -111,7 +138,7 @@ def _item_to_record(self, item: dict) -> CircuitStateRecord:
111138
opened_at = data.get(self.opened_at_attr)
112139
probe_lease_expiry = data.get(self.probe_lease_expiry_attr)
113140
return CircuitStateRecord(
114-
name=data[self.key_attr],
141+
name=data[self.sort_key_attr] if self.sort_key_attr else data[self.key_attr],
115142
state=CircuitState(data[self.state_attr]),
116143
failure_count=int(data.get(self.failure_count_attr, 0)),
117144
opened_at=int(opened_at) if opened_at is not None else None,
@@ -130,10 +157,21 @@ def _s(value: str) -> dict:
130157
"""Wrap a value as a DynamoDB string attribute."""
131158
return {"S": value}
132159

160+
def _get_key(self, name: str) -> dict:
161+
"""Build the simple or composite primary key for a circuit.
162+
163+
When ``sort_key_attr`` is set the key is composite: ``static_pk_value`` in the
164+
partition and the circuit name in the sort key. Otherwise the name is the
165+
partition key.
166+
"""
167+
if self.sort_key_attr:
168+
return {self.key_attr: self._s(self.static_pk_value), self.sort_key_attr: self._s(name)}
169+
return {self.key_attr: self._s(name)}
170+
133171
def _record_to_item(self, record: CircuitStateRecord) -> dict:
134172
"""Translate a :class:`CircuitStateRecord` into a DynamoDB item."""
135173
item: dict = {
136-
self.key_attr: self._s(record.name),
174+
**self._get_key(record.name),
137175
self.state_attr: self._s(str(record.state)),
138176
self.failure_count_attr: self._n(record.failure_count),
139177
}
@@ -152,7 +190,7 @@ def _get_record(self, name: str) -> CircuitStateRecord | None:
152190
# and halves the read cost on the hot path.
153191
response = self.client.get_item(
154192
TableName=self.table_name,
155-
Key={self.key_attr: self._s(name)},
193+
Key=self._get_key(name),
156194
ConsistentRead=False,
157195
)
158196
item = response.get("Item")
@@ -242,7 +280,7 @@ def _build_update_query(self, record: CircuitStateRecord) -> dict:
242280

243281
return {
244282
"TableName": self.table_name,
245-
"Key": {self.key_attr: self._s(record.name)},
283+
"Key": self._get_key(record.name),
246284
"UpdateExpression": update_expression,
247285
"ExpressionAttributeNames": expression_attr_names,
248286
"ExpressionAttributeValues": expression_attr_values,

docs/utilities/circuit_breaker.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ Unless you're looking to [customize each attribute](#customizing-the-dynamodb-ta
9191
| Partition key | `id` | Holds the circuit name |
9292
| TTL attribute name | `expiration` | Using AWS Console? This is configurable after table creation |
9393

94-
You **can** use a single DynamoDB table for all your circuits.
94+
You **can** use a single DynamoDB table for all your circuits. Already have a single-table (composite key) design you want to reuse? See [Using a composite primary key](#using-a-composite-primary-key).
9595

9696
##### DynamoDB IaC example
9797

@@ -218,6 +218,28 @@ Set **`POWERTOOLS_CIRCUIT_BREAKER_DISABLED`**{: .copyMe} to a truthy value to by
218218

219219
`CircuitBreakerDynamoDBPersistence` accepts attribute-name overrides (`key_attr`, `state_attr`, `failure_count_attr`, `opened_at_attr`, `half_open_owner_attr`, `expiry_attr`) and the usual boto3 escape hatches (`boto3_session`, `boto3_client`, `boto_config`) for reusing an existing table layout or client.
220220

221+
#### Using a composite primary key
222+
223+
Use the `sort_key_attr` parameter when your table is configured with a composite primary key _(hash+range key)_, as in a single-table design.
224+
225+
When enabled, the circuit name is saved in the sort key instead, and the partition key defaults to `circuit_breaker#{LAMBDA_FUNCTION_NAME}` — this **namespaces circuits per function** so two functions can use the same circuit name without sharing state. (Without `sort_key_attr`, the default remains partition-key-only with the circuit name as the partition key.)
226+
227+
Set `static_pk_value` to a fixed value (as below) when you instead want **multiple functions to share the same circuit** — for example, every function guarding the same downstream dependency should trip together.
228+
229+
```python hl_lines="12-14"
230+
--8<-- "examples/circuit_breaker_alpha/src/working_with_composite_primary_key.py"
231+
```
232+
233+
??? note "Click to expand and learn how table items would look like"
234+
235+
The circuit name is stored in the sort key, so several circuits share one partition. Attributes that don't apply to a state are simply absent.
236+
237+
| PK | SK | state | failure_count | opened_at | half_open_owner | probe_lease_expiry | expiration |
238+
| --------------- | --------------- | --------- | ------------- | ---------- | --------------- | ------------------ | ---------- |
239+
| CIRCUIT_BREAKER | payment-backend | OPEN | 5 | 1699999999 | | | 1700003599 |
240+
| CIRCUIT_BREAKER | inventory-api | HALF_OPEN | 5 | 1699999999 | 9f3c1a2b-env | 1700000030 | 1700003599 |
241+
| CIRCUIT_BREAKER | email-service | CLOSED | 0 | | | | |
242+
221243
## Testing your code
222244

223245
When unit testing the function a circuit protects, set `POWERTOOLS_CIRCUIT_BREAKER_DISABLED=true` to bypass the circuit and persistence layer entirely, so your tests exercise the business logic without needing DynamoDB.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import os
2+
3+
from aws_lambda_powertools.utilities.circuit_breaker_alpha import circuit_breaker
4+
from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import (
5+
CircuitBreakerDynamoDBPersistence,
6+
)
7+
from aws_lambda_powertools.utilities.typing import LambdaContext
8+
9+
table = os.getenv("CIRCUIT_BREAKER_TABLE", "")
10+
persistence = CircuitBreakerDynamoDBPersistence(
11+
table_name=table,
12+
key_attr="PK",
13+
sort_key_attr="SK",
14+
static_pk_value="CIRCUIT_BREAKER",
15+
)
16+
17+
18+
class PaymentBackend:
19+
def charge(self, order: dict): ...
20+
21+
22+
payment_api = PaymentBackend()
23+
24+
25+
@circuit_breaker(name="payment-backend", persistence_store=persistence)
26+
def charge(order: dict) -> dict:
27+
return payment_api.charge(order) # the protected downstream call
28+
29+
30+
def lambda_handler(event: dict, context: LambdaContext):
31+
return charge(event)

tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
CircuitBreakerDynamoDBPersistence,
1111
)
1212
from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState
13+
from aws_lambda_powertools.warnings import PowertoolsUserWarning
1314

1415
TABLE_NAME = "CircuitBreakerState"
1516

@@ -268,3 +269,129 @@ def test_build_half_open_condition_with_expected_opened_at(persistence):
268269
assert "#opened_at = :expected_opened_at" in result["ConditionExpression"]
269270
assert result["ExpressionAttributeNames"]["#opened_at"] == "opened_at"
270271
assert result["ExpressionAttributeValues"][":expected_opened_at"] == {"N": "5000"}
272+
273+
274+
# --------------------------------------------------------------------------- composite (single-table) key
275+
276+
277+
COMPOSITE_KEY = {"PK": {"S": "CIRCUIT_BREAKER"}, "SK": {"S": "payment"}}
278+
279+
280+
@pytest.fixture
281+
def composite_persistence():
282+
client = boto3.client("dynamodb", config=Config(region_name="us-east-1"))
283+
layer = CircuitBreakerDynamoDBPersistence(
284+
table_name=TABLE_NAME,
285+
boto3_client=client,
286+
key_attr="PK",
287+
sort_key_attr="SK",
288+
static_pk_value="CIRCUIT_BREAKER",
289+
)
290+
layer.configure(CircuitBreakerConfig(local_cache_max_age=0), circuit_name="payment")
291+
return layer
292+
293+
294+
def test_composite_get_state_reads_composite_key(composite_persistence):
295+
stubber = Stubber(composite_persistence.client)
296+
stubber.add_response(
297+
"get_item",
298+
{},
299+
{"TableName": TABLE_NAME, "Key": COMPOSITE_KEY, "ConsistentRead": False},
300+
)
301+
with stubber:
302+
record = composite_persistence.get_state("payment")
303+
assert record.state == CircuitState.CLOSED
304+
305+
306+
def test_composite_save_open_writes_composite_key(composite_persistence):
307+
captured, restore = _capture_put_item(composite_persistence)
308+
try:
309+
composite_persistence.save_open("payment", failure_count=5, opened_at=1000)
310+
finally:
311+
restore()
312+
313+
item = captured["Item"]
314+
assert item["PK"] == {"S": "CIRCUIT_BREAKER"}
315+
assert item["SK"] == {"S": "payment"}
316+
assert "id" not in item
317+
assert item["state"] == {"S": "OPEN"}
318+
319+
320+
def test_composite_try_acquire_half_open_writes_composite_key(composite_persistence):
321+
captured, restore = _capture_put_item(composite_persistence)
322+
try:
323+
assert composite_persistence.try_acquire_half_open("payment", "env-a", 1000) is True
324+
finally:
325+
restore()
326+
327+
item = captured["Item"]
328+
assert item["PK"] == {"S": "CIRCUIT_BREAKER"}
329+
assert item["SK"] == {"S": "payment"}
330+
assert item["state"] == {"S": "HALF_OPEN"}
331+
# The conditional election is still emitted in composite mode (condition is key-independent).
332+
assert "#state = :open AND attribute_not_exists(#half_open_owner)" in captured["ConditionExpression"]
333+
assert "#probe_lease_expiry <= :now" in captured["ConditionExpression"]
334+
335+
336+
def test_composite_try_acquire_half_open_loses_on_conditional_failure(composite_persistence):
337+
stubber = Stubber(composite_persistence.client)
338+
stubber.add_client_error("put_item", service_error_code="ConditionalCheckFailedException")
339+
with stubber:
340+
assert composite_persistence.try_acquire_half_open("payment", "env-b", 1000) is False
341+
342+
343+
def test_composite_save_closed_updates_composite_key(composite_persistence):
344+
captured = {}
345+
original_update = composite_persistence.client.update_item
346+
347+
def capturing_update(**kwargs):
348+
captured.update(kwargs)
349+
return {}
350+
351+
composite_persistence.client.update_item = capturing_update
352+
try:
353+
composite_persistence.save_closed("payment")
354+
finally:
355+
composite_persistence.client.update_item = original_update
356+
357+
assert captured["Key"] == COMPOSITE_KEY
358+
359+
360+
def test_composite_item_to_record_reads_name_from_sort_key(composite_persistence):
361+
item = {
362+
"PK": {"S": "CIRCUIT_BREAKER"},
363+
"SK": {"S": "payment"},
364+
"state": {"S": "OPEN"},
365+
"failure_count": {"N": "2"},
366+
}
367+
record = composite_persistence._item_to_record(item)
368+
assert record.name == "payment"
369+
assert record.state == CircuitState.OPEN
370+
371+
372+
def test_sort_key_equal_to_key_attr_raises():
373+
client = boto3.client("dynamodb", config=Config(region_name="us-east-1"))
374+
with pytest.raises(ValueError, match="cannot be the same"):
375+
CircuitBreakerDynamoDBPersistence(
376+
table_name=TABLE_NAME,
377+
boto3_client=client,
378+
key_attr="PK",
379+
sort_key_attr="PK",
380+
)
381+
382+
383+
def test_default_static_pk_value_namespaces_function_name(monkeypatch):
384+
monkeypatch.setenv("AWS_LAMBDA_FUNCTION_NAME", "orders-fn")
385+
client = boto3.client("dynamodb", config=Config(region_name="us-east-1"))
386+
layer = CircuitBreakerDynamoDBPersistence(table_name=TABLE_NAME, boto3_client=client, sort_key_attr="SK")
387+
assert layer.static_pk_value == "circuit_breaker#orders-fn"
388+
389+
390+
def test_static_pk_value_without_sort_key_attr_warns():
391+
client = boto3.client("dynamodb", config=Config(region_name="us-east-1"))
392+
with pytest.warns(PowertoolsUserWarning, match="static_pk_value is ignored unless sort_key_attr"):
393+
CircuitBreakerDynamoDBPersistence(
394+
table_name=TABLE_NAME,
395+
boto3_client=client,
396+
static_pk_value="CIRCUIT_BREAKER",
397+
)

0 commit comments

Comments
 (0)