Skip to content

Added deadlock detection in CommsDecoder#68377

Open
dabla wants to merge 7 commits into
apache:mainfrom
dabla:fix/deadlock-asend-comms-decoder
Open

Added deadlock detection in CommsDecoder#68377
dabla wants to merge 7 commits into
apache:mainfrom
dabla:fix/deadlock-asend-comms-decoder

Conversation

@dabla

@dabla dabla commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Summary

Adds a two-level DeadlockImminentError guard in CommsDecoder.send() that catches the misuse
immediately and tells the developer exactly how to fix it.

Root cause

Async tasks that called BaseHook.get_hook() from inside async def / aexecute() were
inadvertently using the sync connection-fetch path:

# Inside an async @task — wrong
hook = KiotaRequestAdapterHook.get_hook(conn_id=MSGRAPH_CONN_ID)

get_hook() calls get_connection()Connection.get()_get_connection(), which
ultimately calls comms.send() — the blocking supervisor round-trip — on the event loop
thread
. Blocking the event loop thread while other coroutines are concurrently in-flight via
comms.asend() causes a deadlock or severe stall.

This was the reason the issue only appeared with MSGraph and not with SFTP: the SFTP example
used SFTPClientPool directly (a native async API), while the MSGraph task called the sync
get_hook() from inside an async function.

Changes

DeadlockImminentError + guard in CommsDecoder.send() (task-sdk/src/airflow/sdk/execution_time/comms.py)

Even with aget_hook() available, a developer could still mistakenly call the sync path from
an async context. Without a guard, the result is a silent hang that is almost impossible to
diagnose.

A new DeadlockImminentError (a BaseException subclass, so it is not swallowed by
contextlib.suppress(Exception)) is raised immediately by send() when it detects it is
running on the event loop thread.

The detection uses two levels:

Level Trigger What it means
1 — wrong pattern send() is called from the thread that last called asend() Deadlock will occur as soon as any concurrent coroutine calls asend()
2 — imminent deadlock Level 1 and _thread_lock is currently held by a thread-pool worker from a concurrent asend() round-trip Deadlock is happening right now

DeadlockImminentError inherits from BaseException rather than Exception for two reasons:

  1. contextlib.suppress(Exception) — used in mask_secret() and other helpers — does not
    catch it, so the error always surfaces.
  2. It signals a programmer error (wrong API used in wrong context), not a recoverable runtime
    failure, making BaseException semantically correct.

The error message includes the offending call stack and tells the developer exactly what to do:

comms.send() called from the event loop thread for message 'GetConnection'
— deadlock is imminent (asend() is concurrently in-flight).
Likely cause: BaseHook.get_hook() or BaseHook.get_connection() was called
from inside an async task.
Use the async equivalents instead:
await BaseHook.aget_hook() or await BaseHook.aget_connection().

Offending call stack:
  ...

Why this was hard to spot

MSGraph SFTP
Connection fetch get_hook()sync get_connection() SFTPClientPoolasync native API
Supervisor comms comms.send() blocks event loop thread comms.asend() offloads to thread pool
Result Deadlock / hang Works correctly

Testing

  • TestBaseHook.test_aget_hook — verifies aget_hook() routes exclusively through asend()
    and never calls the blocking send().
  • Tests for DeadlockImminentError — verifies send() raises the error (both levels) when
    called from the event loop thread, and that it propagates through contextlib.suppress(Exception).

Was generative AI tooling used to co-author this PR?
  • [ x ] Yes (please specify the tool below)

Claude Opus 4.6 to diagnose the issue and assist writing unit test


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@dabla dabla requested review from amoghrajesh, ashb and kaxil as code owners June 11, 2026 06:45
@dabla dabla added the affected_version:3.2 Use for reporting issues with 3.2 label Jun 11, 2026
@dabla dabla self-assigned this Jun 11, 2026
@dabla dabla added this to the Airflow 3.3.0 milestone Jun 11, 2026
@dabla dabla force-pushed the fix/deadlock-asend-comms-decoder branch from fd61c0e to 3ac9b72 Compare June 11, 2026 09:41
@ashb

