From 1e87b1612ae39e8ad8ae916f8a84500b2d48d81b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 30 Apr 2026 07:34:54 -0700 Subject: [PATCH 1/3] Drop unneeded self._current = None --- kafka/net/selector.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/kafka/net/selector.py b/kafka/net/selector.py index 870388c2d..1f8902a98 100644 --- a/kafka/net/selector.py +++ b/kafka/net/selector.py @@ -218,7 +218,6 @@ def sleep(self, delay): def _sleep(self, delay): self.call_later(delay, self._current) - self._current = None def wait_write(self, fileobj): return KernelEvent('_wait_write', fileobj) @@ -226,7 +225,6 @@ def wait_write(self, fileobj): def _wait_write(self, fileobj): self.register_event(fileobj, selectors.EVENT_WRITE, self._current) self._current.push_stack(lambda: self.unregister_event(fileobj, selectors.EVENT_WRITE)) - self._current = None def wait_read(self, fileobj): return KernelEvent('_wait_read', fileobj) @@ -234,7 +232,6 @@ def wait_read(self, fileobj): def _wait_read(self, fileobj): self.register_event(fileobj, selectors.EVENT_READ, self._current) self._current.push_stack(lambda: self.unregister_event(fileobj, selectors.EVENT_READ)) - self._current = None def _schedule_tasks(self): while self._scheduled and self._scheduled[0][0] <= time.monotonic(): From 2e22bce682b5aeea317a3c1d59fc1f36adcc151d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 30 Apr 2026 07:35:45 -0700 Subject: [PATCH 2/3] net.drain --- kafka/net/manager.py | 2 +- kafka/net/selector.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/kafka/net/manager.py b/kafka/net/manager.py index 13033d836..2ddc46dc9 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -365,7 +365,7 @@ def stop(self, timeout_ms=None): waiters with KafkaConnectionError. Idempotent.""" t = self._io_thread if t is None: - self._net.poll(drain=True) + self._net.drain() return self._io_thread = None self._net.stop() diff --git a/kafka/net/selector.py b/kafka/net/selector.py index 1f8902a98..36ff71f3a 100644 --- a/kafka/net/selector.py +++ b/kafka/net/selector.py @@ -184,6 +184,10 @@ def run_until_done(self, task_or_future): self._poll_once() return task_or_future + def drain(self): + while self._ready: + self._poll_once() + def call_at(self, when, task): if not isinstance(task, Task): task = Task(task) From bd005c712c17032d1a921dbfc62bb7fff9a0080a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 30 Apr 2026 07:39:05 -0700 Subject: [PATCH 3/3] Move poll lock check to _poll_once --- kafka/net/selector.py | 149 +++++++++++++++++++++--------------------- 1 file changed, 74 insertions(+), 75 deletions(-) diff --git a/kafka/net/selector.py b/kafka/net/selector.py index 36ff71f3a..def7c0f7b 100644 --- a/kafka/net/selector.py +++ b/kafka/net/selector.py @@ -290,9 +290,24 @@ def add_writer(self, fileobj, task): def remove_writer(self, fileobj): self.unregister_event(fileobj, selectors.EVENT_WRITE) - def poll(self, timeout_ms=None, future=None, drain=False): - if drain and future: - raise ValueError('Cannot set both drain and future') + def poll(self, timeout_ms=None, future=None): + log_trace('poll: enter') + start_at = time.monotonic() + inner_timeout = timeout_ms / 1000 if timeout_ms is not None else None + if future is not None and future.is_done: + inner_timeout = 0 + while True: + self._poll_once(inner_timeout) + if future is None or future.is_done: + break + elif timeout_ms is not None: + inner_timeout = (timeout_ms / 1000) - (time.monotonic() - start_at) + if inner_timeout <= 0: + break + log_trace('poll: exit') + + def _poll_once(self, timeout=None): + log_trace('_poll_once: enter') if not self._poll_lock.acquire(blocking=False): # Lock contended. Distinguish recursive (this thread is already # in poll, e.g. via a task callback) from concurrent (a different @@ -303,84 +318,68 @@ def poll(self, timeout_ms=None, future=None, drain=False): raise RuntimeError('Concurrent access to net.poll!') self._poll_owner = threading.current_thread() try: - log_trace('poll: enter') - start_at = time.monotonic() - inner_timeout = timeout_ms / 1000 if timeout_ms is not None else None - if future is not None and future.is_done: - inner_timeout = 0 - while (not drain) or (drain and self._ready): - self._poll_once(inner_timeout) - if future is None or future.is_done: - break - elif timeout_ms is not None: - inner_timeout = (timeout_ms / 1000) - (time.monotonic() - start_at) - if inner_timeout <= 0: - break - finally: - self._poll_owner = None - self._poll_lock.release() - log_trace('poll: exit') - - def _poll_once(self, timeout=None): - log_trace('_poll_once: enter') - if self._ready: - timeout = 0 - else: - scheduled_timeout = self._next_scheduled_timeout(time.monotonic()) - if scheduled_timeout is not None: - timeout = min(timeout, scheduled_timeout) if timeout is not None else scheduled_timeout - if timeout is not None: - if timeout > MAX_TIMEOUT: - timeout = MAX_TIMEOUT - elif timeout < 0: + if self._ready: + timeout = 0 + else: + scheduled_timeout = self._next_scheduled_timeout(time.monotonic()) + if scheduled_timeout is not None: + timeout = min(timeout, scheduled_timeout) if timeout is not None else scheduled_timeout + if timeout is not None: + if timeout > MAX_TIMEOUT: + timeout = MAX_TIMEOUT + elif timeout < 0: + timeout = 0 + elif not self._selector.get_map(): timeout = 0 - elif not self._selector.get_map(): - timeout = 0 - - ready_events = self._selector.select(timeout) - log_trace('_poll_once: %d ready_events', len(ready_events)) - self._process_events(ready_events) - self._schedule_tasks() - - threshold = self.config['slow_task_threshold_secs'] - n = len(self._ready) - for i in range(n): - self._current = self._ready.popleft() - step_start = time.monotonic() if threshold else None - try: - log_trace('Calling task %s', self._current) - event = self._current() - except StopIteration: - pass + ready_events = self._selector.select(timeout) + log_trace('_poll_once: %d ready_events', len(ready_events)) + self._process_events(ready_events) + self._schedule_tasks() - except BaseException as e: - log.exception(e) + threshold = self.config['slow_task_threshold_secs'] + n = len(self._ready) + for i in range(n): + self._current = self._ready.popleft() + step_start = time.monotonic() if threshold else None + try: + log_trace('Calling task %s', self._current) + event = self._current() + + except StopIteration: + pass + + except BaseException as e: + log.exception(e) - else: - if isinstance(event, KernelEvent): - log_trace('kernel event %s', event.method) - getattr(self, event.method)(*event.args) - elif isinstance(event, Future): - event.add_both(lambda _, task=self._current: self.call_soon(task)) else: - raise RuntimeError('Unhandled event type: %s' % event) - - if threshold: - elapsed = time.monotonic() - step_start - if elapsed > threshold: - msg = ( - 'Task %r ran for %.3fs (>%.3fs threshold). It is ' - 'blocking the event loop -- likely a tight sync loop ' - 'inside a coroutine. Other pollers will time out.' - % (self._current, elapsed, threshold)) - if self.config['raise_on_slow_task']: - self._current = None - raise RuntimeError(msg) - log.warning(msg) + if isinstance(event, KernelEvent): + log_trace('kernel event %s', event.method) + getattr(self, event.method)(*event.args) + elif isinstance(event, Future): + event.add_both(lambda _, task=self._current: self.call_soon(task)) + else: + raise RuntimeError('Unhandled event type: %s' % event) + + if threshold: + elapsed = time.monotonic() - step_start + if elapsed > threshold: + msg = ( + 'Task %r ran for %.3fs (>%.3fs threshold). It is ' + 'blocking the event loop -- likely a tight sync loop ' + 'inside a coroutine. Other pollers will time out.' + % (self._current, elapsed, threshold)) + if self.config['raise_on_slow_task']: + self._current = None + raise RuntimeError(msg) + log.warning(msg) + + self._current = None - self._current = None - log_trace('_poll_once: exit') + finally: + self._poll_owner = None + self._poll_lock.release() + log_trace('_poll_once: exit') def wakeup(self): try: