From 742dfa375fb9ff18102aad6ce52d9eba5e67e41c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 29 Apr 2026 16:35:18 -0700 Subject: [PATCH 1/2] fix typo (not sure why pylint missed this) --- kafka/net/selector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/net/selector.py b/kafka/net/selector.py index 464e1db28..77716ea16 100644 --- a/kafka/net/selector.py +++ b/kafka/net/selector.py @@ -273,7 +273,7 @@ def remove_writer(self, fileobj): def poll(self, timeout_ms=None, future=None): if self._current: - raise RuntimError('Recursive access to net.poll!') + raise RuntimeError('Recursive access to net.poll!') elif self._running: raise RuntimeError('Concurrent access to net.poll!') log_trace('poll: enter') From 34c168624646f77072800bd00449b9fc46655349 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 29 Apr 2026 16:46:44 -0700 Subject: [PATCH 2/2] kafka.net.selector: use threading.Lock() to detect concurrent access to poll() --- kafka/net/selector.py | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/kafka/net/selector.py b/kafka/net/selector.py index 77716ea16..e4dbd4f4f 100644 --- a/kafka/net/selector.py +++ b/kafka/net/selector.py @@ -5,6 +5,7 @@ import heapq import selectors import socket +import threading import time from kafka.future import Future @@ -125,7 +126,7 @@ def __init__(self, **configs): if key in configs: self.config[key] = configs[key] - self._running = False + self._lock = threading.Lock() self._closed = False self._stop = False self._selector = self.config['selector']() @@ -274,24 +275,25 @@ def remove_writer(self, fileobj): def poll(self, timeout_ms=None, future=None): if self._current: raise RuntimeError('Recursive access to net.poll!') - elif self._running: + elif not self._lock.acquire(blocking=False): raise RuntimeError('Concurrent access to net.poll!') - log_trace('poll: enter') - self._running = True - start_at = time.monotonic() - inner_timeout = timeout_ms / 1000 if timeout_ms is not None else None - if future is not None and future.is_done: - inner_timeout = 0 - while True: - self._poll_once(inner_timeout) - if future is None or future.is_done: - break - elif timeout_ms is not None: - inner_timeout = (timeout_ms / 1000) - (time.monotonic() - start_at) - if inner_timeout <= 0: + try: + log_trace('poll: enter') + start_at = time.monotonic() + inner_timeout = timeout_ms / 1000 if timeout_ms is not None else None + if future is not None and future.is_done: + inner_timeout = 0 + while True: + self._poll_once(inner_timeout) + if future is None or future.is_done: break - self._running = False - log_trace('poll: exit') + elif timeout_ms is not None: + inner_timeout = (timeout_ms / 1000) - (time.monotonic() - start_at) + if inner_timeout <= 0: + break + finally: + self._lock.release() + log_trace('poll: exit') def _poll_once(self, timeout=None): log_trace('_poll_once: enter')