Skip to content

Commit cbc7c6f

Browse files
committed
feat(circuit_breaker): support composite primary key in DynamoDB persistence
Add optional `sort_key_attr` and `static_pk_value` parameters to CircuitBreakerDynamoDBPersistence so circuit state can be stored in a single-table (composite key) design, mirroring the Idempotency utility. When `sort_key_attr` is set, the circuit name is written to the sort key and `static_pk_value` to the partition key (default `circuit_breaker#<function-name>`); omit it for the current partition-key-only behavior. The composite key is threaded through the get, conditional put (half-open probe election) and update paths. Includes Stubber-based tests for both modes, docs, and an example. Closes #8315
1 parent f8c7faf commit cbc7c6f

4 files changed

Lines changed: 204 additions & 6 deletions

File tree

aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66

77
import datetime
88
import logging
9+
import os
910
from typing import TYPE_CHECKING
1011

1112
import boto3
1213
from boto3.dynamodb.types import TypeDeserializer
1314
from botocore.exceptions import ClientError
1415

15-
from aws_lambda_powertools.shared import user_agent
16+
from aws_lambda_powertools.shared import constants, user_agent
1617
from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.base import (
1718
CircuitBreakerExistingLockError,
1819
CircuitBreakerPersistenceLayer,
@@ -41,6 +42,13 @@ class CircuitBreakerDynamoDBPersistence(CircuitBreakerPersistenceLayer):
4142
Name of the DynamoDB table that stores circuit state.
4243
key_attr : str
4344
Partition key attribute holding the circuit name. Defaults to ``"id"``.
45+
static_pk_value : str, optional
46+
Partition key value used when ``sort_key_attr`` is set, so the circuit name
47+
moves to the sort key. Defaults to ``"circuit_breaker#<function-name>"``.
48+
sort_key_attr : str, optional
49+
Sort key attribute holding the circuit name. When set, the table is treated as
50+
composite: ``static_pk_value`` is written to the partition key and the circuit
51+
name to the sort key. Omit it for the default partition-key-only behavior.
4452
state_attr : str
4553
Attribute holding the circuit state. Defaults to ``"state"``.
4654
failure_count_attr : str
@@ -75,6 +83,8 @@ def __init__(
7583
self,
7684
table_name: str,
7785
key_attr: str = "id",
86+
static_pk_value: str | None = None,
87+
sort_key_attr: str | None = None,
7888
state_attr: str = "state",
7989
failure_count_attr: str = "failure_count",
8090
opened_at_attr: str = "opened_at",
@@ -92,8 +102,16 @@ def __init__(
92102

93103
user_agent.register_feature_to_client(client=self.client, feature="circuit_breaker")
94104

105+
if sort_key_attr == key_attr:
106+
raise ValueError(f"key_attr [{key_attr}] and sort_key_attr [{sort_key_attr}] cannot be the same!")
107+
108+
if static_pk_value is None:
109+
static_pk_value = f"circuit_breaker#{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, '')}"
110+
95111
self.table_name = table_name
96112
self.key_attr = key_attr
113+
self.static_pk_value = static_pk_value
114+
self.sort_key_attr = sort_key_attr
97115
self.state_attr = state_attr
98116
self.failure_count_attr = failure_count_attr
99117
self.opened_at_attr = opened_at_attr
@@ -111,7 +129,7 @@ def _item_to_record(self, item: dict) -> CircuitStateRecord:
111129
opened_at = data.get(self.opened_at_attr)
112130
probe_lease_expiry = data.get(self.probe_lease_expiry_attr)
113131
return CircuitStateRecord(
114-
name=data[self.key_attr],
132+
name=data[self.sort_key_attr] if self.sort_key_attr else data[self.key_attr],
115133
state=CircuitState(data[self.state_attr]),
116134
failure_count=int(data.get(self.failure_count_attr, 0)),
117135
opened_at=int(opened_at) if opened_at is not None else None,
@@ -130,10 +148,21 @@ def _s(value: str) -> dict:
130148
"""Wrap a value as a DynamoDB string attribute."""
131149
return {"S": value}
132150

151+
def _get_key(self, name: str) -> dict:
152+
"""Build the simple or composite primary key for a circuit.
153+
154+
When ``sort_key_attr`` is set the key is composite: ``static_pk_value`` in the
155+
partition and the circuit name in the sort key. Otherwise the name is the
156+
partition key.
157+
"""
158+
if self.sort_key_attr:
159+
return {self.key_attr: self._s(self.static_pk_value), self.sort_key_attr: self._s(name)}
160+
return {self.key_attr: self._s(name)}
161+
133162
def _record_to_item(self, record: CircuitStateRecord) -> dict:
134163
"""Translate a :class:`CircuitStateRecord` into a DynamoDB item."""
135164
item: dict = {
136-
self.key_attr: self._s(record.name),
165+
**self._get_key(record.name),
137166
self.state_attr: self._s(str(record.state)),
138167
self.failure_count_attr: self._n(record.failure_count),
139168
}
@@ -152,7 +181,7 @@ def _get_record(self, name: str) -> CircuitStateRecord | None:
152181
# and halves the read cost on the hot path.
153182
response = self.client.get_item(
154183
TableName=self.table_name,
155-
Key={self.key_attr: self._s(name)},
184+
Key=self._get_key(name),
156185
ConsistentRead=False,
157186
)
158187
item = response.get("Item")
@@ -242,7 +271,7 @@ def _build_update_query(self, record: CircuitStateRecord) -> dict:
242271

243272
return {
244273
"TableName": self.table_name,
245-
"Key": {self.key_attr: self._s(record.name)},
274+
"Key": self._get_key(record.name),
246275
"UpdateExpression": update_expression,
247276
"ExpressionAttributeNames": expression_attr_names,
248277
"ExpressionAttributeValues": expression_attr_values,

docs/utilities/circuit_breaker.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ Unless you're looking to [customize each attribute](#customizing-the-dynamodb-ta
9191
| Partition key | `id` | Holds the circuit name |
9292
| TTL attribute name | `expiration` | Using AWS Console? This is configurable after table creation |
9393

94-
You **can** use a single DynamoDB table for all your circuits.
94+
You **can** use a single DynamoDB table for all your circuits. Already have a single-table (composite key) design you want to reuse? See [Using a composite primary key](#using-a-composite-primary-key).
9595

9696
##### DynamoDB IaC example
9797

@@ -218,6 +218,28 @@ Set **`POWERTOOLS_CIRCUIT_BREAKER_DISABLED`**{: .copyMe} to a truthy value to by
218218

219219
`CircuitBreakerDynamoDBPersistence` accepts attribute-name overrides (`key_attr`, `state_attr`, `failure_count_attr`, `opened_at_attr`, `half_open_owner_attr`, `expiry_attr`) and the usual boto3 escape hatches (`boto3_session`, `boto3_client`, `boto_config`) for reusing an existing table layout or client.
220220

221+
#### Using a composite primary key
222+
223+
Use the `sort_key_attr` parameter when your table is configured with a composite primary key _(hash+range key)_, as in a single-table design.
224+
225+
When enabled, the circuit name is saved in the sort key instead, and the partition key defaults to `circuit_breaker#{LAMBDA_FUNCTION_NAME}` — this **namespaces circuits per function** so two functions can use the same circuit name without sharing state. (Without `sort_key_attr`, the default remains partition-key-only with the circuit name as the partition key.)
226+
227+
Set `static_pk_value` to a fixed value (as below) when you instead want **multiple functions to share the same circuit** — for example, every function guarding the same downstream dependency should trip together.
228+
229+
```python hl_lines="12-14"
230+
--8<-- "examples/circuit_breaker_alpha/src/working_with_composite_key.py"
231+
```
232+
233+
??? note "Click to expand and learn how table items would look like"
234+
235+
The circuit name is stored in the sort key, so several circuits share one partition. Attributes that don't apply to a state are simply absent.
236+
237+
| PK | SK | state | failure_count | opened_at | half_open_owner | probe_lease_expiry | expiration |
238+
| --------------- | --------------- | --------- | ------------- | ---------- | --------------- | ------------------ | ---------- |
239+
| CIRCUIT_BREAKER | payment-backend | OPEN | 5 | 1699999999 | | | 1700003599 |
240+
| CIRCUIT_BREAKER | inventory-api | HALF_OPEN | 5 | 1699999999 | 9f3c1a2b-env | 1700000030 | 1700003599 |
241+
| CIRCUIT_BREAKER | email-service | CLOSED | 0 | | | | |
242+
221243
## Testing your code
222244

223245
When unit testing the function a circuit protects, set `POWERTOOLS_CIRCUIT_BREAKER_DISABLED=true` to bypass the circuit and persistence layer entirely, so your tests exercise the business logic without needing DynamoDB.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import os
2+
3+
from aws_lambda_powertools.utilities.circuit_breaker_alpha import circuit_breaker
4+
from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import (
5+
CircuitBreakerDynamoDBPersistence,
6+
)
7+
from aws_lambda_powertools.utilities.typing import LambdaContext
8+
9+
table = os.getenv("CIRCUIT_BREAKER_TABLE", "")
10+
persistence = CircuitBreakerDynamoDBPersistence(
11+
table_name=table,
12+
key_attr="PK",
13+
sort_key_attr="SK",
14+
static_pk_value="CIRCUIT_BREAKER",
15+
)
16+
17+
18+
class PaymentBackend:
19+
def charge(self, order: dict): ...
20+
21+
22+
payment_api = PaymentBackend()
23+
24+
25+
@circuit_breaker(name="payment-backend", persistence_store=persistence)
26+
def charge(order: dict) -> dict:
27+
return payment_api.charge(order) # the protected downstream call
28+
29+
30+
def lambda_handler(event: dict, context: LambdaContext):
31+
return charge(event)

tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,3 +268,119 @@ def test_build_half_open_condition_with_expected_opened_at(persistence):
268268
assert "#opened_at = :expected_opened_at" in result["ConditionExpression"]
269269
assert result["ExpressionAttributeNames"]["#opened_at"] == "opened_at"
270270
assert result["ExpressionAttributeValues"][":expected_opened_at"] == {"N": "5000"}
271+
272+
273+
# --------------------------------------------------------------------------- composite (single-table) key
274+
275+
276+
COMPOSITE_KEY = {"PK": {"S": "CIRCUIT_BREAKER"}, "SK": {"S": "payment"}}
277+
278+
279+
@pytest.fixture
280+
def composite_persistence():
281+
client = boto3.client("dynamodb", config=Config(region_name="us-east-1"))
282+
layer = CircuitBreakerDynamoDBPersistence(
283+
table_name=TABLE_NAME,
284+
boto3_client=client,
285+
key_attr="PK",
286+
sort_key_attr="SK",
287+
static_pk_value="CIRCUIT_BREAKER",
288+
)
289+
layer.configure(CircuitBreakerConfig(local_cache_max_age=0), circuit_name="payment")
290+
return layer
291+
292+
293+
def test_composite_get_state_reads_composite_key(composite_persistence):
294+
stubber = Stubber(composite_persistence.client)
295+
stubber.add_response(
296+
"get_item",
297+
{},
298+
{"TableName": TABLE_NAME, "Key": COMPOSITE_KEY, "ConsistentRead": False},
299+
)
300+
with stubber:
301+
record = composite_persistence.get_state("payment")
302+
assert record.state == CircuitState.CLOSED
303+
304+
305+
def test_composite_save_open_writes_composite_key(composite_persistence):
306+
captured, restore = _capture_put_item(composite_persistence)
307+
try:
308+
composite_persistence.save_open("payment", failure_count=5, opened_at=1000)
309+
finally:
310+
restore()
311+
312+
item = captured["Item"]
313+
assert item["PK"] == {"S": "CIRCUIT_BREAKER"}
314+
assert item["SK"] == {"S": "payment"}
315+
assert "id" not in item
316+
assert item["state"] == {"S": "OPEN"}
317+
318+
319+
def test_composite_try_acquire_half_open_writes_composite_key(composite_persistence):
320+
captured, restore = _capture_put_item(composite_persistence)
321+
try:
322+
assert composite_persistence.try_acquire_half_open("payment", "env-a", 1000) is True
323+
finally:
324+
restore()
325+
326+
item = captured["Item"]
327+
assert item["PK"] == {"S": "CIRCUIT_BREAKER"}
328+
assert item["SK"] == {"S": "payment"}
329+
assert item["state"] == {"S": "HALF_OPEN"}
330+
# The conditional election is still emitted in composite mode (condition is key-independent).
331+
assert "#state = :open AND attribute_not_exists(#half_open_owner)" in captured["ConditionExpression"]
332+
assert "#probe_lease_expiry <= :now" in captured["ConditionExpression"]
333+
334+
335+
def test_composite_try_acquire_half_open_loses_on_conditional_failure(composite_persistence):
336+
stubber = Stubber(composite_persistence.client)
337+
stubber.add_client_error("put_item", service_error_code="ConditionalCheckFailedException")
338+
with stubber:
339+
assert composite_persistence.try_acquire_half_open("payment", "env-b", 1000) is False
340+
341+
342+
def test_composite_save_closed_updates_composite_key(composite_persistence):
343+
captured = {}
344+
original_update = composite_persistence.client.update_item
345+
346+
def capturing_update(**kwargs):
347+
captured.update(kwargs)
348+
return {}
349+
350+
composite_persistence.client.update_item = capturing_update
351+
try:
352+
composite_persistence.save_closed("payment")
353+
finally:
354+
composite_persistence.client.update_item = original_update
355+
356+
assert captured["Key"] == COMPOSITE_KEY
357+
358+
359+
def test_composite_item_to_record_reads_name_from_sort_key(composite_persistence):
360+
item = {
361+
"PK": {"S": "CIRCUIT_BREAKER"},
362+
"SK": {"S": "payment"},
363+
"state": {"S": "OPEN"},
364+
"failure_count": {"N": "2"},
365+
}
366+
record = composite_persistence._item_to_record(item)
367+
assert record.name == "payment"
368+
assert record.state == CircuitState.OPEN
369+
370+
371+
def test_sort_key_equal_to_key_attr_raises():
372+
client = boto3.client("dynamodb", config=Config(region_name="us-east-1"))
373+
with pytest.raises(ValueError, match="cannot be the same"):
374+
CircuitBreakerDynamoDBPersistence(
375+
table_name=TABLE_NAME,
376+
boto3_client=client,
377+
key_attr="PK",
378+
sort_key_attr="PK",
379+
)
380+
381+
382+
def test_default_static_pk_value_namespaces_function_name(monkeypatch):
383+
monkeypatch.setenv("AWS_LAMBDA_FUNCTION_NAME", "orders-fn")
384+
client = boto3.client("dynamodb", config=Config(region_name="us-east-1"))
385+
layer = CircuitBreakerDynamoDBPersistence(table_name=TABLE_NAME, boto3_client=client, sort_key_attr="SK")
386+
assert layer.static_pk_value == "circuit_breaker#orders-fn"

0 commit comments

Comments
 (0)