Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions kafka/net/wakeup_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,19 @@ def __init__(self, net):
# next ``__call__``. All accesses run on the IO thread (notify
# routes through call_soon_threadsafe), so no lock is needed.
self._pending = False
# Coalescing guard: True once a ``_wakeup`` has been scheduled via
# ``notify()`` but has not yet run on the IO thread. Lets ``notify()``
# skip the redundant ``call_soon_threadsafe`` (Task alloc + socketpair
# write + selector wakeup) when a wake is already in flight. Set on
# user threads, cleared by ``_wakeup`` on the IO thread; cross-thread
# access is GIL-atomic and the check-then-set in ``notify()`` can at
# worst schedule one redundant wake, never drop one (see ``notify``).
self._scheduled = False

def _wakeup(self):
# Clear the coalescing guard first so a notify() racing with this
# callback schedules a fresh wake rather than being dropped.
self._scheduled = False
if self._fut is not None and not self._fut.is_done:
self._fut.success(None)
else:
Expand Down Expand Up @@ -57,13 +68,18 @@ async def __call__(self, timeout_secs=None):
self._net.cancel(timer)

def notify(self):
# Always queue _wakeup on the IO thread. Skipping the queue when
# ``self._fut is None`` would re-introduce the lost-wakeup race:
# the check could pass before another thread enters ``__call__``
# and creates the future. Routing through the IO thread is one
# call_soon_threadsafe (~microseconds) and lets ``_wakeup`` decide
# under single-threaded semantics whether to signal or latch.
# Coalesce: if a _wakeup is already scheduled and not yet consumed,
# skip. The state this notify() announces was mutated before this
# call, and the already-pending _wakeup runs (and drains) later, so it
# will observe that state -- no lost wakeup. We deliberately do NOT
# skip based on ``self._fut is None``. ``_scheduled`` is cleared by
# ``_wakeup`` the instant it runs, so a True value guarantees a wake
# is still pending. The check-then-set is GIL-atomic per access; a
# lost race between two threads only schedules one redundant wake.
if self._scheduled:
return
self._scheduled = True
try:
self._net.call_soon_threadsafe(self._wakeup)
except ReferenceError:
pass
self._scheduled = False
39 changes: 28 additions & 11 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,10 @@ def __init__(self, **configs):
metrics=self._metrics, metric_group_prefix='producer',
wakeup_timeout_ms=self.config['max_block_ms'],
**self.config)
manager = client._manager
self._client = client
self._manager = manager = client._manager
self._net = client._net
self._net.start()

# We currently depend on eager-resolution of api_version.
# If it wasn't provided as a config option, we need to bootstrap
Expand Down Expand Up @@ -629,7 +632,6 @@ def __init__(self, **configs):
transaction_manager=self._transaction_manager,
guarantee_message_order=guarantee_message_order,
**self.config)
self._sender.daemon = True
self._sender.start()
self._closed = False

Expand Down Expand Up @@ -693,25 +695,40 @@ def __getattr__(self, name):

log.info("%s: Closing the Kafka producer with %s secs timeout.", str(self), timeout)
self.flush(timeout)
invoked_from_callback = bool(threading.current_thread() is self._sender)
on_io_thread = bool(self._net._io_thread is not None
and threading.current_thread() is self._net._io_thread)
if timeout > 0:
if invoked_from_callback:
if on_io_thread:
log.warning("%s: Overriding close timeout %s secs to 0 in order to"
" prevent useless blocking due to self-join. This"
" means you have incorrectly invoked close with a"
" non-zero timeout from the producer call-back.",
str(self), timeout)
else:
# Try to close gracefully.
if self._sender is not None:
self._sender.initiate_close()
self._sender.join(timeout)

if self._sender is not None and self._sender.is_alive():
elif self._sender is not None:
self._sender.initiate_close()
try:
self._manager.run(self._manager.wait_for,
self._sender._loop_future, timeout * 1000)
except Errors.KafkaTimeoutError:
pass

if self._sender is not None and self._sender.is_running():
log.info("%s: Proceeding to force close the producer since pending"
" requests could not be completed within timeout %s.",
str(self), timeout)
self._sender.force_close()
if not on_io_thread:
try:
self._manager.run(self._manager.wait_for,
self._sender._loop_future, self.config['retry_backoff_ms'])
except Errors.KafkaTimeoutError:
pass

if not on_io_thread:
try:
self._client.close()
except Exception:
log.exception("%s: Failed to close network client", str(self))

if self._metrics:
self._metrics.close()
Expand Down
Loading