ashb commented Jun 11, 2026

Copy link
Copy Markdown
Member

I'm not sold on this fix. sync send on the event loop in an async should go via the existing greenlet trampoline etc to go back to the event loop, no?

@ashb

ashb commented Jun 11, 2026

Copy link
Copy Markdown
Member

[other coroutine] send() → tries _thread_lock.acquire()

That should never be allowed to happen. We need to detect why sync send is being run on the async loop and trampoline back to the async version instead.

@dabla

dabla commented Jun 11, 2026

Copy link
Copy Markdown
Contributor Author

[other coroutine] send() → tries _thread_lock.acquire()

That should never be allowed to happen. We need to detect why sync send is being run on the async loop and trampoline back to the async version instead.

This is because of the mask_secret, which is sync, but the get_async_conn connection in msgraph is async, and it also logs some connection details (except password ofc), but the mask_secret is still applied anyway. We could add detection, but as mask_secret is sync, it would then crash anyway, but at least we would know why. That's why it was so difficult to track and why I didn't have the issue with SFTPAsyncHook

@dabla

dabla commented Jun 11, 2026

Copy link
Copy Markdown
Contributor Author

Claude propose to change mask_secret to something like this, but dunno if this is a good idea:

def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Mask a secret in both task process and supervisor process.

    For secrets loaded from backends (Vault, env vars, etc.), this ensures
    they're masked in both the task subprocess AND supervisor's log output.
    Works safely in both sync and async contexts.

    When called from an async context (event loop running), the supervisor
    notification is scheduled as a fire-and-forget coroutine via
    :func:`asyncio.ensure_future` so the event loop is never blocked.
    """
    import asyncio
    from contextlib import suppress

    from airflow.sdk._shared.secrets_masker import _secrets_masker

    _secrets_masker().add_mask(secret, name)

    with suppress(Exception):
        # Try to tell supervisor (only if in task execution context)
        from airflow.sdk.execution_time import task_runner
        from airflow.sdk.execution_time.comms import MaskSecret

        if comms := getattr(task_runner, "SUPERVISOR_COMMS", None):
            msg = MaskSecret(value=secret, name=name)
            try:
                asyncio.get_running_loop()
                # We're on the event loop thread — must not call blocking send() here.
                # Schedule asend() as a fire-and-forget task; MaskSecret needs no response.
                asyncio.ensure_future(comms.asend(msg))
            except RuntimeError:
                # No running event loop — safe to call sync send().
                comms.send(msg)

This means detecting if mask_secret was called from an async context or not.

@dabla

dabla commented Jun 11, 2026

Copy link
Copy Markdown
Contributor Author

Claude propose to change mask_secret to something like this, but dunno if this is a good idea:

def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Mask a secret in both task process and supervisor process.

    For secrets loaded from backends (Vault, env vars, etc.), this ensures
    they're masked in both the task subprocess AND supervisor's log output.
    Works safely in both sync and async contexts.

    When called from an async context (event loop running), the supervisor
    notification is scheduled as a fire-and-forget coroutine via
    :func:`asyncio.ensure_future` so the event loop is never blocked.
    """
    import asyncio
    from contextlib import suppress

    from airflow.sdk._shared.secrets_masker import _secrets_masker

    _secrets_masker().add_mask(secret, name)

    with suppress(Exception):
        # Try to tell supervisor (only if in task execution context)
        from airflow.sdk.execution_time import task_runner
        from airflow.sdk.execution_time.comms import MaskSecret

        if comms := getattr(task_runner, "SUPERVISOR_COMMS", None):
            msg = MaskSecret(value=secret, name=name)
            try:
                asyncio.get_running_loop()
                # We're on the event loop thread — must not call blocking send() here.
                # Schedule asend() as a fire-and-forget task; MaskSecret needs no response.
                asyncio.ensure_future(comms.asend(msg))
            except RuntimeError:
                # No running event loop — safe to call sync send().
                comms.send(msg)

