diff --git a/kafka/net/selector.py b/kafka/net/selector.py index 464e1db28..e4dbd4f4f 100644 --- a/kafka/net/selector.py +++ b/kafka/net/selector.py @@ -5,6 +5,7 @@ import heapq import selectors import socket +import threading import time from kafka.future import Future @@ -125,7 +126,7 @@ def __init__(self, **configs): if key in configs: self.config[key] = configs[key] - self._running = False + self._lock = threading.Lock() self._closed = False self._stop = False self._selector = self.config['selector']() @@ -273,25 +274,26 @@ def remove_writer(self, fileobj): def poll(self, timeout_ms=None, future=None): if self._current: - raise RuntimError('Recursive access to net.poll!') - elif self._running: + raise RuntimeError('Recursive access to net.poll!') + elif not self._lock.acquire(blocking=False): raise RuntimeError('Concurrent access to net.poll!') - log_trace('poll: enter') - self._running = True - start_at = time.monotonic() - inner_timeout = timeout_ms / 1000 if timeout_ms is not None else None - if future is not None and future.is_done: - inner_timeout = 0 - while True: - self._poll_once(inner_timeout) - if future is None or future.is_done: - break - elif timeout_ms is not None: - inner_timeout = (timeout_ms / 1000) - (time.monotonic() - start_at) - if inner_timeout <= 0: + try: + log_trace('poll: enter') + start_at = time.monotonic() + inner_timeout = timeout_ms / 1000 if timeout_ms is not None else None + if future is not None and future.is_done: + inner_timeout = 0 + while True: + self._poll_once(inner_timeout) + if future is None or future.is_done: break - self._running = False - log_trace('poll: exit') + elif timeout_ms is not None: + inner_timeout = (timeout_ms / 1000) - (time.monotonic() - start_at) + if inner_timeout <= 0: + break + finally: + self._lock.release() + log_trace('poll: exit') def _poll_once(self, timeout=None): log_trace('_poll_once: enter')