Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@

import datetime
import logging
import os
import warnings
from typing import TYPE_CHECKING

import boto3
from boto3.dynamodb.types import TypeDeserializer
from botocore.exceptions import ClientError

from aws_lambda_powertools.shared import user_agent
from aws_lambda_powertools.shared import constants, user_agent
from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.base import (
CircuitBreakerExistingLockError,
CircuitBreakerPersistenceLayer,
)
from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.record import CircuitStateRecord
from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState
from aws_lambda_powertools.warnings import PowertoolsUserWarning

if TYPE_CHECKING:
from botocore.config import Config
Expand All @@ -41,6 +44,13 @@ class CircuitBreakerDynamoDBPersistence(CircuitBreakerPersistenceLayer):
Name of the DynamoDB table that stores circuit state.
key_attr : str
Partition key attribute holding the circuit name. Defaults to ``"id"``.
static_pk_value : str, optional
Partition key value used when ``sort_key_attr`` is set, so the circuit name
moves to the sort key. Defaults to ``"circuit_breaker#<function-name>"``.
sort_key_attr : str, optional
Sort key attribute holding the circuit name. When set, the table is treated as
composite: ``static_pk_value`` is written to the partition key and the circuit
name to the sort key. Omit it for the default partition-key-only behavior.
state_attr : str
Attribute holding the circuit state. Defaults to ``"state"``.
failure_count_attr : str
Expand Down Expand Up @@ -75,6 +85,8 @@ def __init__(
self,
table_name: str,
key_attr: str = "id",
static_pk_value: str | None = None,
sort_key_attr: str | None = None,
state_attr: str = "state",
failure_count_attr: str = "failure_count",
opened_at_attr: str = "opened_at",
Expand All @@ -92,8 +104,23 @@ def __init__(

user_agent.register_feature_to_client(client=self.client, feature="circuit_breaker")

if sort_key_attr == key_attr:
raise ValueError(f"key_attr [{key_attr}] and sort_key_attr [{sort_key_attr}] cannot be the same!")

if static_pk_value is not None and sort_key_attr is None:
warnings.warn(
"static_pk_value is ignored unless sort_key_attr is also set.",
category=PowertoolsUserWarning,
stacklevel=2,
)

if static_pk_value is None:
static_pk_value = f"circuit_breaker#{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, '')}"

self.table_name = table_name
self.key_attr = key_attr
self.static_pk_value = static_pk_value
self.sort_key_attr = sort_key_attr
self.state_attr = state_attr
self.failure_count_attr = failure_count_attr
self.opened_at_attr = opened_at_attr
Expand All @@ -111,7 +138,7 @@ def _item_to_record(self, item: dict) -> CircuitStateRecord:
opened_at = data.get(self.opened_at_attr)
probe_lease_expiry = data.get(self.probe_lease_expiry_attr)
return CircuitStateRecord(
name=data[self.key_attr],
name=data[self.sort_key_attr] if self.sort_key_attr else data[self.key_attr],
state=CircuitState(data[self.state_attr]),
failure_count=int(data.get(self.failure_count_attr, 0)),
opened_at=int(opened_at) if opened_at is not None else None,
Expand All @@ -130,10 +157,21 @@ def _s(value: str) -> dict:
"""Wrap a value as a DynamoDB string attribute."""
return {"S": value}

def _get_key(self, name: str) -> dict:
"""Build the simple or composite primary key for a circuit.

When ``sort_key_attr`` is set the key is composite: ``static_pk_value`` in the
partition and the circuit name in the sort key. Otherwise the name is the
partition key.
"""
if self.sort_key_attr:
return {self.key_attr: self._s(self.static_pk_value), self.sort_key_attr: self._s(name)}
return {self.key_attr: self._s(name)}

def _record_to_item(self, record: CircuitStateRecord) -> dict:
"""Translate a :class:`CircuitStateRecord` into a DynamoDB item."""
item: dict = {
self.key_attr: self._s(record.name),
**self._get_key(record.name),
self.state_attr: self._s(str(record.state)),
self.failure_count_attr: self._n(record.failure_count),
}
Expand All @@ -152,7 +190,7 @@ def _get_record(self, name: str) -> CircuitStateRecord | None:
# and halves the read cost on the hot path.
response = self.client.get_item(
TableName=self.table_name,
Key={self.key_attr: self._s(name)},
Key=self._get_key(name),
ConsistentRead=False,
)
item = response.get("Item")
Expand Down Expand Up @@ -242,7 +280,7 @@ def _build_update_query(self, record: CircuitStateRecord) -> dict:

return {
"TableName": self.table_name,
"Key": {self.key_attr: self._s(record.name)},
"Key": self._get_key(record.name),
"UpdateExpression": update_expression,
"ExpressionAttributeNames": expression_attr_names,
"ExpressionAttributeValues": expression_attr_values,
Expand Down
24 changes: 23 additions & 1 deletion docs/utilities/circuit_breaker.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Unless you're looking to [customize each attribute](#customizing-the-dynamodb-ta
| Partition key | `id` | Holds the circuit name |
| TTL attribute name | `expiration` | Using AWS Console? This is configurable after table creation |

You **can** use a single DynamoDB table for all your circuits.
You **can** use a single DynamoDB table for all your circuits. Already have a single-table (composite key) design you want to reuse? See [Using a composite primary key](#using-a-composite-primary-key).

##### DynamoDB IaC example

Expand Down Expand Up @@ -218,6 +218,28 @@ Set **`POWERTOOLS_CIRCUIT_BREAKER_DISABLED`**{: .copyMe} to a truthy value to by

`CircuitBreakerDynamoDBPersistence` accepts attribute-name overrides (`key_attr`, `state_attr`, `failure_count_attr`, `opened_at_attr`, `half_open_owner_attr`, `expiry_attr`) and the usual boto3 escape hatches (`boto3_session`, `boto3_client`, `boto_config`) for reusing an existing table layout or client.

#### Using a composite primary key

Use the `sort_key_attr` parameter when your table is configured with a composite primary key _(hash+range key)_, as in a single-table design.

When enabled, the circuit name is saved in the sort key instead, and the partition key defaults to `circuit_breaker#{LAMBDA_FUNCTION_NAME}` — this **namespaces circuits per function** so two functions can use the same circuit name without sharing state. (Without `sort_key_attr`, the default remains partition-key-only with the circuit name as the partition key.)

Set `static_pk_value` to a fixed value (as below) when you instead want **multiple functions to share the same circuit** — for example, every function guarding the same downstream dependency should trip together.

```python hl_lines="12-14"
--8<-- "examples/circuit_breaker_alpha/src/working_with_composite_primary_key.py"
```

??? note "Click to expand and learn how table items would look like"

The circuit name is stored in the sort key, so several circuits share one partition. Attributes that don't apply to a state are simply absent.

| PK | SK | state | failure_count | opened_at | half_open_owner | probe_lease_expiry | expiration |
| --------------- | --------------- | --------- | ------------- | ---------- | --------------- | ------------------ | ---------- |
| CIRCUIT_BREAKER | payment-backend | OPEN | 5 | 1699999999 | | | 1700003599 |
| CIRCUIT_BREAKER | inventory-api | HALF_OPEN | 5 | 1699999999 | 9f3c1a2b-env | 1700000030 | 1700003599 |
| CIRCUIT_BREAKER | email-service | CLOSED | 0 | | | | |

## Testing your code

When unit testing the function a circuit protects, set `POWERTOOLS_CIRCUIT_BREAKER_DISABLED=true` to bypass the circuit and persistence layer entirely, so your tests exercise the business logic without needing DynamoDB.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os

from aws_lambda_powertools.utilities.circuit_breaker_alpha import circuit_breaker
from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import (
CircuitBreakerDynamoDBPersistence,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

table = os.getenv("CIRCUIT_BREAKER_TABLE", "")
persistence = CircuitBreakerDynamoDBPersistence(
table_name=table,
key_attr="PK",
sort_key_attr="SK",
static_pk_value="CIRCUIT_BREAKER",
)


class PaymentBackend:
def charge(self, order: dict): ...


payment_api = PaymentBackend()


@circuit_breaker(name="payment-backend", persistence_store=persistence)
def charge(order: dict) -> dict:
return payment_api.charge(order) # the protected downstream call


def lambda_handler(event: dict, context: LambdaContext):
return charge(event)
127 changes: 127 additions & 0 deletions tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
CircuitBreakerDynamoDBPersistence,
)
from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState
from aws_lambda_powertools.warnings import PowertoolsUserWarning

TABLE_NAME = "CircuitBreakerState"

Expand Down Expand Up @@ -268,3 +269,129 @@ def test_build_half_open_condition_with_expected_opened_at(persistence):
assert "#opened_at = :expected_opened_at" in result["ConditionExpression"]
assert result["ExpressionAttributeNames"]["#opened_at"] == "opened_at"
assert result["ExpressionAttributeValues"][":expected_opened_at"] == {"N": "5000"}


# --------------------------------------------------------------------------- composite (single-table) key


COMPOSITE_KEY = {"PK": {"S": "CIRCUIT_BREAKER"}, "SK": {"S": "payment"}}


@pytest.fixture
def composite_persistence():
client = boto3.client("dynamodb", config=Config(region_name="us-east-1"))
layer = CircuitBreakerDynamoDBPersistence(
table_name=TABLE_NAME,
boto3_client=client,
key_attr="PK",
sort_key_attr="SK",
static_pk_value="CIRCUIT_BREAKER",
)
layer.configure(CircuitBreakerConfig(local_cache_max_age=0), circuit_name="payment")
return layer


def test_composite_get_state_reads_composite_key(composite_persistence):
stubber = Stubber(composite_persistence.client)
stubber.add_response(
"get_item",
{},
{"TableName": TABLE_NAME, "Key": COMPOSITE_KEY, "ConsistentRead": False},
)
with stubber:
record = composite_persistence.get_state("payment")
assert record.state == CircuitState.CLOSED


def test_composite_save_open_writes_composite_key(composite_persistence):
captured, restore = _capture_put_item(composite_persistence)
try:
composite_persistence.save_open("payment", failure_count=5, opened_at=1000)
finally:
restore()

item = captured["Item"]
assert item["PK"] == {"S": "CIRCUIT_BREAKER"}
assert item["SK"] == {"S": "payment"}
assert "id" not in item
assert item["state"] == {"S": "OPEN"}


def test_composite_try_acquire_half_open_writes_composite_key(composite_persistence):
captured, restore = _capture_put_item(composite_persistence)
try:
assert composite_persistence.try_acquire_half_open("payment", "env-a", 1000) is True
finally:
restore()

item = captured["Item"]
assert item["PK"] == {"S": "CIRCUIT_BREAKER"}
assert item["SK"] == {"S": "payment"}
assert item["state"] == {"S": "HALF_OPEN"}
# The conditional election is still emitted in composite mode (condition is key-independent).
assert "#state = :open AND attribute_not_exists(#half_open_owner)" in captured["ConditionExpression"]
assert "#probe_lease_expiry <= :now" in captured["ConditionExpression"]


def test_composite_try_acquire_half_open_loses_on_conditional_failure(composite_persistence):
stubber = Stubber(composite_persistence.client)
stubber.add_client_error("put_item", service_error_code="ConditionalCheckFailedException")
with stubber:
assert composite_persistence.try_acquire_half_open("payment", "env-b", 1000) is False


def test_composite_save_closed_updates_composite_key(composite_persistence):
captured = {}
original_update = composite_persistence.client.update_item

def capturing_update(**kwargs):
captured.update(kwargs)
return {}

composite_persistence.client.update_item = capturing_update
try:
composite_persistence.save_closed("payment")
finally:
composite_persistence.client.update_item = original_update

assert captured["Key"] == COMPOSITE_KEY


def test_composite_item_to_record_reads_name_from_sort_key(composite_persistence):
item = {
"PK": {"S": "CIRCUIT_BREAKER"},
"SK": {"S": "payment"},
"state": {"S": "OPEN"},
"failure_count": {"N": "2"},
}
record = composite_persistence._item_to_record(item)
assert record.name == "payment"
assert record.state == CircuitState.OPEN


def test_sort_key_equal_to_key_attr_raises():
client = boto3.client("dynamodb", config=Config(region_name="us-east-1"))
with pytest.raises(ValueError, match="cannot be the same"):
CircuitBreakerDynamoDBPersistence(
table_name=TABLE_NAME,
boto3_client=client,
key_attr="PK",
sort_key_attr="PK",
)


def test_default_static_pk_value_namespaces_function_name(monkeypatch):
monkeypatch.setenv("AWS_LAMBDA_FUNCTION_NAME", "orders-fn")
client = boto3.client("dynamodb", config=Config(region_name="us-east-1"))
layer = CircuitBreakerDynamoDBPersistence(table_name=TABLE_NAME, boto3_client=client, sort_key_attr="SK")
assert layer.static_pk_value == "circuit_breaker#orders-fn"


def test_static_pk_value_without_sort_key_attr_warns():
client = boto3.client("dynamodb", config=Config(region_name="us-east-1"))
with pytest.warns(PowertoolsUserWarning, match="static_pk_value is ignored unless sort_key_attr"):
CircuitBreakerDynamoDBPersistence(
table_name=TABLE_NAME,
boto3_client=client,
static_pk_value="CIRCUIT_BREAKER",
)