feat(scheduler): add optional send_timeout to prevent silent stall on hung kick #627

Open
Rohit-Ekbote wants to merge 1 commit into taskiq-python:master from Rohit-Ekbote:feat/scheduler-send-timeout
Summary

Adds an optional send_timeout (CLI --send-timeout) on SchedulerLoop.run that wraps each spawned send task in asyncio.wait_for, so a hung broker.kick cannot indefinitely block subsequent ticks for the same schedule_id via the running_schedules skip check.

Default is None — no timeout, fully backwards-compatible.

Problem

SchedulerLoop.run skips ticks for any schedule_id whose previous send task is still in running_schedules. The entry only leaves running_schedules via the task's add_done_callback, which fires only when the spawned coroutine completes. If broker.kick ever hangs (sentinel failover, half-open socket, network blip without TCP RST), the spawned send task never completes, the done_callback never fires, and every subsequent tick for that schedule is silently skipped — no error, no log, no recovery until process restart.
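The stall described above can be reproduced in isolation. The following is a minimal sketch, not the scheduler's actual code: `hung_kick`, `simulate`, and the `"sched-1"` id are hypothetical stand-ins, and the loop only imitates the skip check and done-callback cleanup described in this section.

```python
import asyncio
from typing import Dict, Tuple


async def hung_kick() -> None:
    # Hypothetical stand-in for a broker.kick that never returns
    # (for example, a half-open socket).
    await asyncio.Event().wait()


async def simulate(ticks: int) -> Tuple[int, int]:
    """Imitate the skip check: return (spawned, skipped) after `ticks` ticks."""
    running_schedules: Dict[str, asyncio.Task] = {}
    spawned = skipped = 0
    for _ in range(ticks):
        if "sched-1" in running_schedules:
            # The previous send never finished, so this tick is dropped silently.
            skipped += 1
        else:
            task = asyncio.create_task(hung_kick())
            running_schedules["sched-1"] = task
            # Cleanup happens only when the task completes, which never occurs here.
            task.add_done_callback(lambda _t: running_schedules.pop("sched-1", None))
            spawned += 1
        await asyncio.sleep(0.01)  # stand-in for the sleep between ticks
    # Cancel the hung task so the demo can exit cleanly.
    pending = list(running_schedules.values())
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return spawned, skipped


if __name__ == "__main__":
    print(asyncio.run(simulate(5)))
```

Only the first tick spawns a send; every later tick finds the stale entry and is skipped without any log output.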

We hit this in production on taskiq==0.11.20 (ListQueueSentinelBroker + redis-sentinel running through a flap). We then upgraded to validate: the same logic is unchanged at HEAD, and the bug is still present in 0.12.4.

py-spy on a wedged scheduler confirmed:

  • the asyncio event loop is alive and iterating (_run_once timer advances every minute)
  • sched_count: 1 — exactly one timer, the main loop's own sleep
  • the scheduler simply stops spawning new send tasks for affected schedule_ids

User-level mitigations — subclassing TaskiqScheduler and wrapping on_ready in asyncio.wait_for — don't help, because the wrapper runs inside the send task; if the send task itself never returns, the scheduler's running_schedules entry persists regardless. The only place a timeout can break the cycle is around the spawned send task itself, which is owned by the scheduler.

Filed as #626 with the full investigation.

Change

  1. New free function send_with_timeout(scheduler, source, task, timeout) in taskiq/cli/scheduler/run.py that wraps send in asyncio.wait_for. On TimeoutError, logs a WARNING and returns cleanly so the done_callback can clean up running_schedules. Non-timeout exceptions still propagate so existing observability surfaces (e.g., SendTaskError logs) are unchanged.
  2. SchedulerLoop.run accepts send_timeout: float | None = None. When set, the spawned send task uses send_with_timeout; otherwise it uses send as before.
  3. SchedulerArgs.send_timeout: float | None and a --send-timeout CLI argument plumbed through to SchedulerLoop.run.
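The shape of items 1 and 2 can be sketched as follows. This is an illustration under stated assumptions, not the code in this PR: `FakeTask`, the stand-in `send`, the logger name, the warning wording, and the `build_send_coro` helper are all hypothetical. Only the `send_with_timeout(scheduler, source, task, timeout)` signature and the "log a WARNING and return" behavior come from the description above.

```python
import asyncio
import logging
from dataclasses import dataclass
from typing import Any, Coroutine, Optional

logger = logging.getLogger("scheduler_sketch")  # hypothetical logger name


@dataclass
class FakeTask:
    """Hypothetical stand-in exposing the fields named in the warning."""

    task_name: str
    schedule_id: str


async def send(scheduler: Any, source: Any, task: FakeTask) -> None:
    """Stand-in for the existing send coroutine (assumed to call on_ready)."""
    await scheduler.on_ready(source, task)


async def send_with_timeout(
    scheduler: Any, source: Any, task: FakeTask, timeout: float
) -> None:
    """Run send under a deadline and swallow only the timeout."""
    try:
        await asyncio.wait_for(send(scheduler, source, task), timeout=timeout)
    except asyncio.TimeoutError:
        # Returning normally lets the done callback clear running_schedules.
        logger.warning(
            "Sending task %s (schedule_id=%s) timed out after %s seconds",
            task.task_name,
            task.schedule_id,
            timeout,
        )


def build_send_coro(
    scheduler: Any, source: Any, task: FakeTask, send_timeout: Optional[float]
) -> Coroutine[Any, Any, None]:
    """Hypothetical helper: pick the wrapper only when a timeout is set."""
    if send_timeout is None:
        return send(scheduler, source, task)
    return send_with_timeout(scheduler, source, task, send_timeout)
```

Because only the timeout is caught, any other exception raised by `send` still reaches the task's result, so existing error reporting is unaffected.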

Defaults preserve current behavior. Users opt in by passing --send-timeout=N (or setting send_timeout programmatically).
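For the CLI side, one plausible wiring is a float-typed option that defaults to `None`. The sketch below uses plain `argparse` and a hypothetical `parse_send_timeout` helper; it is not the actual taskiq argument parser, and only the flag name and its float-or-none semantics are taken from the description above.

```python
import argparse
from typing import Optional, Sequence


def parse_send_timeout(argv: Sequence[str]) -> Optional[float]:
    """Parse --send-timeout from argv; None means no timeout (the default)."""
    parser = argparse.ArgumentParser(prog="scheduler-sketch")  # hypothetical prog name
    parser.add_argument(
        "--send-timeout",
        type=float,
        default=None,
        help="Seconds to wait for a single send before giving up (default: no timeout).",
    )
    return parser.parse_args(list(argv)).send_timeout


if __name__ == "__main__":
    print(parse_send_timeout(["--send-timeout=30"]))
```

Omitting the flag yields `None`, which keeps the existing no-timeout behavior.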

Tests

New tests/cli/scheduler/test_send_with_timeout.py covering:

  • Timed-out send returns cleanly (does NOT raise) and logs a WARNING with task_name + schedule_id + timeout
  • Successful send does NOT log a warning
  • Non-timeout exceptions propagate
  • The inner send coroutine is actually cancelled on timeout (observed via CancelledError in the test scheduler's on_ready)
  • Plain send is unchanged in behavior (opt-in only)

All 5 new tests pass. All 26 existing tests/cli/scheduler/ tests pass — no regressions.
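Two of the listed behaviors (cancellation on timeout and propagation of other errors) can be checked with plain asyncio. This is a rough sketch, not the test file from this PR: the scheduler doubles and the compact `send_with_timeout` stand-in are hypothetical and re-declared here only so the snippet runs on its own.

```python
import asyncio
import logging
from typing import Any

logger = logging.getLogger("send_timeout_test_sketch")  # hypothetical logger name


class RecordingScheduler:
    """Hypothetical test double: on_ready hangs and records its cancellation."""

    def __init__(self) -> None:
        self.cancelled = False

    async def on_ready(self, source: Any, task: Any) -> None:
        try:
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            self.cancelled = True
            raise


class FailingScheduler:
    """Hypothetical test double: on_ready fails with a non-timeout error."""

    async def on_ready(self, source: Any, task: Any) -> None:
        raise ValueError("broker rejected the message")


async def send_with_timeout(scheduler: Any, source: Any, task: Any, timeout: float) -> None:
    """Compact stand-in for the function under test."""
    try:
        await asyncio.wait_for(scheduler.on_ready(source, task), timeout=timeout)
    except asyncio.TimeoutError:
        logger.warning("send timed out after %s seconds", timeout)


async def check_cancelled_on_timeout() -> bool:
    scheduler = RecordingScheduler()
    await send_with_timeout(scheduler, None, None, timeout=0.01)
    return scheduler.cancelled


async def check_other_errors_propagate() -> bool:
    try:
        await send_with_timeout(FailingScheduler(), None, None, timeout=1.0)
    except ValueError:
        return True
    return False
```

Observing `CancelledError` inside `on_ready` matters because it shows the hung coroutine is actually torn down rather than left running after the wrapper returns.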

API shape

  • Function: taskiq.cli.scheduler.run.send_with_timeout (free function, mirrors send)
  • SchedulerLoop.run(*, send_timeout: float | None = None) (kw-only, defaults to None)
  • CLI: --send-timeout FLOAT (default: no timeout)

Happy to iterate on the name, default, or placement if maintainers prefer a different shape. A reasonable production value is around 60s; defaulting to None keeps the change strictly additive.