This means detecting if mask_secret was called from an async context or not.

Tested this solution and it doesn't solve the deadlock.

@ashb

ashb commented Jun 11, 2026

Copy link
Copy Markdown
Member

[other coroutine] send() → tries _thread_lock.acquire()
That should never be allowed to happen. We need to detect why sync send is being run on the async loop and trampoline back to the async version instead.

This is because of the mask_secret, which is sync, but the get_async_conn connection in msgraph is async, and it also logs some connection details (except password ofc), but the mask_secret is still applied anyway. We could add detection, but as mask_secret is sync, it would then crash anyway, but at least we would know why. That's why it was so difficult to track and why I didn't have the issue with SFTPAsyncHook

That doesn't make sense to me. mask_secret is a function which makes no network requests - it just applies masks based on what it is given.

Oh, to tell the supervisor to mask it too

@ashb

ashb commented Jun 11, 2026

Copy link
Copy Markdown
Member

Tested this solution and it doesn't solve the deadlock.

If that doesn't solve the deadlock then I'm not sure that the problem is identified correctly.

@dabla

dabla commented Jun 11, 2026

Copy link
Copy Markdown
Contributor Author

I think I found the issue:

@task(show_return_value_in_logs=False, task_concurrency=5)
    async def get_user_registered_devices(user_id):
        from airflow.exceptions import AirflowNotFoundException
        from contextlib import suppress

        with suppress(AirflowNotFoundException):
            hook = KiotaRequestAdapterHook.get_hook(conn_id=MSGRAPH_CONN_ID)
            results = await hook.paginated_run(
                url=f"users/{user_id}/registeredDevices",
                query_parameters={"$select": "id"},
            )
            return results
        return []

We should implement an async get_hook in BaseHook, hence this is why it's causing deadlock, as the get_hook method underneath calls the sync get_connection method.

That was also the mean difference between my MSGraph case and SFTP one, hence why I didn't have the issue with SFTP example, because there I'm using the async SFTPClientPool:

       @task(
            retries=3,
            retry_delay=timedelta(seconds=60),
            show_return_value_in_logs=False,
        )
        async def load_xml_files(file, **context):
            import logging
            from io import BytesIO
            from os import cpu_count
            from airflow.providers.sftp.pools.sftp import SFTPClientPool

            connection = get_connection(sftp_conn)

            async with SFTPClientPool(
                sftp_conn_id=sftp_conn, pool_size=cpu_count()
            ).get_sftp_client() as sftp:
                logging.info("downloading: %s", file)
                buffer = BytesIO()
                async with sftp.open(file, encoding=xml_encoding) as remote_file:
                    data = await remote_file.read()
                    buffer.write(data.encode(xml_encoding))
                    buffer.seek(0)
                # Convert and return rows for DB insert
                return convert_to_rows(
                    connection,
                    file,
                    convert_xml_to_json(
                        buffer,
                        xml_namespaces,
                        xml_encoding,
                        xml_force_list,
                    ),
                )

So it's good that you challenged the issue!

Still I think it's a good thing to add the async aget_hook method on BaseHook.

@ashb

ashb commented Jun 11, 2026

Copy link
Copy Markdown
Member

Oh greenback is currently only in the triggerer.

def send(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
if self._loop is None:
raise RuntimeError("start_reader() must be called before send()")
if threading.get_ident() == self._loop_thread_id:
# Called from the event loop thread itself (e.g. a trigger calling a sync SDK method
# directly from async def run()). run_coroutine_threadsafe(...).result() would deadlock
# here because .result() blocks the thread the event loop runs on.
# greenback.await_() teleports the coroutine back into the running loop instead.
if not greenback.has_portal():
raise RuntimeError(
"Sync SDK methods (e.g. get_connection(), get_variable()) cannot be called "
"from a trigger's async def run() when AIRFLOW_DISABLE_GREENBACK_PORTAL is "
"set. Either remove that environment variable, or use the async equivalent "
"(e.g. aget_connection(), aget_variable())."
)
return greenback.await_(self.asend(msg))
return asyncio.run_coroutine_threadsafe(self.asend(msg), self._loop).result()

I think the AsyncPython operator might need to do the same. Maybe. It's a bit invasive to set it there

@ashb

ashb commented Jun 11, 2026

Copy link
Copy Markdown
Member

@dabla Thanks for digging in more. I'm just very sensitive to adding or changing locks, as I've worked quite hard to fix deadlocks on trigger (by removing locks!)

@dabla

dabla commented Jun 11, 2026

Copy link
Copy Markdown
Contributor Author

I think adding the aget_hook method on BaseHook is the only valid and solid solution.

@ashb

ashb commented Jun 11, 2026

Copy link
Copy Markdown
Member

I wonder if we could also catch the deadlock in this case (_threading_lock.locked() and threading.get_ident() == self._loop_thread_id style) to throw a nice error message, pointing the user to update their hook?

@dabla dabla changed the title Fix deadlock in CommsDecoder.asend() when sync send() runs concurrently on event loop Added aget_hook method on BaseHook to avoid deadlock in async tasks Jun 11, 2026
@dabla

dabla commented Jun 11, 2026

Copy link
Copy Markdown
Contributor Author

I wonder if we could also catch the deadlock in this case (_threading_lock.locked() and threading.get_ident() == self._loop_thread_id style) to throw a nice error message, pointing the user to update their hook?

Will try to add the check as well, good idea.

@dabla dabla changed the title Added aget_hook method on BaseHook to avoid deadlock in async tasks Added aget_hook method on BaseHook and deadlock detection Jun 11, 2026
@dabla

dabla commented Jun 11, 2026

Copy link
Copy Markdown
Contributor Author

I wonder if we could also catch the deadlock in this case (_threading_lock.locked() and threading.get_ident() == self._loop_thread_id style) to throw a nice error message, pointing the user to update their hook?

Detection mechanism added

@dabla

dabla commented Jun 12, 2026

Copy link
Copy Markdown
Contributor Author

I wonder if we could also catch the deadlock in this case (_threading_lock.locked() and threading.get_ident() == self._loop_thread_id style) to throw a nice error message, pointing the user to update their hook?

[2026-06-12 07:56:11] ERROR - comms.DeadlockImminentError: comms.send() called from the event loop thread for message 'TaskState' — deadlock will occur as soon as another coroutine calls asend(). Likely cause: BaseHook.get_hook() or BaseHook.get_connection() was called from inside an async task. Use the async equivalents instead: await BaseHook.aget_hook() or await BaseHook.aget_connection().
[2026-06-12 07:56:11] ERROR - Offending call stack:
...

@dabla dabla force-pushed the fix/deadlock-asend-comms-decoder branch from f3a06a9 to 22f4ff2 Compare June 12, 2026 09:53
@dabla dabla changed the title Added aget_hook method on BaseHook and deadlock detection Added aget_hook method on BaseHook and deadlock detection in CommsDecoder Jun 12, 2026
@dabla dabla force-pushed the fix/deadlock-asend-comms-decoder branch from 22f4ff2 to bf229cb Compare June 12, 2026 13:58
@ashb

ashb commented Jun 12, 2026

Copy link
Copy Markdown
Member

I wonder if we could also catch the deadlock in this case (_threading_lock.locked() and threading.get_ident() == self._loop_thread_id style) to throw a nice error message, pointing the user to update their hook?

[2026-06-12 07:56:11] ERROR - comms.DeadlockImminentError: comms.send() called from the event loop thread for message 'TaskState' — deadlock will occur as soon as another coroutine calls asend(). Likely cause: BaseHook.get_hook() or BaseHook.get_connection() was called from inside an async task. Use the async equivalents instead: await BaseHook.aget_hook() or await BaseHook.aget_connection().
[2026-06-12 07:56:11] ERROR - Offending call stack:
...

And there's basically 0 chance of this affecting anyone today, as this is only a surfaces when you use the AsyncPythonOperator, right? (Just thinking about if this will suddenly start failing loads of existing workloads that were working before due to dumb luck)

@dabla

dabla commented Jun 13, 2026

Copy link
Copy Markdown
Contributor Author

I wonder if we could also catch the deadlock in this case (_threading_lock.locked() and threading.get_ident() == self._loop_thread_id style) to throw a nice error message, pointing the user to update their hook?

[2026-06-12 07:56:11] ERROR - comms.DeadlockImminentError: comms.send() called from the event loop thread for message 'TaskState' — deadlock will occur as soon as another coroutine calls asend(). Likely cause: BaseHook.get_hook() or BaseHook.get_connection() was called from inside an async task. Use the async equivalents instead: await BaseHook.aget_hook() or await BaseHook.aget_connection().
[2026-06-12 07:56:11] ERROR - Offending call stack:
...

And there's basically 0 chance of this affecting anyone today, as this is only a surfaces when you use the AsyncPythonOperator, right? (Just thinking about if this will suddenly start failing loads of existing workloads that were working before due to dumb luck)

I did exactly the same reflection myself yesterday, but I monkey patched this check to all our environments (even production) to see what comes out of it and until now now exception where raised due to this check (+1200 DAG's), and we use a lot the async PythonOperator with iterate.

A possible solution would be to add a config parameter for this check which is disabled by default for example.

@dabla dabla force-pushed the fix/deadlock-asend-comms-decoder branch 2 times, most recently from d2c27f1 to 2a0e4df Compare June 13, 2026 13:06
@dabla dabla changed the title Added aget_hook method on BaseHook and deadlock detection in CommsDecoder Added deadlock detection in CommsDecoder Jun 13, 2026
@dabla

dabla commented Jun 13, 2026

Copy link
Copy Markdown
Contributor Author

I've splitted the PR in two and create a separate PR to add the asynchronous aget_hook method to BaseHook.

@dabla dabla force-pushed the fix/deadlock-asend-comms-decoder branch from 2a0e4df to cbd969f Compare June 13, 2026 18:21
@dabla dabla force-pushed the fix/deadlock-asend-comms-decoder branch from cbd969f to ff9b930 Compare June 15, 2026 08:35
# Typical cause: BaseHook.get_hook() or BaseHook.get_connection() called
# from inside an async task / aexecute().
# Fix: use 'await BaseHook.aget_hook()' or 'await BaseHook.aget_connection()'.
_on_loop_thread = self._loop_thread_id is not None and threading.get_ident() == self._loop_thread_id

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_on_loop_thread = self._loop_thread_id is not None and threading.get_ident() == self._loop_thread_id
_on_loop_thread = threading.get_ident() == self._loop_thread_id

I think this is enough? get_ident() can never return None so if self._loop_thread_id is None the part after and would never be True anyway.

Actually I think the entire added block can be simplified to

on_loop_thread = False
if threading.get_ident() == self._loop_thread_id:
    with contextlib.suppress(RuntimeError):
        on_loop_thread = bool(asyncio.get_running_loop())

Comment on lines 284 to 328

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I missed anything; the two blocks seem to have mostly the same logic except the former uses blocking=False. I think we can just do

try:
    if not self._thread_lock.acquire(blocking=not on_loop_thread):
        raise DeadlockImminentError(...)
    ...
finally:
    self._thread_lock.release()

or, if this is too difficult to understand, with an inner function

def _communicate():
    ...

if on_loop_thread:
    try:
        if not acquire(blocking=False):
            raise
        _communicate()
    finally:
        release()
else:
    with lock:
        _communicate()

@dabla dabla force-pushed the fix/deadlock-asend-comms-decoder branch from 49e591c to 3bfd48f Compare June 17, 2026 15:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

affected_version:3.2 Use for reporting issues with 3.2 area:task-sdk

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants