Skip to content

fix(TaskProducer): bounded queue of pending futures#678

Merged
bmckerry merged 1 commit into
mainfrom
ben/taskproducer-bounded-queue
Jun 5, 2026
Merged

fix(TaskProducer): bounded queue of pending futures#678
bmckerry merged 1 commit into
mainfrom
ben/taskproducer-bounded-queue

Conversation

@bmckerry
Copy link
Copy Markdown
Member

@bmckerry bmckerry commented Jun 5, 2026

Previously, TaskProducer added pending futures to a set with the assumption that collect_futures() would be routinely called. This isn't necessarily true (as a user could theoretically use TaskProducer outside of a task function), so this changes _pending_futures from a set to a deque with a maximum length.

@bmckerry bmckerry requested a review from a team as a code owner June 5, 2026 15:40
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit a615652. Configure here.


def track_future(self, future: ProducerFuture[BrokerValue[KafkaPayload]]) -> None:
_pending_futures.add(future)
_pending_futures.append(future)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Evicted futures skip completion checks

High Severity

With _pending_futures capped at TASK_PRODUCER_MAX_PENDING_FUTURES, each append past the limit drops the oldest tracked future. A single activation that produces more than that many messages before collect_futures() runs can finish as success even though some Kafka produces were never included in pending_futures or awaited.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit a615652. Configure here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick Claude search says we don't have any tasks in Sentry that produce more than ~1000 messages per task, so I'm going to assume 10,000 is a safe cap.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return an error, just in case someone does decide to produce >10k messages?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed in slack, I opened a PR to record a metric for queue size that we can alert on instead.

Comment thread clients/python/src/taskbroker_client/worker/producer.py Outdated
Comment thread clients/python/src/taskbroker_client/worker/producer.py Outdated
@bmckerry bmckerry force-pushed the ben/taskproducer-bounded-queue branch from a615652 to f73bb20 Compare June 5, 2026 15:54
Comment thread clients/python/src/taskbroker_client/worker/producer.py
@bmckerry bmckerry force-pushed the ben/taskproducer-bounded-queue branch from f73bb20 to 40c18bb Compare June 5, 2026 16:00
@bmckerry bmckerry merged commit ce7adb0 into main Jun 5, 2026
27 checks passed
@bmckerry bmckerry deleted the ben/taskproducer-bounded-queue branch June 5, 2026 18:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants