diff --git a/CHANGELOG.md b/CHANGELOG.md index 081a7c6..02beb7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,23 +1,39 @@ # Changelog -## 2.4.0 (Unreleased) +## 3.0.0 (Unreleased) -### Added +### Breaking Changes + +- **Simplified execution model** - Only two public execution modes: `worker` and `owngil` + - `worker`: Dedicated pthread per context with stable thread affinity (default) + - `owngil`: Dedicated pthread + subinterpreter with own GIL (Python 3.14+) + - Removed `multi_executor` and `free_threaded` from public API + - Internal capability detection still tracks Python features -- **Context thread affinity** - Contexts in MULTI_EXECUTOR mode are now assigned a - fixed executor thread at creation. All operations (call, eval, exec) from the same - context run on the same OS thread, preventing thread state corruption in libraries - like numpy and PyTorch that have thread-local state. +- **Removed `py:num_executors/0`** - Contexts now use per-context worker threads + instead of a shared executor pool. This function is no longer needed. + +- **`py:execution_mode/0` returns `worker | owngil`** - Based on the `context_mode` + application configuration. Previously returned internal capabilities like + `free_threaded`, `subinterp`, or `multi_executor`. ### Changed -- **`py:execution_mode/0` now returns actual mode** - Returns `worker` (default), - `owngil`, `free_threaded`, or `multi_executor` based on actual configuration - instead of Python capability. Previously returned `subinterp` even when using - worker mode. +- **Per-context worker threads** - Each context now gets its own dedicated pthread + that handles all Python operations. This provides stable thread affinity for + numpy/torch/tensorflow compatibility without needing a shared executor pool. + +- **Async NIF dispatch** - Context operations use async NIFs with message passing + instead of blocking dirty schedulers. This improves concurrency under load. 
+ +- **Request queue per context** - Replaced single-slot request pattern with proper + request queues that support multiple concurrent callers. + +### Removed -- **Removed obsolete subinterp test references** - Test suites updated to reflect - the removal of subinterpreter mode. Tests now use `worker` or `owngil` modes. +- Multi-executor pool (`g_executors[]`, `multi_executor_start/stop`) +- `context_dispatch_call/eval/exec` functions (dead code) +- References to `PY_MODE_MULTI_EXECUTOR` in context operations ## 2.3.1 (2026-04-01) diff --git a/c_src/py_nif.c b/c_src/py_nif.c index 93dd59b..a7e0612 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -1142,7 +1142,9 @@ static ERL_NIF_TERM nif_py_init(ErlNifEnv *env, int argc, const ERL_NIF_TERM arg /* Save main thread state and release GIL for other threads */ g_main_thread_state = PyEval_SaveThread(); - /* Start executors based on execution mode */ + /* Start single executor for coordinator operations. + * Context operations use per-context worker threads (see worker_context_init). + * The single executor handles legacy worker API and coordinator tasks. 
*/ int executor_result = 0; switch (g_execution_mode) { case PY_MODE_FREE_THREADED: @@ -1150,29 +1152,10 @@ static ERL_NIF_TERM nif_py_init(ErlNifEnv *env, int argc, const ERL_NIF_TERM arg break; case PY_MODE_SUBINTERP: - /* Use single executor for coordinator operations */ - executor_result = executor_start(); - break; - case PY_MODE_MULTI_EXECUTOR: default: - /* Start multiple executors for GIL contention mode */ - { - int num_exec = MIN_EXECUTORS; /* Fallback if not provided */ - /* Check for config */ - if (argc > 0 && enif_is_map(env, argv[0])) { - ERL_NIF_TERM key = enif_make_atom(env, "num_executors"); - ERL_NIF_TERM value; - if (enif_get_map_value(env, argv[0], key, &value)) { - enif_get_int(env, value, &num_exec); - } - } - executor_result = multi_executor_start(num_exec); - if (executor_result < 0) { - /* Fallback to single executor */ - executor_result = executor_start(); - } - } + /* Use single executor for coordinator operations */ + executor_result = executor_start(); break; } @@ -1221,23 +1204,16 @@ static ERL_NIF_TERM nif_finalize(ErlNifEnv *env, int argc, const ERL_NIF_TERM ar * 3. 
Then clean up caches with GIL (no active work at this point) */ - /* Step 1: Stop executors - they will finish in-flight requests and exit */ + /* Step 1: Stop executor - it will finish in-flight requests and exit */ switch (g_execution_mode) { case PY_MODE_FREE_THREADED: /* No executor to stop */ break; case PY_MODE_SUBINTERP: - executor_stop(); - break; - case PY_MODE_MULTI_EXECUTOR: default: - if (atomic_load(&g_multi_executor_initialized)) { - multi_executor_stop(); - } else { - executor_stop(); - } + executor_stop(); break; } @@ -1771,13 +1747,6 @@ static ERL_NIF_TERM nif_execution_mode(ErlNifEnv *env, int argc, const ERL_NIF_T return enif_make_atom(env, mode_str); } -static ERL_NIF_TERM nif_num_executors(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { - (void)argc; - (void)argv; - - return enif_make_int(env, g_num_executors); -} - /* ============================================================================ * Callback support NIFs * ============================================================================ */ @@ -2236,7 +2205,116 @@ static PyObject *context_get_module(py_context_t *ctx, const char *module_name); * - Terms are passed via enif_make_copy() (zero serialization overhead) * ============================================================================ */ -#ifdef HAVE_SUBINTERPRETERS +/* ============================================================================ + * Context Request Queue Operations + * + * These functions manage the request queue for worker/owngil contexts. + * They replace the single-slot pattern that had race conditions. + * Available for all Python versions to support worker thread mode. + * ============================================================================ */ + +/** + * @brief Enqueue a request to a context's request queue + * + * Thread-safe. Adds request to tail of queue and signals worker. + * Caller must have already set refcount to 2 (caller + queue). 
+ * + * @param ctx The context + * @param req The request (refcount should be 2) + */ +static void ctx_queue_enqueue(py_context_t *ctx, ctx_request_t *req) { + pthread_mutex_lock(&ctx->queue_mutex); + + req->next = NULL; + if (ctx->queue_tail == NULL) { + ctx->queue_head = req; + ctx->queue_tail = req; + } else { + ctx->queue_tail->next = req; + ctx->queue_tail = req; + } + + pthread_cond_signal(&ctx->queue_not_empty); + pthread_mutex_unlock(&ctx->queue_mutex); +} + +/** + * @brief Dequeue a request from a context's request queue + * + * Blocks until a request is available or shutdown is requested. + * Returns NULL if shutdown requested and queue is empty. + * + * @param ctx The context + * @return The dequeued request, or NULL on shutdown + */ +static ctx_request_t *ctx_queue_dequeue(py_context_t *ctx) { + pthread_mutex_lock(&ctx->queue_mutex); + + while (ctx->queue_head == NULL && !atomic_load(&ctx->shutdown_requested)) { + pthread_cond_wait(&ctx->queue_not_empty, &ctx->queue_mutex); + } + + ctx_request_t *req = ctx->queue_head; + if (req != NULL) { + ctx->queue_head = req->next; + if (ctx->queue_head == NULL) { + ctx->queue_tail = NULL; + } + req->next = NULL; + } + + pthread_mutex_unlock(&ctx->queue_mutex); + return req; +} + +/** + * @brief Cancel all pending requests in a context's queue + * + * Called during context destruction. Sets cancelled flag on all + * pending requests and signals their condition variables. 
+ * + * @param ctx The context + */ +static void ctx_queue_cancel_all(py_context_t *ctx) { + pthread_mutex_lock(&ctx->queue_mutex); + + ctx_request_t *req = ctx->queue_head; + while (req != NULL) { + ctx_request_t *next = req->next; + atomic_store(&req->cancelled, true); + + /* Signal waiters that request is done (cancelled) */ + pthread_mutex_lock(&req->mutex); + atomic_store(&req->completed, true); + pthread_cond_signal(&req->cond); + pthread_mutex_unlock(&req->mutex); + + /* Release queue's reference */ + ctx_request_release(req); + req = next; + } + + ctx->queue_head = NULL; + ctx->queue_tail = NULL; + + pthread_mutex_unlock(&ctx->queue_mutex); +} + +/* ============================================================================ + * Legacy execute functions (use context fields for compatibility) + * + * These functions read from ctx->shared_env/request_term and write to + * ctx->response_term/response_ok. The new queue-based approach populates + * these fields from the dequeued request for compatibility. + * + * TODO: Refactor these to take ctx_request_t* directly in a future phase. 
+ * ============================================================================ */ + +/* Thread-local for current request being processed (for compatibility layer) */ +static __thread ErlNifEnv *tl_current_req_env = NULL; +static __thread ERL_NIF_TERM tl_current_req_data = 0; +static __thread ERL_NIF_TERM *tl_current_response = NULL; +static __thread bool *tl_current_response_ok = NULL; /** * @brief Execute a call request in the OWN_GIL thread @@ -2703,9 +2781,13 @@ static void owngil_execute_eval_with_env(py_context_t *ctx) { return; } - /* Set thread-local env for callback support */ + /* Set thread-local state for callback/suspension support */ + py_context_t *prev_context = tl_current_context; + tl_current_context = ctx; py_env_resource_t *prev_local_env = tl_current_local_env; tl_current_local_env = penv; + bool prev_allow_suspension = tl_allow_suspension; + tl_allow_suspension = true; /* Build eval_locals from penv->globals + any passed locals */ PyObject *eval_locals = PyDict_Copy(penv->globals); @@ -2723,6 +2805,8 @@ static void owngil_execute_eval_with_env(py_context_t *ctx) { if (compiled == NULL) { Py_DECREF(eval_locals); + tl_allow_suspension = prev_allow_suspension; + tl_current_context = prev_context; tl_current_local_env = prev_local_env; ctx->response_term = make_py_error(ctx->shared_env); ctx->response_ok = false; @@ -2733,11 +2817,158 @@ static void owngil_execute_eval_with_env(py_context_t *ctx) { Py_DECREF(compiled); Py_DECREF(eval_locals); - tl_current_local_env = prev_local_env; - if (py_result == NULL) { - ctx->response_term = make_py_error(ctx->shared_env); - ctx->response_ok = false; + /* Check for pending callback (suspension) */ + if (tl_pending_callback) { + PyErr_Clear(); + /* Create suspended state for callback handling */ + suspended_context_state_t *suspended = create_suspended_context_state_for_eval( + ctx->shared_env, ctx, &code_bin, tuple_terms[1]); + if (suspended == NULL) { + tl_pending_callback = false; + 
Py_CLEAR(tl_pending_args); + ctx->response_term = enif_make_tuple2(ctx->shared_env, + enif_make_atom(ctx->shared_env, "error"), + enif_make_atom(ctx->shared_env, "create_suspended_state_failed")); + ctx->response_ok = false; + } else { + ctx->response_term = build_suspended_context_result(ctx->shared_env, suspended); + ctx->response_ok = true; /* Suspended is a valid response */ + } + } else { + ctx->response_term = make_py_error(ctx->shared_env); + ctx->response_ok = false; + } + } else if (is_inline_schedule_marker(py_result)) { + /* Inline schedule marker: execute continuation directly in worker thread. + * Loop until we get a final result or a suspension. */ + int depth = 0; + while (is_inline_schedule_marker(py_result) && depth < MAX_INLINE_CONTINUATION_DEPTH) { + inline_continuation_t *cont = create_inline_continuation(ctx, penv, py_result, depth); + Py_DECREF(py_result); + py_result = NULL; + + if (cont == NULL) { + ctx->response_term = enif_make_tuple2(ctx->shared_env, + enif_make_atom(ctx->shared_env, "error"), + enif_make_atom(ctx->shared_env, "create_continuation_failed")); + ctx->response_ok = false; + goto cleanup; + } + + /* Execute the continuation function */ + PyObject *func = NULL; + PyObject *module = NULL; + + if (strcmp(cont->module_name, "__main__") == 0) { + /* Try captured globals first */ + if (cont->globals != NULL) { + func = PyDict_GetItemString(cont->globals, cont->func_name); + } + if (func == NULL && cont->locals != NULL) { + func = PyDict_GetItemString(cont->locals, cont->func_name); + } + if (func == NULL && penv != NULL) { + func = PyDict_GetItemString(penv->globals, cont->func_name); + } + if (func == NULL && ctx->globals != NULL) { + func = PyDict_GetItemString(ctx->globals, cont->func_name); + } + if (func != NULL) { + Py_INCREF(func); + } else { + PyErr_Format(PyExc_NameError, "name '%s' is not defined", cont->func_name); + } + } else { + module = PyImport_ImportModule(cont->module_name); + if (module != NULL) { + func = 
PyObject_GetAttrString(module, cont->func_name); + Py_DECREF(module); + } + } + + if (func == NULL) { + enif_release_resource(cont); + ctx->response_term = make_py_error(ctx->shared_env); + ctx->response_ok = false; + goto cleanup; + } + + /* Build args and call */ + PyObject *args = cont->args ? cont->args : PyTuple_New(0); + if (args == NULL) { + Py_DECREF(func); + enif_release_resource(cont); + ctx->response_term = make_py_error(ctx->shared_env); + ctx->response_ok = false; + goto cleanup; + } + if (cont->args) Py_INCREF(args); + + py_result = PyObject_Call(func, args, cont->kwargs); + Py_DECREF(func); + Py_DECREF(args); + enif_release_resource(cont); + depth++; + } + + if (depth >= MAX_INLINE_CONTINUATION_DEPTH) { + Py_XDECREF(py_result); + ctx->response_term = enif_make_tuple2(ctx->shared_env, + enif_make_atom(ctx->shared_env, "error"), + enif_make_atom(ctx->shared_env, "inline_continuation_depth_exceeded")); + ctx->response_ok = false; + goto cleanup; + } + + /* Handle final result (or error/suspension from continuation) */ + if (py_result == NULL) { + if (tl_pending_callback) { + PyErr_Clear(); + suspended_context_state_t *suspended = create_suspended_context_state_for_eval( + ctx->shared_env, ctx, &code_bin, tuple_terms[1]); + if (suspended == NULL) { + tl_pending_callback = false; + Py_CLEAR(tl_pending_args); + ctx->response_term = enif_make_tuple2(ctx->shared_env, + enif_make_atom(ctx->shared_env, "error"), + enif_make_atom(ctx->shared_env, "create_suspended_state_failed")); + ctx->response_ok = false; + } else { + ctx->response_term = build_suspended_context_result(ctx->shared_env, suspended); + ctx->response_ok = true; + } + } else { + ctx->response_term = make_py_error(ctx->shared_env); + ctx->response_ok = false; + } + } else if (is_schedule_marker(py_result)) { + ScheduleMarkerObject *marker = (ScheduleMarkerObject *)py_result; + ERL_NIF_TERM callback_name = py_to_term(ctx->shared_env, marker->callback_name); + ERL_NIF_TERM callback_args = 
py_to_term(ctx->shared_env, marker->args); + Py_DECREF(py_result); + ctx->response_term = enif_make_tuple3(ctx->shared_env, + enif_make_atom(ctx->shared_env, "schedule"), + callback_name, callback_args); + ctx->response_ok = true; + } else { + ERL_NIF_TERM term_result = py_to_term(ctx->shared_env, py_result); + Py_DECREF(py_result); + ctx->response_term = enif_make_tuple2(ctx->shared_env, + enif_make_atom(ctx->shared_env, "ok"), term_result); + ctx->response_ok = true; + } + goto cleanup; + } else if (is_schedule_marker(py_result)) { + /* Schedule marker: return {schedule, callback_name, args} */ + ScheduleMarkerObject *marker = (ScheduleMarkerObject *)py_result; + ERL_NIF_TERM callback_name = py_to_term(ctx->shared_env, marker->callback_name); + ERL_NIF_TERM callback_args = py_to_term(ctx->shared_env, marker->args); + Py_DECREF(py_result); + ctx->response_term = enif_make_tuple3(ctx->shared_env, + enif_make_atom(ctx->shared_env, "schedule"), + callback_name, callback_args); + ctx->response_ok = true; } else { ERL_NIF_TERM term_result = py_to_term(ctx->shared_env, py_result); Py_DECREF(py_result); @@ -2745,6 +2976,13 @@ static void owngil_execute_eval_with_env(py_context_t *ctx) { enif_make_atom(ctx->shared_env, "ok"), term_result); ctx->response_ok = true; } + +cleanup: + /* Restore thread-local state */ + tl_allow_suspension = prev_allow_suspension; + tl_current_context = prev_context; + tl_current_local_env = prev_local_env; + clear_pending_callback_tls(); } /** @@ -3156,128 +3394,679 @@ static void owngil_execute_request(py_context_t *ctx) { } } +/* ============================================================================ + * Worker Thread Implementation (main interpreter, all Python versions) + * + * Worker mode uses a dedicated pthread that acquires the GIL for each request. + * This provides stable thread affinity for numpy/torch/tensorflow without + * requiring subinterpreter support. 
+ * ============================================================================ */ + /** - * @brief Main loop for OWN_GIL context thread + * @brief Main loop for worker context thread (main interpreter mode) * - * This function runs in a dedicated pthread. It creates an OWN_GIL subinterpreter, - * then enters a request loop where it processes requests from the dirty scheduler. + * This function runs in a dedicated pthread. It processes requests from the + * request queue, acquiring the GIL for each request using PyGILState_Ensure. + * + * Unlike owngil mode, worker mode uses the main interpreter and shares the GIL + * with other Python threads. The benefit is stable thread affinity and + * compatibility with all Python extensions. */ -static void *owngil_context_thread_main(void *arg) { +static void *worker_context_thread_main(void *arg) { py_context_t *ctx = (py_context_t *)arg; - /* Attach to Python runtime to create the subinterpreter. - * We need to hold the main GIL while creating the subinterpreter. */ + /* Create namespace dictionaries on the worker thread under GIL */ PyGILState_STATE gstate = PyGILState_Ensure(); - /* Create OWN_GIL subinterpreter */ - PyInterpreterConfig config = { - .use_main_obmalloc = 0, - .allow_fork = 0, - .allow_exec = 0, - .allow_threads = 1, - .allow_daemon_threads = 0, - .check_multi_interp_extensions = 1, - .gil = PyInterpreterConfig_OWN_GIL, - }; - - PyStatus status = Py_NewInterpreterFromConfig(&ctx->own_gil_tstate, &config); - if (PyStatus_IsError(status)) { - fprintf(stderr, "OWN_GIL: Py_NewInterpreterFromConfig failed: %s\n", - status.err_msg ? status.err_msg : "unknown error"); - PyGILState_Release(gstate); - atomic_store(&ctx->init_error, true); - return NULL; - } - - ctx->own_gil_interp = PyThreadState_GetInterpreter(ctx->own_gil_tstate); - - /* After Py_NewInterpreterFromConfig, we are now in the new interpreter's - * thread state and hold its GIL. 
The main interpreter's gstate is no longer - * relevant for this thread. */ - - /* Register erlang module in this subinterpreter */ - if (create_erlang_module() < 0) { - fprintf(stderr, "OWN_GIL: create_erlang_module failed\n"); - PyErr_Print(); - Py_EndInterpreter(ctx->own_gil_tstate); - atomic_store(&ctx->init_error, true); - return NULL; - } - - /* Register py_event_loop module for reactor support */ - if (create_py_event_loop_module() < 0) { - fprintf(stderr, "OWN_GIL: create_py_event_loop_module failed\n"); - PyErr_Print(); - Py_EndInterpreter(ctx->own_gil_tstate); - atomic_store(&ctx->init_error, true); - return NULL; - } - - /* Create namespace dictionaries */ - ctx->globals = PyDict_New(); - ctx->locals = PyDict_New(); - ctx->module_cache = PyDict_New(); + /* Create namespace dictionaries if not already created */ + if (ctx->globals == NULL) { + ctx->globals = PyDict_New(); + ctx->locals = PyDict_New(); + ctx->module_cache = PyDict_New(); - if (ctx->globals == NULL || ctx->locals == NULL || ctx->module_cache == NULL) { - fprintf(stderr, "OWN_GIL: PyDict_New failed for namespace dicts\n"); - Py_XDECREF(ctx->globals); - Py_XDECREF(ctx->locals); - Py_XDECREF(ctx->module_cache); - Py_EndInterpreter(ctx->own_gil_tstate); - /* Don't call PyGILState_Release - interpreter is gone */ - atomic_store(&ctx->init_error, true); - return NULL; - } + if (ctx->globals == NULL || ctx->locals == NULL || ctx->module_cache == NULL) { + PyGILState_Release(gstate); + atomic_store(&ctx->init_error, true); + atomic_store(&ctx->worker_running, false); + return NULL; + } - /* Import __builtins__ into globals */ - PyObject *builtins = PyEval_GetBuiltins(); - PyDict_SetItemString(ctx->globals, "__builtins__", builtins); + /* Import __builtins__ into globals */ + PyObject *builtins = PyEval_GetBuiltins(); + PyDict_SetItemString(ctx->globals, "__builtins__", builtins); - /* Import erlang module into globals */ - PyObject *erlang_module = PyImport_ImportModule("erlang"); - if 
(erlang_module != NULL) { - PyDict_SetItemString(ctx->globals, "erlang", erlang_module); - Py_DECREF(erlang_module); - } else { - /* Non-fatal - basic operations still work, but log for debugging */ - log_and_clear_python_error("OWN_GIL erlang module import"); + /* Import erlang module into globals */ + PyObject *erlang_module = PyImport_ImportModule("erlang"); + if (erlang_module != NULL) { + PyDict_SetItemString(ctx->globals, "erlang", erlang_module); + Py_DECREF(erlang_module); + } else { + log_and_clear_python_error("worker erlang module import"); + } } - /* Release our OWN_GIL (we'll reacquire when processing requests) */ - PyEval_SaveThread(); + PyGILState_Release(gstate); /* Signal that we're ready */ - atomic_store(&ctx->thread_running, true); - - /* Main request loop */ - pthread_mutex_lock(&ctx->request_mutex); + atomic_store(&ctx->worker_running, true); + /* Main request loop - uses queue instead of single-slot */ while (!atomic_load(&ctx->shutdown_requested)) { - /* Wait for a request */ - while (ctx->request_type == CTX_REQ_NONE && - !atomic_load(&ctx->shutdown_requested)) { - pthread_cond_wait(&ctx->request_ready, &ctx->request_mutex); + /* Dequeue next request (blocks until available or shutdown) */ + ctx_request_t *req = ctx_queue_dequeue(ctx); + + if (req == NULL) { + /* Queue empty and shutdown requested */ + break; } - if (atomic_load(&ctx->shutdown_requested)) { + if (req->type == CTX_REQ_SHUTDOWN) { + /* Shutdown sentinel - signal completion and exit */ + pthread_mutex_lock(&req->mutex); + atomic_store(&req->completed, true); + pthread_cond_signal(&req->cond); + pthread_mutex_unlock(&req->mutex); + ctx_request_release(req); break; } - /* Release mutex while processing (allow concurrent dispatch attempts to queue) */ - pthread_mutex_unlock(&ctx->request_mutex); + /* Check if request was cancelled while queued */ + if (atomic_load(&req->cancelled)) { + /* Request cancelled - deliver error without processing */ + if (req->async_mode) { + /* Async 
mode: send cancellation message */ + enif_clear_env(ctx->msg_env); + ERL_NIF_TERM cancel_msg = enif_make_tuple3(ctx->msg_env, + enif_make_atom(ctx->msg_env, "py_result"), + enif_make_copy(ctx->msg_env, req->request_id), + enif_make_tuple2(ctx->msg_env, + enif_make_atom(ctx->msg_env, "error"), + enif_make_atom(ctx->msg_env, "cancelled"))); + enif_send(NULL, &req->caller_pid, ctx->msg_env, cancel_msg); + } else { + /* Blocking mode: signal condvar */ + req->result_env = enif_alloc_env(); + if (req->result_env) { + req->result = enif_make_tuple2(req->result_env, + enif_make_atom(req->result_env, "error"), + enif_make_atom(req->result_env, "cancelled")); + } + req->success = false; - /* Acquire our GIL and process */ - PyEval_RestoreThread(ctx->own_gil_tstate); - owngil_execute_request(ctx); - PyEval_SaveThread(); + pthread_mutex_lock(&req->mutex); + atomic_store(&req->completed, true); + pthread_cond_signal(&req->cond); + pthread_mutex_unlock(&req->mutex); + } - /* Re-acquire mutex to signal completion and get next request */ - pthread_mutex_lock(&ctx->request_mutex); - ctx->request_type = CTX_REQ_NONE; - pthread_cond_signal(&ctx->response_ready); - } + ctx_request_release(req); + continue; + } - pthread_mutex_unlock(&ctx->request_mutex); + /* Populate legacy compatibility fields from request */ + ctx->shared_env = req->request_env; + ctx->request_type = req->type; + ctx->request_term = req->request_data; + ctx->reactor_buffer_ptr = req->reactor_buffer_ptr; + ctx->local_env_ptr = req->local_env_ptr; + ctx->response_ok = false; + ctx->response_term = 0; + + /* Acquire GIL and process the request */ + gstate = PyGILState_Ensure(); + owngil_execute_request(ctx); /* Reuse execute functions */ + PyGILState_Release(gstate); + + /* Copy response to request struct */ + req->result_env = enif_alloc_env(); + if (req->result_env && ctx->response_term != 0) { + req->result = enif_make_copy(req->result_env, ctx->response_term); + } else if (req->result_env) { + req->result = 
enif_make_tuple2(req->result_env, + enif_make_atom(req->result_env, "error"), + enif_make_atom(req->result_env, "no_response")); + } + req->success = ctx->response_ok; + + /* Clear legacy fields */ + ctx->shared_env = NULL; + ctx->request_type = CTX_REQ_NONE; + ctx->request_term = 0; + ctx->reactor_buffer_ptr = NULL; + ctx->local_env_ptr = NULL; + + /* Deliver result - async or blocking */ + if (req->async_mode) { + /* Async mode: send result message to caller */ + enif_clear_env(ctx->msg_env); + ERL_NIF_TERM result_msg = enif_make_tuple3(ctx->msg_env, + enif_make_atom(ctx->msg_env, "py_result"), + enif_make_copy(ctx->msg_env, req->request_id), + req->result_env ? enif_make_copy(ctx->msg_env, req->result) + : enif_make_tuple2(ctx->msg_env, + enif_make_atom(ctx->msg_env, "error"), + enif_make_atom(ctx->msg_env, "no_result"))); + enif_send(NULL, &req->caller_pid, ctx->msg_env, result_msg); + } else { + /* Blocking mode: signal condvar */ + pthread_mutex_lock(&req->mutex); + atomic_store(&req->completed, true); + pthread_cond_signal(&req->cond); + pthread_mutex_unlock(&req->mutex); + } + + /* Release queue's reference to request */ + ctx_request_release(req); + } + + /* Cleanup: release namespace dictionaries under GIL */ + gstate = PyGILState_Ensure(); + Py_XDECREF(ctx->module_cache); + Py_XDECREF(ctx->globals); + Py_XDECREF(ctx->locals); + ctx->globals = NULL; + ctx->locals = NULL; + ctx->module_cache = NULL; + PyGILState_Release(gstate); + + atomic_store(&ctx->worker_running, false); + return NULL; +} + +/** + * @brief Initialize worker thread mode for a context + * + * @param ctx Context to initialize + * @return 0 on success, -1 on failure + */ +static int worker_context_init(py_context_t *ctx) { + ctx->uses_worker_thread = true; + + /* Initialize worker thread state */ + atomic_store(&ctx->worker_running, false); + atomic_store(&ctx->shutdown_requested, false); + atomic_store(&ctx->leaked, false); + + /* Initialize request queue */ + ctx->queue_head = NULL; + 
ctx->queue_tail = NULL; + + /* Initialize legacy compatibility fields */ + ctx->shared_env = NULL; + ctx->request_type = CTX_REQ_NONE; + ctx->request_term = 0; + ctx->response_term = 0; + ctx->response_ok = false; + ctx->local_env_ptr = NULL; + ctx->reactor_buffer_ptr = NULL; + + /* Initialize queue mutex */ + if (pthread_mutex_init(&ctx->queue_mutex, NULL) != 0) { + return -1; + } + + /* Initialize queue condition variable */ + if (pthread_cond_init(&ctx->queue_not_empty, NULL) != 0) { + pthread_mutex_destroy(&ctx->queue_mutex); + return -1; + } + + /* Create message environment for async responses */ + ctx->msg_env = enif_alloc_env(); + if (ctx->msg_env == NULL) { + pthread_cond_destroy(&ctx->queue_not_empty); + pthread_mutex_destroy(&ctx->queue_mutex); + return -1; + } + + /* Globals/locals will be created by the worker thread */ + ctx->globals = NULL; + ctx->locals = NULL; + ctx->module_cache = NULL; + + /* Start the worker thread */ + if (pthread_create(&ctx->worker_thread, NULL, worker_context_thread_main, ctx) != 0) { + enif_free_env(ctx->msg_env); + ctx->msg_env = NULL; + pthread_cond_destroy(&ctx->queue_not_empty); + pthread_mutex_destroy(&ctx->queue_mutex); + return -1; + } + + /* Wait for thread to initialize or fail */ + int wait_count = 0; + while (!atomic_load(&ctx->worker_running) && + !atomic_load(&ctx->init_error) && + wait_count < 2000) { + usleep(1000); /* 1ms */ + wait_count++; + } + + if (atomic_load(&ctx->init_error) || !atomic_load(&ctx->worker_running)) { + /* Thread failed to start */ + pthread_join(ctx->worker_thread, NULL); + if (ctx->msg_env != NULL) { + enif_free_env(ctx->msg_env); + ctx->msg_env = NULL; + } + pthread_cond_destroy(&ctx->queue_not_empty); + pthread_mutex_destroy(&ctx->queue_mutex); + return -1; + } + + return 0; +} + +/** + * @brief Shutdown worker thread mode and clean up resources + * + * Uses the join-or-leak pattern: if the worker thread doesn't respond + * within the timeout, we mark the context as leaked and do NOT 
free + * shared resources to avoid use-after-free. + * + * @param ctx Context to shutdown + */ +#define WORKER_SHUTDOWN_TIMEOUT_SECS 30 + +static void worker_context_shutdown(py_context_t *ctx) { + if (!ctx->uses_worker_thread) { + return; + } + + /* Signal shutdown */ + atomic_store(&ctx->shutdown_requested, true); + + /* Cancel all pending (not-yet-started) requests */ + ctx_queue_cancel_all(ctx); + + /* Enqueue shutdown request to wake worker if idle */ + ctx_request_t *shutdown_req = ctx_request_create(); + if (shutdown_req != NULL) { + shutdown_req->type = CTX_REQ_SHUTDOWN; + ctx_queue_enqueue(ctx, shutdown_req); + } + + /* Wait for thread to exit with timeout */ + bool join_succeeded = false; + +#if defined(__linux__) + struct timespec deadline; + clock_gettime(CLOCK_REALTIME, &deadline); + deadline.tv_sec += WORKER_SHUTDOWN_TIMEOUT_SECS; + int rc = pthread_timedjoin_np(ctx->worker_thread, NULL, &deadline); + join_succeeded = (rc == 0); +#else + /* macOS/other: poll worker_running flag with timeout */ + int wait_ms = 0; + while (atomic_load(&ctx->worker_running) && + wait_ms < WORKER_SHUTDOWN_TIMEOUT_SECS * 1000) { + usleep(100000); /* 100ms */ + wait_ms += 100; + } + if (!atomic_load(&ctx->worker_running)) { + pthread_join(ctx->worker_thread, NULL); + join_succeeded = true; + } +#endif + + if (!join_succeeded) { + /* Worker thread is unresponsive - use leak pattern */ + fprintf(stderr, "Worker thread shutdown timeout after %d seconds, leaking context\n", + WORKER_SHUTDOWN_TIMEOUT_SECS); + atomic_store(&ctx->leaked, true); + return; + } + + /* Clean shutdown succeeded - safe to free resources */ + if (ctx->msg_env != NULL) { + enif_free_env(ctx->msg_env); + ctx->msg_env = NULL; + } + + pthread_cond_destroy(&ctx->queue_not_empty); + pthread_mutex_destroy(&ctx->queue_mutex); + + ctx->uses_worker_thread = false; +} + +/** + * @brief Dispatch a request to the worker thread and wait for response + * + * Uses the queue-based pattern: creates a request, enqueues it, 
waits for + * completion, and copies the result back to the caller's environment. + * + * @param env Caller's NIF environment + * @param ctx Context with worker thread + * @param req_type Request type (CTX_REQ_CALL, CTX_REQ_EVAL, CTX_REQ_EXEC, etc.) + * @param request_data Request data term + * @return Result term copied back to caller's env + */ +#define WORKER_DISPATCH_TIMEOUT_SECS 30 + +/** + * @brief Dispatch a request to the worker thread with optional local environment + * + * @param env NIF environment + * @param ctx Context to dispatch to + * @param req_type Request type + * @param request_data Request data term + * @param local_env Optional local environment (NULL for default) + * @return Result term + */ +static ERL_NIF_TERM dispatch_to_worker_thread_impl( + ErlNifEnv *env, + py_context_t *ctx, + ctx_request_type_t req_type, + ERL_NIF_TERM request_data, + void *local_env +) { + if (!atomic_load(&ctx->worker_running)) { + return make_error(env, "thread_not_running"); + } + + if (atomic_load(&ctx->destroyed)) { + return make_error(env, "context_destroyed"); + } + + /* Create request struct */ + ctx_request_t *req = ctx_request_create(); + if (req == NULL) { + return make_error(env, "alloc_failed"); + } + + /* Populate request */ + req->type = req_type; + req->request_data = enif_make_copy(req->request_env, request_data); + req->local_env_ptr = local_env; + + /* Add extra reference for queue (caller holds 1, queue holds 1) */ + ctx_request_addref(req); + ctx_queue_enqueue(ctx, req); + + /* Wait for completion with timeout */ + struct timespec deadline; + clock_gettime(CLOCK_REALTIME, &deadline); + deadline.tv_sec += WORKER_DISPATCH_TIMEOUT_SECS; + + ERL_NIF_TERM result; + pthread_mutex_lock(&req->mutex); + + while (!atomic_load(&req->completed)) { + int rc = pthread_cond_timedwait(&req->cond, &req->mutex, &deadline); + if (rc == ETIMEDOUT) { + /* Timeout - mark as cancelled and return error */ + atomic_store(&req->cancelled, true); + 
pthread_mutex_unlock(&req->mutex); + ctx_request_release(req); + return make_error(env, "worker_timeout"); + } + } + + pthread_mutex_unlock(&req->mutex); + + /* Copy result to caller's environment */ + if (req->result_env != NULL) { + result = enif_make_copy(env, req->result); + } else { + result = make_error(env, "no_result"); + } + + /* Release caller's reference */ + ctx_request_release(req); + + return result; +} + +/** + * @brief Convenience wrapper for dispatch without local environment + */ +static ERL_NIF_TERM dispatch_to_worker_thread( + ErlNifEnv *env, + py_context_t *ctx, + ctx_request_type_t req_type, + ERL_NIF_TERM request_data +) { + return dispatch_to_worker_thread_impl(env, ctx, req_type, request_data, NULL); +} + +/** + * @brief Async dispatch to worker thread (non-blocking) + * + * Enqueues the request and returns immediately. The worker thread will + * send a {py_result, RequestId, Result} message to the caller when done. + * + * @param env NIF environment + * @param ctx Context + * @param req_type Request type + * @param request_data Request data term + * @param caller_pid Caller's PID for result delivery + * @param request_id Request ID for correlation + * @param local_env Optional local environment (NULL for default) + * @return {enqueued, RequestId} on success, {error, Reason} on failure + */ +static ERL_NIF_TERM dispatch_to_worker_thread_async( + ErlNifEnv *env, + py_context_t *ctx, + ctx_request_type_t req_type, + ERL_NIF_TERM request_data, + ErlNifPid caller_pid, + ERL_NIF_TERM request_id, + void *local_env +) { + if (!atomic_load(&ctx->worker_running)) { + return make_error(env, "thread_not_running"); + } + + if (atomic_load(&ctx->destroyed)) { + return make_error(env, "context_destroyed"); + } + + /* Create request struct */ + ctx_request_t *req = ctx_request_create(); + if (req == NULL) { + return make_error(env, "alloc_failed"); + } + + /* Populate request */ + req->type = req_type; + req->request_data = 
enif_make_copy(req->request_env, request_data); + req->local_env_ptr = local_env; + + /* Set async mode */ + req->async_mode = true; + req->caller_pid = caller_pid; + req->request_id = enif_make_copy(req->request_env, request_id); + + /* Add to queue (queue owns one reference, no caller reference needed) */ + ctx_queue_enqueue(ctx, req); + + /* Return immediately - no blocking! */ + return enif_make_tuple2(env, + enif_make_atom(env, "enqueued"), + request_id); +} + +#ifdef HAVE_SUBINTERPRETERS +/** + * @brief Main loop for OWN_GIL context thread + * + * This function runs in a dedicated pthread. It creates an OWN_GIL subinterpreter, + * then enters a request loop where it processes requests from the request queue. + * + * The queue-based pattern replaces the old single-slot pattern which had race + * conditions when multiple callers dispatched concurrently. + */ +static void *owngil_context_thread_main(void *arg) { + py_context_t *ctx = (py_context_t *)arg; + + /* Attach to Python runtime to create the subinterpreter. + * We need to hold the main GIL while creating the subinterpreter. */ + PyGILState_STATE gstate = PyGILState_Ensure(); + + /* Create OWN_GIL subinterpreter */ + PyInterpreterConfig config = { + .use_main_obmalloc = 0, + .allow_fork = 0, + .allow_exec = 0, + .allow_threads = 1, + .allow_daemon_threads = 0, + .check_multi_interp_extensions = 1, + .gil = PyInterpreterConfig_OWN_GIL, + }; + + PyStatus status = Py_NewInterpreterFromConfig(&ctx->own_gil_tstate, &config); + if (PyStatus_IsError(status)) { + fprintf(stderr, "OWN_GIL: Py_NewInterpreterFromConfig failed: %s\n", + status.err_msg ? status.err_msg : "unknown error"); + PyGILState_Release(gstate); + atomic_store(&ctx->init_error, true); + atomic_store(&ctx->worker_running, false); + return NULL; + } + + ctx->own_gil_interp = PyThreadState_GetInterpreter(ctx->own_gil_tstate); + + /* After Py_NewInterpreterFromConfig, we are now in the new interpreter's + * thread state and hold its GIL. 
The main interpreter's gstate is no longer + * relevant for this thread. */ + + /* Register erlang module in this subinterpreter */ + if (create_erlang_module() < 0) { + fprintf(stderr, "OWN_GIL: create_erlang_module failed\n"); + PyErr_Print(); + Py_EndInterpreter(ctx->own_gil_tstate); + atomic_store(&ctx->init_error, true); + atomic_store(&ctx->worker_running, false); + return NULL; + } + + /* Register py_event_loop module for reactor support */ + if (create_py_event_loop_module() < 0) { + fprintf(stderr, "OWN_GIL: create_py_event_loop_module failed\n"); + PyErr_Print(); + Py_EndInterpreter(ctx->own_gil_tstate); + atomic_store(&ctx->init_error, true); + atomic_store(&ctx->worker_running, false); + return NULL; + } + + /* Create namespace dictionaries */ + ctx->globals = PyDict_New(); + ctx->locals = PyDict_New(); + ctx->module_cache = PyDict_New(); + + if (ctx->globals == NULL || ctx->locals == NULL || ctx->module_cache == NULL) { + fprintf(stderr, "OWN_GIL: PyDict_New failed for namespace dicts\n"); + Py_XDECREF(ctx->globals); + Py_XDECREF(ctx->locals); + Py_XDECREF(ctx->module_cache); + Py_EndInterpreter(ctx->own_gil_tstate); + atomic_store(&ctx->init_error, true); + atomic_store(&ctx->worker_running, false); + return NULL; + } + + /* Import __builtins__ into globals */ + PyObject *builtins = PyEval_GetBuiltins(); + PyDict_SetItemString(ctx->globals, "__builtins__", builtins); + + /* Import erlang module into globals */ + PyObject *erlang_module = PyImport_ImportModule("erlang"); + if (erlang_module != NULL) { + PyDict_SetItemString(ctx->globals, "erlang", erlang_module); + Py_DECREF(erlang_module); + } else { + /* Non-fatal - basic operations still work, but log for debugging */ + log_and_clear_python_error("OWN_GIL erlang module import"); + } + + /* Release our OWN_GIL (we'll reacquire when processing requests) */ + PyEval_SaveThread(); + + /* Signal that we're ready */ + atomic_store(&ctx->worker_running, true); + + /* Main request loop - uses queue instead 
of single-slot */ + while (!atomic_load(&ctx->shutdown_requested)) { + /* Dequeue next request (blocks until available or shutdown) */ + ctx_request_t *req = ctx_queue_dequeue(ctx); + + if (req == NULL) { + /* Queue empty and shutdown requested */ + break; + } + + if (req->type == CTX_REQ_SHUTDOWN) { + /* Shutdown sentinel - signal completion and exit */ + pthread_mutex_lock(&req->mutex); + atomic_store(&req->completed, true); + pthread_cond_signal(&req->cond); + pthread_mutex_unlock(&req->mutex); + ctx_request_release(req); + break; + } + + /* Check if request was cancelled while queued */ + if (atomic_load(&req->cancelled)) { + /* Request cancelled - signal completion without processing */ + req->result_env = enif_alloc_env(); + if (req->result_env) { + req->result = enif_make_tuple2(req->result_env, + enif_make_atom(req->result_env, "error"), + enif_make_atom(req->result_env, "cancelled")); + } + req->success = false; + + pthread_mutex_lock(&req->mutex); + atomic_store(&req->completed, true); + pthread_cond_signal(&req->cond); + pthread_mutex_unlock(&req->mutex); + + ctx_request_release(req); + continue; + } + + /* Populate legacy compatibility fields from request */ + ctx->shared_env = req->request_env; + ctx->request_type = req->type; + ctx->request_term = req->request_data; + ctx->reactor_buffer_ptr = req->reactor_buffer_ptr; + ctx->local_env_ptr = req->local_env_ptr; + ctx->response_ok = false; + ctx->response_term = 0; + + /* Acquire our GIL and process the request */ + PyEval_RestoreThread(ctx->own_gil_tstate); + owngil_execute_request(ctx); + PyEval_SaveThread(); + + /* Copy response to request struct */ + req->result_env = enif_alloc_env(); + if (req->result_env && ctx->response_term != 0) { + req->result = enif_make_copy(req->result_env, ctx->response_term); + } else if (req->result_env) { + req->result = enif_make_tuple2(req->result_env, + enif_make_atom(req->result_env, "error"), + enif_make_atom(req->result_env, "no_response")); + } + req->success = 
ctx->response_ok; + + /* Clear legacy fields */ + ctx->shared_env = NULL; + ctx->request_type = CTX_REQ_NONE; + ctx->request_term = 0; + ctx->reactor_buffer_ptr = NULL; + ctx->local_env_ptr = NULL; + + /* Signal completion */ + pthread_mutex_lock(&req->mutex); + atomic_store(&req->completed, true); + pthread_cond_signal(&req->cond); + pthread_mutex_unlock(&req->mutex); + + /* Release queue's reference to request */ + ctx_request_release(req); + } /* Cleanup: acquire our OWN_GIL and destroy interpreter */ PyEval_RestoreThread(ctx->own_gil_tstate); @@ -3297,7 +4086,7 @@ static void *owngil_context_thread_main(void *arg) { * After Py_NewInterpreterFromConfig switched us to the OWN_GIL interpreter, * the original gstate is no longer valid. Py_EndInterpreter handles cleanup. */ - atomic_store(&ctx->thread_running, false); + atomic_store(&ctx->worker_running, false); return NULL; } @@ -3308,17 +4097,17 @@ static void *owngil_context_thread_main(void *arg) { #define OWNGIL_DISPATCH_TIMEOUT_SECS 30 /** - * @brief Dispatch a request to the OWN_GIL thread and wait for response + * @brief Dispatch a request to the worker thread and wait for response * - * Called from dirty schedulers. Copies the request term to the shared env, - * signals the worker thread, and waits for the response. + * Uses the queue-based pattern: creates a request, enqueues it, waits for + * completion, and copies the result back to the caller's environment. * - * Uses pthread_cond_timedwait to prevent indefinite blocking if the worker - * thread dies or becomes unresponsive. + * This replaces the old single-slot pattern which had race conditions when + * multiple callers dispatched concurrently. * * @param env Caller's NIF environment - * @param ctx Context with OWN_GIL - * @param req_type Request type (CTX_REQ_CALL, CTX_REQ_EVAL, CTX_REQ_EXEC) + * @param ctx Context with worker thread + * @param req_type Request type (CTX_REQ_CALL, CTX_REQ_EVAL, CTX_REQ_EXEC, etc.) 
* @param request_data Request data term * @return Result term copied back to caller's env */ @@ -3328,41 +4117,66 @@ static ERL_NIF_TERM dispatch_to_owngil_thread( ctx_request_type_t req_type, ERL_NIF_TERM request_data ) { - if (!atomic_load(&ctx->thread_running)) { + if (!atomic_load(&ctx->worker_running)) { return make_error(env, "thread_not_running"); } - pthread_mutex_lock(&ctx->request_mutex); + if (atomic_load(&ctx->destroyed)) { + return make_error(env, "context_destroyed"); + } + + /* Create request struct */ + ctx_request_t *req = ctx_request_create(); + if (req == NULL) { + return make_error(env, "alloc_failed"); + } + + /* Populate request */ + req->type = req_type; + req->request_data = enif_make_copy(req->request_env, request_data); - /* Copy request to shared env (zero serialization overhead) */ - enif_clear_env(ctx->shared_env); - ctx->request_term = enif_make_copy(ctx->shared_env, request_data); - ctx->request_type = req_type; + /* Add ref for queue (now refcount = 2: caller + queue) */ + ctx_request_addref(req); - /* Signal the worker thread */ - pthread_cond_signal(&ctx->request_ready); + /* Enqueue the request */ + ctx_queue_enqueue(ctx, req); - /* Wait for response with timeout to prevent deadlock on worker death */ + /* Wait for completion with timeout */ struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS; - while (ctx->request_type != CTX_REQ_NONE) { - int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline); + ERL_NIF_TERM result; + pthread_mutex_lock(&req->mutex); + + while (!atomic_load(&req->completed)) { + int rc = pthread_cond_timedwait(&req->cond, &req->mutex, &deadline); if (rc == ETIMEDOUT) { - /* Worker thread is unresponsive - mark it as not running */ - atomic_store(&ctx->thread_running, false); - pthread_mutex_unlock(&ctx->request_mutex); - fprintf(stderr, "OWN_GIL dispatch timeout: worker thread unresponsive after %d seconds\n", + /* 
Worker thread is unresponsive - mark request as cancelled */ + atomic_store(&req->cancelled, true); + pthread_mutex_unlock(&req->mutex); + + /* Don't mark worker as dead - it might still be processing + * a long-running Python operation. Just fail this request. */ + fprintf(stderr, "OWN_GIL dispatch timeout after %d seconds\n", OWNGIL_DISPATCH_TIMEOUT_SECS); + + ctx_request_release(req); /* Release caller's ref */ return make_error(env, "worker_timeout"); } } - /* Copy response back to caller's env */ - ERL_NIF_TERM result = enif_make_copy(env, ctx->response_term); + pthread_mutex_unlock(&req->mutex); - pthread_mutex_unlock(&ctx->request_mutex); + /* Copy result to caller's env */ + if (req->result_env != NULL) { + result = enif_make_copy(env, req->result); + } else { + result = make_error(env, "no_result"); + } + + /* Release caller's ref */ + ctx_request_release(req); return result; } @@ -3370,52 +4184,74 @@ static ERL_NIF_TERM dispatch_to_owngil_thread( /** * @brief Dispatch reactor on_read_ready to OWN_GIL thread * - * Similar to dispatch_to_owngil_thread but also passes buffer pointer. - * Uses timeout to prevent deadlock if worker thread dies. + * Uses queue-based dispatch with per-request synchronization. 
*/ ERL_NIF_TERM dispatch_reactor_read_to_owngil(ErlNifEnv *env, py_context_t *ctx, int fd, void *buffer_ptr) { - if (!atomic_load(&ctx->thread_running)) { + if (!atomic_load(&ctx->worker_running)) { enif_release_resource(buffer_ptr); return make_error(env, "thread_not_running"); } - pthread_mutex_lock(&ctx->request_mutex); + if (atomic_load(&ctx->destroyed)) { + enif_release_resource(buffer_ptr); + return make_error(env, "context_destroyed"); + } + + /* Create request struct */ + ctx_request_t *req = ctx_request_create(); + if (req == NULL) { + enif_release_resource(buffer_ptr); + return make_error(env, "alloc_failed"); + } - /* Clear and set up request */ - enif_clear_env(ctx->shared_env); - ctx->request_term = enif_make_int(ctx->shared_env, fd); - ctx->reactor_buffer_ptr = buffer_ptr; /* Transfer ownership */ - ctx->request_type = CTX_REQ_REACTOR_ON_READ_READY; + /* Populate request */ + req->type = CTX_REQ_REACTOR_ON_READ_READY; + req->request_data = enif_make_int(req->request_env, fd); + req->reactor_buffer_ptr = buffer_ptr; /* Transfer ownership */ + req->reactor_fd = fd; - /* Signal the worker thread */ - pthread_cond_signal(&ctx->request_ready); + /* Add ref for queue (now refcount = 2: caller + queue) */ + ctx_request_addref(req); - /* Wait for response with timeout to prevent deadlock */ + /* Enqueue the request */ + ctx_queue_enqueue(ctx, req); + + /* Wait for completion with timeout */ struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS; - while (ctx->request_type != CTX_REQ_NONE) { - int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline); + ERL_NIF_TERM result; + pthread_mutex_lock(&req->mutex); + + while (!atomic_load(&req->completed)) { + int rc = pthread_cond_timedwait(&req->cond, &req->mutex, &deadline); if (rc == ETIMEDOUT) { - /* Worker thread is unresponsive - clean up buffer and mark dead */ - atomic_store(&ctx->thread_running, false); - /* Buffer 
ownership was transferred but never processed - release it */ - if (ctx->reactor_buffer_ptr) { - enif_release_resource(ctx->reactor_buffer_ptr); - ctx->reactor_buffer_ptr = NULL; - } - pthread_mutex_unlock(&ctx->request_mutex); - fprintf(stderr, "OWN_GIL reactor dispatch timeout: worker thread unresponsive\n"); + /* Request timeout - mark as cancelled but don't release buffer + * (worker will handle it when it gets to this request) */ + atomic_store(&req->cancelled, true); + pthread_mutex_unlock(&req->mutex); + + fprintf(stderr, "OWN_GIL reactor dispatch timeout after %d seconds\n", + OWNGIL_DISPATCH_TIMEOUT_SECS); + + ctx_request_release(req); /* Release caller's ref */ return make_error(env, "worker_timeout"); } } - /* Copy response back to caller's env */ - ERL_NIF_TERM result = enif_make_copy(env, ctx->response_term); + pthread_mutex_unlock(&req->mutex); + + /* Copy result to caller's env */ + if (req->result_env != NULL) { + result = enif_make_copy(env, req->result); + } else { + result = make_error(env, "no_result"); + } - pthread_mutex_unlock(&ctx->request_mutex); + /* Release caller's ref */ + ctx_request_release(req); return result; } @@ -3423,43 +4259,67 @@ ERL_NIF_TERM dispatch_reactor_read_to_owngil(ErlNifEnv *env, py_context_t *ctx, /** * @brief Dispatch reactor on_write_ready to OWN_GIL thread * - * Uses timeout to prevent deadlock if worker thread dies. + * Uses queue-based dispatch with per-request synchronization. 
*/ ERL_NIF_TERM dispatch_reactor_write_to_owngil(ErlNifEnv *env, py_context_t *ctx, int fd) { - if (!atomic_load(&ctx->thread_running)) { + if (!atomic_load(&ctx->worker_running)) { return make_error(env, "thread_not_running"); } - pthread_mutex_lock(&ctx->request_mutex); + if (atomic_load(&ctx->destroyed)) { + return make_error(env, "context_destroyed"); + } + + /* Create request struct */ + ctx_request_t *req = ctx_request_create(); + if (req == NULL) { + return make_error(env, "alloc_failed"); + } - /* Clear and set up request */ - enif_clear_env(ctx->shared_env); - ctx->request_term = enif_make_int(ctx->shared_env, fd); - ctx->request_type = CTX_REQ_REACTOR_ON_WRITE_READY; + /* Populate request */ + req->type = CTX_REQ_REACTOR_ON_WRITE_READY; + req->request_data = enif_make_int(req->request_env, fd); + req->reactor_fd = fd; - /* Signal the worker thread */ - pthread_cond_signal(&ctx->request_ready); + /* Add ref for queue (now refcount = 2: caller + queue) */ + ctx_request_addref(req); - /* Wait for response with timeout to prevent deadlock */ + /* Enqueue the request */ + ctx_queue_enqueue(ctx, req); + + /* Wait for completion with timeout */ struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS; - while (ctx->request_type != CTX_REQ_NONE) { - int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline); + ERL_NIF_TERM result; + pthread_mutex_lock(&req->mutex); + + while (!atomic_load(&req->completed)) { + int rc = pthread_cond_timedwait(&req->cond, &req->mutex, &deadline); if (rc == ETIMEDOUT) { - atomic_store(&ctx->thread_running, false); - pthread_mutex_unlock(&ctx->request_mutex); - fprintf(stderr, "OWN_GIL reactor write dispatch timeout: worker thread unresponsive\n"); + atomic_store(&req->cancelled, true); + pthread_mutex_unlock(&req->mutex); + + fprintf(stderr, "OWN_GIL reactor write dispatch timeout after %d seconds\n", + OWNGIL_DISPATCH_TIMEOUT_SECS); + + 
ctx_request_release(req); return make_error(env, "worker_timeout"); } } - /* Copy response back to caller's env */ - ERL_NIF_TERM result = enif_make_copy(env, ctx->response_term); + pthread_mutex_unlock(&req->mutex); - pthread_mutex_unlock(&ctx->request_mutex); + /* Copy result to caller's env */ + if (req->result_env != NULL) { + result = enif_make_copy(env, req->result); + } else { + result = make_error(env, "no_result"); + } + + ctx_request_release(req); return result; } @@ -3467,45 +4327,69 @@ ERL_NIF_TERM dispatch_reactor_write_to_owngil(ErlNifEnv *env, py_context_t *ctx, /** * @brief Dispatch reactor init_connection to OWN_GIL thread * - * Uses timeout to prevent deadlock if worker thread dies. + * Uses queue-based dispatch with per-request synchronization. */ ERL_NIF_TERM dispatch_reactor_init_to_owngil(ErlNifEnv *env, py_context_t *ctx, int fd, ERL_NIF_TERM client_info) { - if (!atomic_load(&ctx->thread_running)) { + if (!atomic_load(&ctx->worker_running)) { return make_error(env, "thread_not_running"); } - pthread_mutex_lock(&ctx->request_mutex); + if (atomic_load(&ctx->destroyed)) { + return make_error(env, "context_destroyed"); + } + + /* Create request struct */ + ctx_request_t *req = ctx_request_create(); + if (req == NULL) { + return make_error(env, "alloc_failed"); + } + + /* Populate request */ + req->type = CTX_REQ_REACTOR_INIT_CONNECTION; + ERL_NIF_TERM fd_term = enif_make_int(req->request_env, fd); + ERL_NIF_TERM info_copy = enif_make_copy(req->request_env, client_info); + req->request_data = enif_make_tuple2(req->request_env, fd_term, info_copy); + req->reactor_fd = fd; - /* Clear and set up request */ - enif_clear_env(ctx->shared_env); - ERL_NIF_TERM fd_term = enif_make_int(ctx->shared_env, fd); - ERL_NIF_TERM info_copy = enif_make_copy(ctx->shared_env, client_info); - ctx->request_term = enif_make_tuple2(ctx->shared_env, fd_term, info_copy); - ctx->request_type = CTX_REQ_REACTOR_INIT_CONNECTION; + /* Add ref for queue (now refcount = 2: caller 
+ queue) */ + ctx_request_addref(req); - /* Signal the worker thread */ - pthread_cond_signal(&ctx->request_ready); + /* Enqueue the request */ + ctx_queue_enqueue(ctx, req); - /* Wait for response with timeout to prevent deadlock */ + /* Wait for completion with timeout */ struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS; - while (ctx->request_type != CTX_REQ_NONE) { - int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline); + ERL_NIF_TERM result; + pthread_mutex_lock(&req->mutex); + + while (!atomic_load(&req->completed)) { + int rc = pthread_cond_timedwait(&req->cond, &req->mutex, &deadline); if (rc == ETIMEDOUT) { - atomic_store(&ctx->thread_running, false); - pthread_mutex_unlock(&ctx->request_mutex); - fprintf(stderr, "OWN_GIL reactor init dispatch timeout: worker thread unresponsive\n"); + atomic_store(&req->cancelled, true); + pthread_mutex_unlock(&req->mutex); + + fprintf(stderr, "OWN_GIL reactor init dispatch timeout after %d seconds\n", + OWNGIL_DISPATCH_TIMEOUT_SECS); + + ctx_request_release(req); return make_error(env, "worker_timeout"); } } - /* Copy response back to caller's env */ - ERL_NIF_TERM result = enif_make_copy(env, ctx->response_term); + pthread_mutex_unlock(&req->mutex); + + /* Copy result to caller's env */ + if (req->result_env != NULL) { + result = enif_make_copy(env, req->result); + } else { + result = make_error(env, "no_result"); + } - pthread_mutex_unlock(&ctx->request_mutex); + ctx_request_release(req); return result; } @@ -3513,47 +4397,69 @@ ERL_NIF_TERM dispatch_reactor_init_to_owngil(ErlNifEnv *env, py_context_t *ctx, /** * @brief Dispatch exec_with_env to OWN_GIL thread * - * Passes the process-local env resource to the worker thread via local_env_ptr. - * Uses timeout to prevent deadlock if worker thread dies. + * Uses queue-based dispatch with per-request synchronization. 
*/ static ERL_NIF_TERM dispatch_exec_with_env_to_owngil( ErlNifEnv *env, py_context_t *ctx, ERL_NIF_TERM code, py_env_resource_t *penv ) { - if (!atomic_load(&ctx->thread_running)) { + if (!atomic_load(&ctx->worker_running)) { return make_error(env, "thread_not_running"); } - pthread_mutex_lock(&ctx->request_mutex); + if (atomic_load(&ctx->destroyed)) { + return make_error(env, "context_destroyed"); + } + + /* Create request struct */ + ctx_request_t *req = ctx_request_create(); + if (req == NULL) { + return make_error(env, "alloc_failed"); + } + + /* Populate request */ + req->type = CTX_REQ_EXEC_WITH_ENV; + req->request_data = enif_make_copy(req->request_env, code); + req->local_env_ptr = penv; - /* Copy request to shared env */ - enif_clear_env(ctx->shared_env); - ctx->request_term = enif_make_copy(ctx->shared_env, code); - ctx->local_env_ptr = penv; /* Pass env resource pointer */ - ctx->request_type = CTX_REQ_EXEC_WITH_ENV; + /* Add ref for queue */ + ctx_request_addref(req); - /* Signal the worker thread */ - pthread_cond_signal(&ctx->request_ready); + /* Enqueue the request */ + ctx_queue_enqueue(ctx, req); - /* Wait for response with timeout to prevent deadlock */ + /* Wait for completion with timeout */ struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS; - while (ctx->request_type != CTX_REQ_NONE) { - int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline); + ERL_NIF_TERM result; + pthread_mutex_lock(&req->mutex); + + while (!atomic_load(&req->completed)) { + int rc = pthread_cond_timedwait(&req->cond, &req->mutex, &deadline); if (rc == ETIMEDOUT) { - atomic_store(&ctx->thread_running, false); - pthread_mutex_unlock(&ctx->request_mutex); - fprintf(stderr, "OWN_GIL exec_with_env dispatch timeout: worker thread unresponsive\n"); + atomic_store(&req->cancelled, true); + pthread_mutex_unlock(&req->mutex); + + fprintf(stderr, "OWN_GIL exec_with_env dispatch timeout 
after %d seconds\n", + OWNGIL_DISPATCH_TIMEOUT_SECS); + + ctx_request_release(req); return make_error(env, "worker_timeout"); } } - /* Copy response back to caller's env */ - ERL_NIF_TERM result = enif_make_copy(env, ctx->response_term); + pthread_mutex_unlock(&req->mutex); + + /* Copy result to caller's env */ + if (req->result_env != NULL) { + result = enif_make_copy(env, req->result); + } else { + result = make_error(env, "no_result"); + } - pthread_mutex_unlock(&ctx->request_mutex); + ctx_request_release(req); return result; } @@ -3561,50 +4467,72 @@ static ERL_NIF_TERM dispatch_exec_with_env_to_owngil( /** * @brief Dispatch eval_with_env to OWN_GIL thread * - * Passes the process-local env resource to the worker thread via local_env_ptr. - * Uses timeout to prevent deadlock if worker thread dies. + * Uses queue-based dispatch with per-request synchronization. */ static ERL_NIF_TERM dispatch_eval_with_env_to_owngil( ErlNifEnv *env, py_context_t *ctx, ERL_NIF_TERM code, ERL_NIF_TERM locals, py_env_resource_t *penv ) { - if (!atomic_load(&ctx->thread_running)) { + if (!atomic_load(&ctx->worker_running)) { return make_error(env, "thread_not_running"); } - pthread_mutex_lock(&ctx->request_mutex); + if (atomic_load(&ctx->destroyed)) { + return make_error(env, "context_destroyed"); + } + + /* Create request struct */ + ctx_request_t *req = ctx_request_create(); + if (req == NULL) { + return make_error(env, "alloc_failed"); + } + + /* Populate request: {Code, Locals} */ + req->type = CTX_REQ_EVAL_WITH_ENV; + ERL_NIF_TERM code_copy = enif_make_copy(req->request_env, code); + ERL_NIF_TERM locals_copy = enif_make_copy(req->request_env, locals); + req->request_data = enif_make_tuple2(req->request_env, code_copy, locals_copy); + req->local_env_ptr = penv; - /* Copy request to shared env: {Code, Locals} */ - enif_clear_env(ctx->shared_env); - ERL_NIF_TERM code_copy = enif_make_copy(ctx->shared_env, code); - ERL_NIF_TERM locals_copy = enif_make_copy(ctx->shared_env, locals); 
- ctx->request_term = enif_make_tuple2(ctx->shared_env, code_copy, locals_copy); - ctx->local_env_ptr = penv; /* Pass env resource pointer */ - ctx->request_type = CTX_REQ_EVAL_WITH_ENV; + /* Add ref for queue */ + ctx_request_addref(req); - /* Signal the worker thread */ - pthread_cond_signal(&ctx->request_ready); + /* Enqueue the request */ + ctx_queue_enqueue(ctx, req); - /* Wait for response with timeout to prevent deadlock */ + /* Wait for completion with timeout */ struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS; - while (ctx->request_type != CTX_REQ_NONE) { - int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline); - if (rc == ETIMEDOUT) { - atomic_store(&ctx->thread_running, false); - pthread_mutex_unlock(&ctx->request_mutex); - fprintf(stderr, "OWN_GIL eval_with_env dispatch timeout: worker thread unresponsive\n"); + ERL_NIF_TERM result; + pthread_mutex_lock(&req->mutex); + + while (!atomic_load(&req->completed)) { + int rc = pthread_cond_timedwait(&req->cond, &req->mutex, &deadline); + if (rc == ETIMEDOUT) { + atomic_store(&req->cancelled, true); + pthread_mutex_unlock(&req->mutex); + + fprintf(stderr, "OWN_GIL eval_with_env dispatch timeout after %d seconds\n", + OWNGIL_DISPATCH_TIMEOUT_SECS); + + ctx_request_release(req); return make_error(env, "worker_timeout"); } } - /* Copy response back to caller's env */ - ERL_NIF_TERM result = enif_make_copy(env, ctx->response_term); + pthread_mutex_unlock(&req->mutex); + + /* Copy result to caller's env */ + if (req->result_env != NULL) { + result = enif_make_copy(env, req->result); + } else { + result = make_error(env, "no_result"); + } - pthread_mutex_unlock(&ctx->request_mutex); + ctx_request_release(req); return result; } @@ -3612,8 +4540,7 @@ static ERL_NIF_TERM dispatch_eval_with_env_to_owngil( /** * @brief Dispatch call_with_env to OWN_GIL thread * - * Passes the process-local env resource to the worker thread 
via local_env_ptr. - * Uses timeout to prevent deadlock if worker thread dies. + * Uses queue-based dispatch with per-request synchronization. */ static ERL_NIF_TERM dispatch_call_with_env_to_owngil( ErlNifEnv *env, py_context_t *ctx, @@ -3621,45 +4548,68 @@ static ERL_NIF_TERM dispatch_call_with_env_to_owngil( ERL_NIF_TERM args, ERL_NIF_TERM kwargs, py_env_resource_t *penv ) { - if (!atomic_load(&ctx->thread_running)) { + if (!atomic_load(&ctx->worker_running)) { return make_error(env, "thread_not_running"); } - pthread_mutex_lock(&ctx->request_mutex); + if (atomic_load(&ctx->destroyed)) { + return make_error(env, "context_destroyed"); + } + + /* Create request struct */ + ctx_request_t *req = ctx_request_create(); + if (req == NULL) { + return make_error(env, "alloc_failed"); + } - /* Copy request to shared env: {Module, Func, Args, Kwargs} */ - enif_clear_env(ctx->shared_env); - ERL_NIF_TERM module_copy = enif_make_copy(ctx->shared_env, module); - ERL_NIF_TERM func_copy = enif_make_copy(ctx->shared_env, func); - ERL_NIF_TERM args_copy = enif_make_copy(ctx->shared_env, args); - ERL_NIF_TERM kwargs_copy = enif_make_copy(ctx->shared_env, kwargs); - ctx->request_term = enif_make_tuple4(ctx->shared_env, + /* Populate request: {Module, Func, Args, Kwargs} */ + req->type = CTX_REQ_CALL_WITH_ENV; + ERL_NIF_TERM module_copy = enif_make_copy(req->request_env, module); + ERL_NIF_TERM func_copy = enif_make_copy(req->request_env, func); + ERL_NIF_TERM args_copy = enif_make_copy(req->request_env, args); + ERL_NIF_TERM kwargs_copy = enif_make_copy(req->request_env, kwargs); + req->request_data = enif_make_tuple4(req->request_env, module_copy, func_copy, args_copy, kwargs_copy); - ctx->local_env_ptr = penv; /* Pass env resource pointer */ - ctx->request_type = CTX_REQ_CALL_WITH_ENV; + req->local_env_ptr = penv; - /* Signal the worker thread */ - pthread_cond_signal(&ctx->request_ready); + /* Add ref for queue */ + ctx_request_addref(req); - /* Wait for response with timeout to 
prevent deadlock */ + /* Enqueue the request */ + ctx_queue_enqueue(ctx, req); + + /* Wait for completion with timeout */ struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS; - while (ctx->request_type != CTX_REQ_NONE) { - int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline); + ERL_NIF_TERM result; + pthread_mutex_lock(&req->mutex); + + while (!atomic_load(&req->completed)) { + int rc = pthread_cond_timedwait(&req->cond, &req->mutex, &deadline); if (rc == ETIMEDOUT) { - atomic_store(&ctx->thread_running, false); - pthread_mutex_unlock(&ctx->request_mutex); - fprintf(stderr, "OWN_GIL call_with_env dispatch timeout: worker thread unresponsive\n"); + atomic_store(&req->cancelled, true); + pthread_mutex_unlock(&req->mutex); + + fprintf(stderr, "OWN_GIL call_with_env dispatch timeout after %d seconds\n", + OWNGIL_DISPATCH_TIMEOUT_SECS); + + ctx_request_release(req); return make_error(env, "worker_timeout"); } } - /* Copy response back to caller's env */ - ERL_NIF_TERM result = enif_make_copy(env, ctx->response_term); + pthread_mutex_unlock(&req->mutex); - pthread_mutex_unlock(&ctx->request_mutex); + /* Copy result to caller's env */ + if (req->result_env != NULL) { + result = enif_make_copy(env, req->result); + } else { + result = make_error(env, "no_result"); + } + + ctx_request_release(req); return result; } @@ -3667,47 +4617,68 @@ static ERL_NIF_TERM dispatch_call_with_env_to_owngil( /** * @brief Dispatch create_local_env to OWN_GIL thread * - * Creates the globals/locals dicts in the correct interpreter context. - * Returns ok or error. - * Uses timeout to prevent deadlock if worker thread dies. + * Uses queue-based dispatch with per-request synchronization. 
*/ static ERL_NIF_TERM dispatch_create_local_env_to_owngil( ErlNifEnv *env, py_context_t *ctx, py_env_resource_t *res ) { - if (!atomic_load(&ctx->thread_running)) { + if (!atomic_load(&ctx->worker_running)) { return make_error(env, "thread_not_running"); } - pthread_mutex_lock(&ctx->request_mutex); + if (atomic_load(&ctx->destroyed)) { + return make_error(env, "context_destroyed"); + } + + /* Create request struct */ + ctx_request_t *req = ctx_request_create(); + if (req == NULL) { + return make_error(env, "alloc_failed"); + } + + /* Populate request */ + req->type = CTX_REQ_CREATE_LOCAL_ENV; + req->local_env_ptr = res; - /* Pass env resource pointer to worker thread */ - enif_clear_env(ctx->shared_env); - ctx->local_env_ptr = res; - ctx->request_type = CTX_REQ_CREATE_LOCAL_ENV; + /* Add ref for queue */ + ctx_request_addref(req); - /* Signal the worker thread */ - pthread_cond_signal(&ctx->request_ready); + /* Enqueue the request */ + ctx_queue_enqueue(ctx, req); - /* Wait for response with timeout to prevent deadlock */ + /* Wait for completion with timeout */ struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS; - while (ctx->request_type != CTX_REQ_NONE) { - int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline); + ERL_NIF_TERM result; + pthread_mutex_lock(&req->mutex); + + while (!atomic_load(&req->completed)) { + int rc = pthread_cond_timedwait(&req->cond, &req->mutex, &deadline); if (rc == ETIMEDOUT) { - atomic_store(&ctx->thread_running, false); - pthread_mutex_unlock(&ctx->request_mutex); - fprintf(stderr, "OWN_GIL create_local_env dispatch timeout: worker thread unresponsive\n"); + atomic_store(&req->cancelled, true); + pthread_mutex_unlock(&req->mutex); + + fprintf(stderr, "OWN_GIL create_local_env dispatch timeout after %d seconds\n", + OWNGIL_DISPATCH_TIMEOUT_SECS); + + ctx_request_release(req); return make_error(env, "worker_timeout"); } } - /* Copy response 
back to caller's env */ - ERL_NIF_TERM result = enif_make_copy(env, ctx->response_term); + pthread_mutex_unlock(&req->mutex); + + /* Copy result to caller's env */ + if (req->result_env != NULL) { + result = enif_make_copy(env, req->result); + } else { + result = make_error(env, "no_result"); + } - pthread_mutex_unlock(&ctx->request_mutex); + ctx_request_release(req); return result; } @@ -3715,43 +4686,67 @@ static ERL_NIF_TERM dispatch_create_local_env_to_owngil( /** * @brief Dispatch apply_imports to OWN_GIL worker thread * - * @param env NIF environment - * @param ctx Context resource - * @param imports_term List of {ModuleBin, FuncBin | all} tuples - * @return ok | {error, Reason} + * Uses queue-based dispatch with per-request synchronization. */ static ERL_NIF_TERM dispatch_apply_imports_to_owngil( ErlNifEnv *env, py_context_t *ctx, ERL_NIF_TERM imports_term ) { - if (!atomic_load(&ctx->thread_running)) { + if (!atomic_load(&ctx->worker_running)) { return make_error(env, "thread_not_running"); } - pthread_mutex_lock(&ctx->request_mutex); + if (atomic_load(&ctx->destroyed)) { + return make_error(env, "context_destroyed"); + } + + /* Create request struct */ + ctx_request_t *req = ctx_request_create(); + if (req == NULL) { + return make_error(env, "alloc_failed"); + } + + /* Populate request */ + req->type = CTX_REQ_APPLY_IMPORTS; + req->request_data = enif_make_copy(req->request_env, imports_term); - enif_clear_env(ctx->shared_env); - ctx->request_term = enif_make_copy(ctx->shared_env, imports_term); - ctx->request_type = CTX_REQ_APPLY_IMPORTS; + /* Add ref for queue */ + ctx_request_addref(req); - pthread_cond_signal(&ctx->request_ready); + /* Enqueue the request */ + ctx_queue_enqueue(ctx, req); - /* Wait for response with timeout */ + /* Wait for completion with timeout */ struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS; - while (ctx->request_type != CTX_REQ_NONE) { - int rc = 
pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline); + ERL_NIF_TERM result; + pthread_mutex_lock(&req->mutex); + + while (!atomic_load(&req->completed)) { + int rc = pthread_cond_timedwait(&req->cond, &req->mutex, &deadline); if (rc == ETIMEDOUT) { - atomic_store(&ctx->thread_running, false); - pthread_mutex_unlock(&ctx->request_mutex); - fprintf(stderr, "OWN_GIL apply_imports dispatch timeout: worker thread unresponsive\n"); + atomic_store(&req->cancelled, true); + pthread_mutex_unlock(&req->mutex); + + fprintf(stderr, "OWN_GIL apply_imports dispatch timeout after %d seconds\n", + OWNGIL_DISPATCH_TIMEOUT_SECS); + + ctx_request_release(req); return make_error(env, "worker_timeout"); } } - ERL_NIF_TERM result = enif_make_copy(env, ctx->response_term); - pthread_mutex_unlock(&ctx->request_mutex); + pthread_mutex_unlock(&req->mutex); + + /* Copy result to caller's env */ + if (req->result_env != NULL) { + result = enif_make_copy(env, req->result); + } else { + result = make_error(env, "no_result"); + } + + ctx_request_release(req); return result; } @@ -3759,43 +4754,67 @@ static ERL_NIF_TERM dispatch_apply_imports_to_owngil( /** * @brief Dispatch apply_paths request to OWN_GIL worker thread * - * @param env Current NIF environment - * @param ctx OWN_GIL context - * @param paths_term List of path binaries - * @return ok | {error, Reason} + * Uses queue-based dispatch with per-request synchronization. 
*/ static ERL_NIF_TERM dispatch_apply_paths_to_owngil( ErlNifEnv *env, py_context_t *ctx, ERL_NIF_TERM paths_term ) { - if (!atomic_load(&ctx->thread_running)) { + if (!atomic_load(&ctx->worker_running)) { return make_error(env, "thread_not_running"); } - pthread_mutex_lock(&ctx->request_mutex); + if (atomic_load(&ctx->destroyed)) { + return make_error(env, "context_destroyed"); + } + + /* Create request struct */ + ctx_request_t *req = ctx_request_create(); + if (req == NULL) { + return make_error(env, "alloc_failed"); + } - enif_clear_env(ctx->shared_env); - ctx->request_term = enif_make_copy(ctx->shared_env, paths_term); - ctx->request_type = CTX_REQ_APPLY_PATHS; + /* Populate request */ + req->type = CTX_REQ_APPLY_PATHS; + req->request_data = enif_make_copy(req->request_env, paths_term); - pthread_cond_signal(&ctx->request_ready); + /* Add ref for queue */ + ctx_request_addref(req); - /* Wait for response with timeout */ + /* Enqueue the request */ + ctx_queue_enqueue(ctx, req); + + /* Wait for completion with timeout */ struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_sec += OWNGIL_DISPATCH_TIMEOUT_SECS; - while (ctx->request_type != CTX_REQ_NONE) { - int rc = pthread_cond_timedwait(&ctx->response_ready, &ctx->request_mutex, &deadline); + ERL_NIF_TERM result; + pthread_mutex_lock(&req->mutex); + + while (!atomic_load(&req->completed)) { + int rc = pthread_cond_timedwait(&req->cond, &req->mutex, &deadline); if (rc == ETIMEDOUT) { - atomic_store(&ctx->thread_running, false); - pthread_mutex_unlock(&ctx->request_mutex); - fprintf(stderr, "OWN_GIL apply_paths dispatch timeout: worker thread unresponsive\n"); + atomic_store(&req->cancelled, true); + pthread_mutex_unlock(&req->mutex); + + fprintf(stderr, "OWN_GIL apply_paths dispatch timeout after %d seconds\n", + OWNGIL_DISPATCH_TIMEOUT_SECS); + + ctx_request_release(req); return make_error(env, "worker_timeout"); } } - ERL_NIF_TERM result = enif_make_copy(env, ctx->response_term); - 
pthread_mutex_unlock(&ctx->request_mutex); + pthread_mutex_unlock(&req->mutex); + + /* Copy result to caller's env */ + if (req->result_env != NULL) { + result = enif_make_copy(env, req->result); + } else { + result = make_error(env, "no_result"); + } + + ctx_request_release(req); return result; } @@ -3813,66 +4832,72 @@ static int owngil_context_init(py_context_t *ctx) { ctx->uses_own_gil = true; ctx->own_gil_tstate = NULL; ctx->own_gil_interp = NULL; - ctx->local_env_ptr = NULL; - atomic_store(&ctx->thread_running, false); + + /* Initialize worker thread state */ + atomic_store(&ctx->worker_running, false); atomic_store(&ctx->init_error, false); atomic_store(&ctx->shutdown_requested, false); + atomic_store(&ctx->leaked, false); + + /* Initialize request queue */ + ctx->queue_head = NULL; + ctx->queue_tail = NULL; + + /* Initialize legacy compatibility fields */ + ctx->shared_env = NULL; ctx->request_type = CTX_REQ_NONE; ctx->request_term = 0; - ctx->request_data = 0; ctx->response_term = 0; ctx->response_ok = false; + ctx->local_env_ptr = NULL; + ctx->reactor_buffer_ptr = NULL; - /* Initialize mutex and condition variables */ - if (pthread_mutex_init(&ctx->request_mutex, NULL) != 0) { - return -1; - } - - if (pthread_cond_init(&ctx->request_ready, NULL) != 0) { - pthread_mutex_destroy(&ctx->request_mutex); + /* Initialize queue mutex */ + if (pthread_mutex_init(&ctx->queue_mutex, NULL) != 0) { return -1; } - if (pthread_cond_init(&ctx->response_ready, NULL) != 0) { - pthread_cond_destroy(&ctx->request_ready); - pthread_mutex_destroy(&ctx->request_mutex); + /* Initialize queue condition variable */ + if (pthread_cond_init(&ctx->queue_not_empty, NULL) != 0) { + pthread_mutex_destroy(&ctx->queue_mutex); return -1; } - /* Create shared environment for term passing */ - ctx->shared_env = enif_alloc_env(); - if (ctx->shared_env == NULL) { - pthread_cond_destroy(&ctx->response_ready); - pthread_cond_destroy(&ctx->request_ready); - 
pthread_mutex_destroy(&ctx->request_mutex); + /* Create message environment for async responses */ + ctx->msg_env = enif_alloc_env(); + if (ctx->msg_env == NULL) { + pthread_cond_destroy(&ctx->queue_not_empty); + pthread_mutex_destroy(&ctx->queue_mutex); return -1; } /* Start the worker thread */ - if (pthread_create(&ctx->own_gil_thread, NULL, owngil_context_thread_main, ctx) != 0) { - enif_free_env(ctx->shared_env); - pthread_cond_destroy(&ctx->response_ready); - pthread_cond_destroy(&ctx->request_ready); - pthread_mutex_destroy(&ctx->request_mutex); + if (pthread_create(&ctx->worker_thread, NULL, owngil_context_thread_main, ctx) != 0) { + enif_free_env(ctx->msg_env); + ctx->msg_env = NULL; + pthread_cond_destroy(&ctx->queue_not_empty); + pthread_mutex_destroy(&ctx->queue_mutex); return -1; } /* Wait for thread to initialize or fail */ int wait_count = 0; - while (!atomic_load(&ctx->thread_running) && + while (!atomic_load(&ctx->worker_running) && !atomic_load(&ctx->init_error) && wait_count < 2000) { usleep(1000); /* 1ms */ wait_count++; } - if (atomic_load(&ctx->init_error) || !atomic_load(&ctx->thread_running)) { + if (atomic_load(&ctx->init_error) || !atomic_load(&ctx->worker_running)) { /* Thread failed to start */ - pthread_join(ctx->own_gil_thread, NULL); - enif_free_env(ctx->shared_env); - pthread_cond_destroy(&ctx->response_ready); - pthread_cond_destroy(&ctx->request_ready); - pthread_mutex_destroy(&ctx->request_mutex); + pthread_join(ctx->worker_thread, NULL); + if (ctx->msg_env != NULL) { + enif_free_env(ctx->msg_env); + ctx->msg_env = NULL; + } + pthread_cond_destroy(&ctx->queue_not_empty); + pthread_mutex_destroy(&ctx->queue_mutex); return -1; } @@ -3882,7 +4907,9 @@ static int owngil_context_init(py_context_t *ctx) { /** * @brief Shutdown OWN_GIL context and clean up resources * - * Uses a timeout to avoid hanging forever if the Python thread is stuck. 
+ * Uses the join-or-leak pattern: if the worker thread doesn't respond + * within the timeout, we mark the context as leaked and do NOT free + * shared resources to avoid use-after-free. * * @param ctx Context to shutdown */ @@ -3896,48 +4923,58 @@ static void owngil_context_shutdown(py_context_t *ctx) { /* Signal shutdown */ atomic_store(&ctx->shutdown_requested, true); - pthread_mutex_lock(&ctx->request_mutex); - ctx->request_type = CTX_REQ_SHUTDOWN; - pthread_cond_signal(&ctx->request_ready); - pthread_mutex_unlock(&ctx->request_mutex); + /* Cancel all pending (not-yet-started) requests */ + ctx_queue_cancel_all(ctx); + + /* Enqueue shutdown request to wake worker if idle */ + ctx_request_t *shutdown_req = ctx_request_create(); + if (shutdown_req != NULL) { + shutdown_req->type = CTX_REQ_SHUTDOWN; + ctx_queue_enqueue(ctx, shutdown_req); + } /* Wait for thread to exit with timeout */ + bool join_succeeded = false; + #if defined(__linux__) struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_sec += OWNGIL_SHUTDOWN_TIMEOUT_SECS; - int rc = pthread_timedjoin_np(ctx->own_gil_thread, NULL, &deadline); - if (rc == ETIMEDOUT) { - fprintf(stderr, "OWN_GIL shutdown timeout after %d seconds, detaching thread\n", - OWNGIL_SHUTDOWN_TIMEOUT_SECS); - pthread_detach(ctx->own_gil_thread); - } + int rc = pthread_timedjoin_np(ctx->worker_thread, NULL, &deadline); + join_succeeded = (rc == 0); #else - /* macOS/other: poll thread_running flag with timeout */ + /* macOS/other: poll worker_running flag with timeout */ int wait_ms = 0; - while (atomic_load(&ctx->thread_running) && + while (atomic_load(&ctx->worker_running) && wait_ms < OWNGIL_SHUTDOWN_TIMEOUT_SECS * 1000) { usleep(100000); /* 100ms */ wait_ms += 100; } - if (atomic_load(&ctx->thread_running)) { - fprintf(stderr, "OWN_GIL shutdown timeout after %d seconds, detaching thread\n", - OWNGIL_SHUTDOWN_TIMEOUT_SECS); - pthread_detach(ctx->own_gil_thread); - } else { - 
pthread_join(ctx->own_gil_thread, NULL); + if (!atomic_load(&ctx->worker_running)) { + pthread_join(ctx->worker_thread, NULL); + join_succeeded = true; } #endif - /* Clean up resources */ - if (ctx->shared_env != NULL) { - enif_free_env(ctx->shared_env); - ctx->shared_env = NULL; + if (!join_succeeded) { + /* Worker thread is unresponsive - use leak pattern */ + fprintf(stderr, "OWN_GIL shutdown timeout after %d seconds, leaking context\n", + OWNGIL_SHUTDOWN_TIMEOUT_SECS); + atomic_store(&ctx->leaked, true); + /* Do NOT free shared resources - worker thread may still be using them. + * The leaked thread is isolated and will eventually clean up itself + * when Python exits, or persist until VM exit. */ + return; + } + + /* Clean shutdown succeeded - safe to free resources */ + if (ctx->msg_env != NULL) { + enif_free_env(ctx->msg_env); + ctx->msg_env = NULL; } - pthread_cond_destroy(&ctx->response_ready); - pthread_cond_destroy(&ctx->request_ready); - pthread_mutex_destroy(&ctx->request_mutex); + pthread_cond_destroy(&ctx->queue_not_empty); + pthread_mutex_destroy(&ctx->queue_mutex); ctx->uses_own_gil = false; } @@ -3990,7 +5027,9 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T /* Initialize fields */ ctx->interp_id = atomic_fetch_add(&g_context_id_counter, 1); ctx->is_subinterp = use_owngil; - ctx->destroyed = false; + atomic_store(&ctx->destroyed, false); + atomic_store(&ctx->leaked, false); + atomic_store(&ctx->init_error, false); ctx->has_callback_handler = false; ctx->callback_pipe[0] = -1; ctx->callback_pipe[1] = -1; @@ -3998,6 +5037,7 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T ctx->locals = NULL; ctx->module_cache = NULL; ctx->executor_id = -1; /* Not assigned yet */ + ctx->uses_worker_thread = false; /* Create callback pipe for blocking callback responses */ if (pipe(ctx->callback_pipe) < 0) { @@ -4023,38 +5063,14 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, 
const ERL_NIF_T return enif_make_tuple3(env, ATOM_OK, ref, enif_make_uint(env, ctx->interp_id)); } #endif - { - /* Worker mode - create a thread state in main interpreter */ - PyGILState_STATE gstate = PyGILState_Ensure(); - -#ifndef HAVE_SUBINTERPRETERS - PyInterpreterState *interp = PyInterpreterState_Get(); - ctx->thread_state = PyThreadState_New(interp); -#endif - - ctx->globals = PyDict_New(); - ctx->locals = PyDict_New(); - ctx->module_cache = PyDict_New(); - - /* Import __builtins__ into globals */ - PyObject *builtins = PyEval_GetBuiltins(); - PyDict_SetItemString(ctx->globals, "__builtins__", builtins); - - /* Import erlang module into globals for worker mode */ - PyObject *erlang_module = PyImport_ImportModule("erlang"); - if (erlang_module != NULL) { - PyDict_SetItemString(ctx->globals, "erlang", erlang_module); - Py_DECREF(erlang_module); - } - - PyGILState_Release(gstate); - } - /* Assign executor for thread affinity in MULTI_EXECUTOR mode. - * This ensures numpy/torch thread-local state consistency. */ - if (g_execution_mode == PY_MODE_MULTI_EXECUTOR && - atomic_load(&g_multi_executor_initialized)) { - ctx->executor_id = select_executor(); + /* Worker mode: create dedicated pthread with main interpreter + * This provides stable thread affinity for numpy/torch/tensorflow */ + if (worker_context_init(ctx) != 0) { + close(ctx->callback_pipe[0]); + close(ctx->callback_pipe[1]); + enif_release_resource(ctx); + return make_error(env, "worker_init_failed"); } ERL_NIF_TERM ref = enif_make_resource(env, ctx); @@ -4069,10 +5085,10 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T * * nif_context_destroy(ContextRef) -> ok * - * For subinterpreter mode: releases the pool slot back to the pool. - * The pool owns the Python objects - context just references them. + * For owngil mode: shuts down the dedicated OWN_GIL thread. + * For worker mode: shuts down the dedicated worker thread. 
* - * For worker mode: cleans up Python objects directly with the main GIL. + * Both modes use the join-or-leak pattern for safe shutdown. */ static ERL_NIF_TERM nif_context_destroy(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { (void)argc; @@ -4083,12 +5099,12 @@ static ERL_NIF_TERM nif_context_destroy(ErlNifEnv *env, int argc, const ERL_NIF_ } /* Skip if already destroyed */ - if (ctx->destroyed) { + if (atomic_load(&ctx->destroyed)) { return ATOM_OK; } /* Mark as destroyed early to prevent new operations */ - ctx->destroyed = true; + atomic_store(&ctx->destroyed, true); #ifdef HAVE_SUBINTERPRETERS /* OWN_GIL mode: shutdown the dedicated thread */ @@ -4108,7 +5124,23 @@ static ERL_NIF_TERM nif_context_destroy(ErlNifEnv *env, int argc, const ERL_NIF_ } #endif - /* Worker mode - clean up Python objects with GIL */ + /* Worker mode: shutdown the dedicated worker thread */ + if (ctx->uses_worker_thread) { + worker_context_shutdown(ctx); + /* Close callback pipes */ + if (ctx->callback_pipe[0] >= 0) { + close(ctx->callback_pipe[0]); + ctx->callback_pipe[0] = -1; + } + if (ctx->callback_pipe[1] >= 0) { + close(ctx->callback_pipe[1]); + ctx->callback_pipe[1] = -1; + } + atomic_fetch_add(&g_counters.ctx_destroyed, 1); + return ATOM_OK; + } + + /* Legacy mode (should not reach here with new architecture) */ if (runtime_is_running()) { PyGILState_STATE gstate = PyGILState_Ensure(); Py_XDECREF(ctx->module_cache); @@ -4127,6 +5159,16 @@ static ERL_NIF_TERM nif_context_destroy(ErlNifEnv *env, int argc, const ERL_NIF_ PyGILState_Release(gstate); } + /* Close callback pipes */ + if (ctx->callback_pipe[0] >= 0) { + close(ctx->callback_pipe[0]); + ctx->callback_pipe[0] = -1; + } + if (ctx->callback_pipe[1] >= 0) { + close(ctx->callback_pipe[1]); + ctx->callback_pipe[1] = -1; + } + atomic_fetch_add(&g_counters.ctx_destroyed, 1); return ATOM_OK; } @@ -4197,7 +5239,20 @@ static ERL_NIF_TERM nif_context_call(ErlNifEnv *env, int argc, const ERL_NIF_TER } #endif - /* Both 
worker mode and subinterpreter mode use py_context_acquire. + /* Worker thread mode: dispatch to dedicated thread */ + if (ctx->uses_worker_thread) { + /* Build request tuple: {Module, Func, Args, Kwargs} */ + ERL_NIF_TERM kwargs = (argc > 4 && enif_is_map(env, argv[4])) + ? argv[4] : enif_make_new_map(env); + ERL_NIF_TERM request = enif_make_tuple4(env, + argv[1], /* Module */ + argv[2], /* Func */ + argv[3], /* Args */ + kwargs); + return dispatch_to_worker_thread(env, ctx, CTX_REQ_CALL, request); + } + + /* Legacy mode: direct execution with py_context_acquire. * For subinterpreters, py_context_acquire handles PyThreadState_Swap * to switch to the pool slot's interpreter. */ ErlNifBinary module_bin, func_bin; @@ -4208,15 +5263,6 @@ static ERL_NIF_TERM nif_context_call(ErlNifEnv *env, int argc, const ERL_NIF_TER return make_error(env, "invalid_func"); } - /* Context thread affinity: dispatch via executor instead of direct execution. - * This ensures numpy/torch thread-local state consistency. */ - if (ctx->executor_id >= 0 && g_execution_mode == PY_MODE_MULTI_EXECUTOR && - atomic_load(&g_multi_executor_initialized)) { - ERL_NIF_TERM kwargs = (argc > 4 && enif_is_map(env, argv[4])) - ? argv[4] : enif_make_new_map(env); - return context_dispatch_call(env, ctx, &module_bin, &func_bin, argv[3], kwargs); - } - char *module_name = binary_to_string(&module_bin); char *func_name = binary_to_string(&func_bin); if (module_name == NULL || func_name == NULL) { @@ -4376,6 +5422,144 @@ static ERL_NIF_TERM nif_context_call(ErlNifEnv *env, int argc, const ERL_NIF_TER return result; } +/** + * @brief Async call - enqueue and return immediately + * + * nif_context_call_async(ContextRef, CallerPid, RequestId, Module, Func, Args, Kwargs) + * -> {enqueued, RequestId} | {error, Reason} + * + * The worker thread will send {py_result, RequestId, Result} to CallerPid when done. 
+ */ +static ERL_NIF_TERM nif_context_call_async(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + py_context_t *ctx; + + if (!runtime_is_running()) { + return make_error(env, "python_not_running"); + } + + if (argc < 6) { + return make_error(env, "badarg"); + } + + if (!enif_get_resource(env, argv[0], PY_CONTEXT_RESOURCE_TYPE, (void **)&ctx)) { + return make_error(env, "invalid_context"); + } + + /* Get caller PID */ + ErlNifPid caller_pid; + if (!enif_get_local_pid(env, argv[1], &caller_pid)) { + return make_error(env, "invalid_pid"); + } + + /* RequestId is argv[2] - can be any term */ + ERL_NIF_TERM request_id = argv[2]; + + /* Worker thread mode: dispatch async */ + if (ctx->uses_worker_thread) { + /* Build request tuple: {Module, Func, Args, Kwargs} */ + ERL_NIF_TERM kwargs = (argc > 6 && enif_is_map(env, argv[6])) + ? argv[6] : enif_make_new_map(env); + ERL_NIF_TERM request = enif_make_tuple4(env, + argv[3], /* Module */ + argv[4], /* Func */ + argv[5], /* Args */ + kwargs); + return dispatch_to_worker_thread_async(env, ctx, CTX_REQ_CALL, + request, caller_pid, request_id, NULL); + } + + /* Not using worker thread - fall back to blocking call */ + return make_error(env, "async_requires_worker_thread"); +} + +/** + * @brief Async eval - enqueue and return immediately + * + * nif_context_eval_async(ContextRef, CallerPid, RequestId, Code, Locals) + * -> {enqueued, RequestId} | {error, Reason} + * + * The worker thread will send {py_result, RequestId, Result} to CallerPid when done. 
+ */ +static ERL_NIF_TERM nif_context_eval_async(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + py_context_t *ctx; + + if (!runtime_is_running()) { + return make_error(env, "python_not_running"); + } + + if (argc < 4) { + return make_error(env, "badarg"); + } + + if (!enif_get_resource(env, argv[0], PY_CONTEXT_RESOURCE_TYPE, (void **)&ctx)) { + return make_error(env, "invalid_context"); + } + + /* Get caller PID */ + ErlNifPid caller_pid; + if (!enif_get_local_pid(env, argv[1], &caller_pid)) { + return make_error(env, "invalid_pid"); + } + + /* RequestId is argv[2] - can be any term */ + ERL_NIF_TERM request_id = argv[2]; + + /* Worker thread mode: dispatch async */ + if (ctx->uses_worker_thread) { + /* Build request tuple: {Code, Locals} */ + ERL_NIF_TERM locals = (argc > 4 && enif_is_map(env, argv[4])) + ? argv[4] : enif_make_new_map(env); + ERL_NIF_TERM request = enif_make_tuple2(env, argv[3], locals); + return dispatch_to_worker_thread_async(env, ctx, CTX_REQ_EVAL, + request, caller_pid, request_id, NULL); + } + + /* Not using worker thread - fall back to blocking call */ + return make_error(env, "async_requires_worker_thread"); +} + +/** + * @brief Async exec - enqueue and return immediately + * + * nif_context_exec_async(ContextRef, CallerPid, RequestId, Code) + * -> {enqueued, RequestId} | {error, Reason} + * + * The worker thread will send {py_result, RequestId, Result} to CallerPid when done. 
+ */ +static ERL_NIF_TERM nif_context_exec_async(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + py_context_t *ctx; + + if (!runtime_is_running()) { + return make_error(env, "python_not_running"); + } + + if (argc < 4) { + return make_error(env, "badarg"); + } + + if (!enif_get_resource(env, argv[0], PY_CONTEXT_RESOURCE_TYPE, (void **)&ctx)) { + return make_error(env, "invalid_context"); + } + + /* Get caller PID */ + ErlNifPid caller_pid; + if (!enif_get_local_pid(env, argv[1], &caller_pid)) { + return make_error(env, "invalid_pid"); + } + + /* RequestId is argv[2] - can be any term */ + ERL_NIF_TERM request_id = argv[2]; + + /* Worker thread mode: dispatch async */ + if (ctx->uses_worker_thread) { + return dispatch_to_worker_thread_async(env, ctx, CTX_REQ_EXEC, + argv[3], caller_pid, request_id, NULL); + } + + /* Not using worker thread - fall back to blocking call */ + return make_error(env, "async_requires_worker_thread"); +} + /** * @brief Evaluate a Python expression in a context * @@ -4408,7 +5592,16 @@ static ERL_NIF_TERM nif_context_eval(ErlNifEnv *env, int argc, const ERL_NIF_TER } #endif - /* Both worker mode and subinterpreter mode use py_context_acquire. + /* Worker thread mode: dispatch to dedicated thread */ + if (ctx->uses_worker_thread) { + /* Build request tuple: {Code, Locals} */ + ERL_NIF_TERM locals = (argc > 2 && enif_is_map(env, argv[2])) + ? argv[2] : enif_make_new_map(env); + ERL_NIF_TERM request = enif_make_tuple2(env, argv[1], locals); + return dispatch_to_worker_thread(env, ctx, CTX_REQ_EVAL, request); + } + + /* Legacy mode: direct execution with py_context_acquire. * For subinterpreters, py_context_acquire handles PyThreadState_Swap * to switch to the pool slot's interpreter. */ ErlNifBinary code_bin; @@ -4416,15 +5609,6 @@ static ERL_NIF_TERM nif_context_eval(ErlNifEnv *env, int argc, const ERL_NIF_TER return make_error(env, "invalid_code"); } - /* Context thread affinity: dispatch via executor instead of direct execution. 
- * This ensures numpy/torch thread-local state consistency. */ - if (ctx->executor_id >= 0 && g_execution_mode == PY_MODE_MULTI_EXECUTOR && - atomic_load(&g_multi_executor_initialized)) { - ERL_NIF_TERM locals = (argc > 2 && enif_is_map(env, argv[2])) - ? argv[2] : enif_make_new_map(env); - return context_dispatch_eval(env, ctx, &code_bin, locals); - } - char *code = binary_to_string(&code_bin); if (code == NULL) { return make_error(env, "alloc_failed"); @@ -4554,7 +5738,12 @@ static ERL_NIF_TERM nif_context_exec(ErlNifEnv *env, int argc, const ERL_NIF_TER } #endif - /* Both worker mode and subinterpreter mode use py_context_acquire. + /* Worker thread mode: dispatch to dedicated thread */ + if (ctx->uses_worker_thread) { + return dispatch_to_worker_thread(env, ctx, CTX_REQ_EXEC, argv[1]); + } + + /* Legacy mode: direct execution with py_context_acquire. * For subinterpreters, py_context_acquire handles PyThreadState_Swap * to switch to the pool slot's interpreter. */ ErlNifBinary code_bin; @@ -4562,13 +5751,6 @@ static ERL_NIF_TERM nif_context_exec(ErlNifEnv *env, int argc, const ERL_NIF_TER return make_error(env, "invalid_code"); } - /* Context thread affinity: dispatch via executor instead of direct execution. - * This ensures numpy/torch thread-local state consistency. 
*/ - if (ctx->executor_id >= 0 && g_execution_mode == PY_MODE_MULTI_EXECUTOR && - atomic_load(&g_multi_executor_initialized)) { - return context_dispatch_exec(env, ctx, &code_bin); - } - char *code = binary_to_string(&code_bin); if (code == NULL) { return make_error(env, "alloc_failed"); @@ -4949,6 +6131,12 @@ static ERL_NIF_TERM nif_context_exec_with_env(ErlNifEnv *env, int argc, const ER } #endif + /* Worker thread mode: dispatch to dedicated thread with local env */ + if (ctx->uses_worker_thread) { + /* For exec, we just pass the code binary */ + return dispatch_to_worker_thread_impl(env, ctx, CTX_REQ_EXEC_WITH_ENV, argv[1], penv); + } + char *code = binary_to_string(&code_bin); if (code == NULL) { return make_error(env, "alloc_failed"); @@ -5031,6 +6219,15 @@ static ERL_NIF_TERM nif_context_eval_with_env(ErlNifEnv *env, int argc, const ER } #endif + /* Worker thread mode: dispatch to dedicated thread with local env */ + if (ctx->uses_worker_thread) { + /* Build request tuple: {Code, Locals} */ + ERL_NIF_TERM locals = (argc > 2 && enif_is_map(env, argv[2])) + ? argv[2] : enif_make_new_map(env); + ERL_NIF_TERM request = enif_make_tuple2(env, argv[1], locals); + return dispatch_to_worker_thread_impl(env, ctx, CTX_REQ_EVAL_WITH_ENV, request, penv); + } + char *code = binary_to_string(&code_bin); if (code == NULL) { return make_error(env, "alloc_failed"); @@ -5189,6 +6386,19 @@ static ERL_NIF_TERM nif_context_call_with_env(ErlNifEnv *env, int argc, const ER } #endif + /* Worker thread mode: dispatch to dedicated thread with local env */ + if (ctx->uses_worker_thread) { + /* Build request tuple: {Module, Func, Args, Kwargs} */ + ERL_NIF_TERM kwargs = (argc > 4 && enif_is_map(env, argv[4])) + ? 
argv[4] : enif_make_new_map(env); + ERL_NIF_TERM request = enif_make_tuple4(env, + argv[1], /* Module */ + argv[2], /* Func */ + argv[3], /* Args */ + kwargs); + return dispatch_to_worker_thread_impl(env, ctx, CTX_REQ_CALL_WITH_ENV, request, penv); + } + char *module_name = binary_to_string(&module_bin); char *func_name = binary_to_string(&func_bin); if (module_name == NULL || func_name == NULL) { @@ -7061,7 +8271,6 @@ static ErlNifFunc nif_funcs[] = { /* Execution mode info */ {"execution_mode", 0, nif_execution_mode, 0}, - {"num_executors", 0, nif_num_executors, 0}, /* Thread worker support (ThreadPoolExecutor) */ {"thread_worker_set_coordinator", 1, nif_thread_worker_set_coordinator, 0}, @@ -7158,6 +8367,10 @@ static ErlNifFunc nif_funcs[] = { {"context_exec", 3, nif_context_exec_with_env, ERL_NIF_DIRTY_JOB_CPU_BOUND}, {"context_eval", 4, nif_context_eval_with_env, ERL_NIF_DIRTY_JOB_CPU_BOUND}, {"context_call", 6, nif_context_call_with_env, ERL_NIF_DIRTY_JOB_CPU_BOUND}, + /* Async dispatch - non-blocking, returns immediately */ + {"context_call_async", 7, nif_context_call_async, 0}, + {"context_eval_async", 5, nif_context_eval_async, 0}, + {"context_exec_async", 4, nif_context_exec_async, 0}, {"create_local_env", 1, nif_create_local_env, 0}, {"interp_apply_imports", 2, nif_interp_apply_imports, ERL_NIF_DIRTY_JOB_CPU_BOUND}, {"interp_apply_paths", 2, nif_interp_apply_paths, ERL_NIF_DIRTY_JOB_CPU_BOUND}, diff --git a/c_src/py_nif.h b/c_src/py_nif.h index 050856a..d6ca87b 100644 --- a/c_src/py_nif.h +++ b/c_src/py_nif.h @@ -741,6 +741,150 @@ typedef enum { CTX_REQ_APPLY_PATHS /**< Apply paths to sys.path */ } ctx_request_type_t; +/** + * @struct ctx_request_t + * @brief Heap-allocated request for worker/owngil context queue + * + * Each request is heap-allocated with its own mutex/condvar for completion + * signaling. This replaces the single-slot pattern that had race conditions + * with multiple concurrent callers. + * + * Lifecycle: + * 1. 
Caller allocates request with ctx_request_create() + * 2. Caller fills in request data and copies terms to request_env + * 3. Caller enqueues request and increments refcount (now 2: caller + queue) + * 4. Worker dequeues request, processes it, fills result_env/result + * 5. Worker sends result via enif_send() and releases queue's ref + * 6. Caller receives result and releases its ref + * 7. When refcount hits 0, request is freed + * + * For OWN_GIL mode, the worker thread sends results via enif_send() to avoid + * blocking dirty schedulers. For worker mode (main interpreter), the same + * pattern is used for consistency. + */ +typedef struct ctx_request { + /** @brief Type of request */ + ctx_request_type_t type; + + /** @brief Per-request mutex for completion synchronization */ + pthread_mutex_t mutex; + + /** @brief Per-request condition for completion signaling */ + pthread_cond_t cond; + + /** @brief Set by worker when done (for blocking wait mode) */ + _Atomic bool completed; + + /** @brief Set by caller on timeout/destroy to skip processing */ + _Atomic bool cancelled; + + /* Request data (owned by this struct, not caller) */ + + /** @brief Environment for request terms (created by caller) */ + ErlNifEnv *request_env; + + /** @brief Request parameters (in request_env) */ + ERL_NIF_TERM request_data; + + /** @brief Process-local env pointer for WITH_ENV requests */ + void *local_env_ptr; + + /** @brief Reactor buffer pointer for reactor requests */ + void *reactor_buffer_ptr; + + /** @brief FD for reactor requests */ + int reactor_fd; + + /* Result data (owned by this struct) */ + + /** @brief Environment for result terms (created by worker) */ + ErlNifEnv *result_env; + + /** @brief Result term (in result_env) */ + ERL_NIF_TERM result; + + /** @brief True if request succeeded */ + bool success; + + /* Async delivery (for non-blocking dispatch) */ + + /** @brief Caller's PID for async result delivery */ + ErlNifPid caller_pid; + + /** @brief Request ID for 
correlating async responses */ + ERL_NIF_TERM request_id; + + /** @brief Whether to use async delivery vs blocking wait */ + bool async_mode; + + /* Queue management */ + + /** @brief Reference count (2=caller+queue, 1=one side, 0=free) */ + _Atomic int refcount; + + /** @brief Next request in queue */ + struct ctx_request *next; +} ctx_request_t; + +/** + * @brief Create a new context request + * @return Newly allocated request with refcount=1, or NULL on failure + */ +static inline ctx_request_t *ctx_request_create(void) { + ctx_request_t *req = enif_alloc(sizeof(ctx_request_t)); + if (req == NULL) return NULL; + + memset(req, 0, sizeof(ctx_request_t)); + pthread_mutex_init(&req->mutex, NULL); + pthread_cond_init(&req->cond, NULL); + atomic_store(&req->completed, false); + atomic_store(&req->cancelled, false); + atomic_store(&req->refcount, 1); + req->request_env = enif_alloc_env(); + req->result_env = NULL; /* Created by worker when processing */ + req->next = NULL; + req->async_mode = false; + req->reactor_fd = -1; + req->local_env_ptr = NULL; + req->reactor_buffer_ptr = NULL; + + return req; +} + +/** + * @brief Add a reference to a context request + * @param req The request + */ +static inline void ctx_request_addref(ctx_request_t *req) { + if (req) { + atomic_fetch_add(&req->refcount, 1); + } +} + +/** + * @brief Release a reference to a context request + * @param req The request (may be NULL) + * + * When refcount reaches 0, frees mutex/cond/envs and the request struct. 
+ */ +static inline void ctx_request_release(ctx_request_t *req) { + if (req == NULL) return; + + int prev = atomic_fetch_sub(&req->refcount, 1); + if (prev == 1) { + /* Last reference - free everything */ + pthread_mutex_destroy(&req->mutex); + pthread_cond_destroy(&req->cond); + if (req->request_env) { + enif_free_env(req->request_env); + } + if (req->result_env) { + enif_free_env(req->result_env); + } + enif_free(req); + } +} + /** * @struct py_cmd_t * @brief Command structure for thread-per-context dispatch @@ -804,8 +948,11 @@ struct py_context { /** @brief Context mode: true=subinterpreter, false=worker */ bool is_subinterp; - /** @brief Flag indicating context has been destroyed */ - bool destroyed; + /** @brief Flag indicating context has been destroyed (atomic for thread safety) */ + _Atomic bool destroyed; + + /** @brief Flag: context resources leaked due to unresponsive worker */ + _Atomic bool leaked; /** @brief Flag: callback handler is configured */ bool has_callback_handler; @@ -816,70 +963,79 @@ struct py_context { /** @brief Pipe for callback responses [read, write] */ int callback_pipe[2]; -#ifdef HAVE_SUBINTERPRETERS - /* ========== OWN_GIL mode fields ========== */ + /* ========== Worker thread fields (used by both worker and owngil modes) ========== */ - /** @brief Whether this context uses OWN_GIL mode (dedicated pthread) */ - bool uses_own_gil; + /** @brief Dedicated pthread for this context */ + pthread_t worker_thread; - /** @brief Dedicated pthread for OWN_GIL mode */ - pthread_t own_gil_thread; + /** @brief True when worker thread is running */ + _Atomic bool worker_running; - /** @brief Thread state for OWN_GIL subinterpreter */ - PyThreadState *own_gil_tstate; + /** @brief True when shutdown has been requested */ + _Atomic bool shutdown_requested; - /** @brief Interpreter state for OWN_GIL subinterpreter */ - PyInterpreterState *own_gil_interp; + /** @brief True if this context uses a dedicated worker thread (worker mode) */ + bool 
uses_worker_thread; - /* IPC via condition variables */ + /** @brief True if thread initialization failed */ + _Atomic bool init_error; - /** @brief Mutex for request/response synchronization */ - pthread_mutex_t request_mutex; + /* ========== Request queue (replaces single-slot pattern) ========== */ - /** @brief Condition variable: request ready for processing */ - pthread_cond_t request_ready; + /** @brief Mutex protecting the request queue */ + pthread_mutex_t queue_mutex; - /** @brief Condition variable: response ready for caller */ - pthread_cond_t response_ready; + /** @brief Condition variable: work available in queue */ + pthread_cond_t queue_not_empty; - /* Request/response state */ + /** @brief Head of request queue (dequeue from here) */ + ctx_request_t *queue_head; - /** @brief Current request type (CTX_REQ_*) */ - int request_type; + /** @brief Tail of request queue (enqueue here) */ + ctx_request_t *queue_tail; + + /** @brief Environment for sending messages back to Erlang */ + ErlNifEnv *msg_env; + + /* ========== Legacy compatibility fields (populated from queue request) ========== */ + /* These fields are populated by the worker thread from the current request + * for compatibility with existing execute functions. They will be removed + * once all execute functions are refactored to use ctx_request_t directly. 
*/ - /** @brief Shared environment for zero-copy term passing */ + /** @brief Shared env for current request (points to current req->request_env) */ ErlNifEnv *shared_env; - /** @brief Request term (copied into shared_env) */ - ERL_NIF_TERM request_term; + /** @brief Current request type */ + int request_type; - /** @brief Additional request data (e.g., modules list for flush) */ - ERL_NIF_TERM request_data; + /** @brief Current request data term */ + ERL_NIF_TERM request_term; - /** @brief Response term (created in shared_env) */ + /** @brief Response term for current request */ ERL_NIF_TERM response_term; - /** @brief True if response indicates success */ + /** @brief Success flag for current request */ bool response_ok; - /** @brief Auxiliary pointer for reactor buffer (OWN_GIL dispatch) */ + /** @brief Reactor buffer pointer for current request */ void *reactor_buffer_ptr; - /** @brief Process-local env pointer for OWN_GIL dispatch (py_env_resource_t*) */ + /** @brief Process-local env pointer for current request */ void *local_env_ptr; - /* Lifecycle flags */ +#ifdef HAVE_SUBINTERPRETERS + /* ========== OWN_GIL specific fields ========== */ - /** @brief True when worker thread is running */ - _Atomic bool thread_running; + /** @brief Whether this context uses OWN_GIL mode (subinterpreter with own GIL) */ + bool uses_own_gil; - /** @brief True if thread initialization failed */ - _Atomic bool init_error; + /** @brief Thread state for OWN_GIL subinterpreter */ + PyThreadState *own_gil_tstate; - /** @brief True when shutdown has been requested */ - _Atomic bool shutdown_requested; + /** @brief Interpreter state for OWN_GIL subinterpreter */ + PyInterpreterState *own_gil_interp; #else - /** @brief Worker thread state (non-subinterp mode) */ + /** @brief Worker thread state (non-subinterp mode, kept for compatibility) */ PyThreadState *thread_state; #endif @@ -1005,7 +1161,7 @@ static inline py_context_guard_t py_context_acquire(py_context_t *ctx) { .acquired = 
false }; - if (ctx == NULL || ctx->destroyed) { + if (ctx == NULL || atomic_load(&ctx->destroyed)) { return guard; } diff --git a/docs/getting-started.md b/docs/getting-started.md index 8f1ce8d..3f40fc6 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -8,7 +8,7 @@ Add to your `rebar.config`: ```erlang {deps, [ - {erlang_python, "2.3.0"} + {erlang_python, "3.0.0"} ]}. ``` diff --git a/docs/migration.md b/docs/migration.md index b092b8f..5d0b2fb 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -1,6 +1,103 @@ -# Migration Guide: v1.8.x to v2.0+ +# Migration Guide -This guide covers breaking changes and migration steps when upgrading from erlang_python v1.8.x to v2.0 and later. +This guide covers breaking changes and migration steps when upgrading erlang_python. + +## v2.x to v3.0 Migration + +### Quick Checklist + +- [ ] Update `py:execution_mode/0` usage - now returns `worker | owngil` only +- [ ] Remove any `py:num_executors/0` calls (function removed) +- [ ] Update code that checks for `free_threaded` or `multi_executor` modes +- [ ] Review `context_mode` configuration (now `worker | owngil`) + +### Execution Mode Changes + +**v2.x:** `py:execution_mode/0` returned internal capabilities: +```erlang +py:execution_mode(). +%% => free_threaded | subinterp | multi_executor +``` + +**v3.0:** Returns simplified public modes based on configuration: +```erlang +py:execution_mode(). +%% => worker | owngil +``` + +The mode is determined by the `context_mode` application config: +```erlang +%% Default: worker mode +application:set_env(erlang_python, context_mode, worker). + +%% For true parallelism (Python 3.14+) +application:set_env(erlang_python, context_mode, owngil). +``` + +### Removed Functions + +**`py:num_executors/0`** - Removed. Contexts now use per-context worker threads. + +```erlang +%% v2.x - check executor count +N = py:num_executors(). 
+ +%% v3.0 - not needed, each context has its own worker thread +``` + +### Worker Thread Architecture + +In v3.0, each context gets a dedicated pthread that handles all Python operations: + +- **Stable thread affinity**: All calls to the same context run on the same OS thread +- **numpy/torch compatibility**: Thread-local state is preserved +- **No executor pool**: No shared executor threads to manage + +```erlang +%% Create contexts - each gets its own worker thread +Ctx1 = py:context(1), +Ctx2 = py:context(2), + +%% All calls to Ctx1 run on Ctx1's worker thread +%% All calls to Ctx2 run on Ctx2's worker thread +{ok, _} = py:call(Ctx1, math, sqrt, [16]), +{ok, _} = py:call(Ctx2, math, sqrt, [25]). +``` + +### Configuration Changes + +**v2.x configuration:** +```erlang +{erlang_python, [ + {num_executors, 8}, %% Removed in v3.0 + {context_mode, worker} +]} +``` + +**v3.0 configuration:** +```erlang +{erlang_python, [ + {context_mode, worker}, %% worker | owngil + {num_contexts, 8} %% Number of contexts to create +]} +``` + +### Python Version Compatibility + +| Python Version | v2.x Mode | v3.0 Mode | +|---------------|-----------|-----------| +| 3.9 - 3.11 | `multi_executor` | `worker` | +| 3.12 - 3.13 | `subinterp` | `worker` (default) or `owngil` | +| 3.14+ | `subinterp` | `worker` (default) or `owngil` | +| 3.13t (free-threaded) | `free_threaded` | `worker` | + +All Python versions now use the same public mode (`worker` or `owngil`) based on configuration, not Python capabilities. + +--- + +# v1.8.x to v2.0 Migration + +This section covers breaking changes when upgrading from erlang_python v1.8.x to v2.0. 
## Quick Checklist @@ -16,17 +113,18 @@ This guide covers breaking changes and migration steps when upgrading from erlan ## Python Version Compatibility -| Python Version | GIL Mode | Notes | -|---------------|----------|-------| -| 3.9 - 3.11 | Shared GIL | Multi-executor mode, `py:execution_mode()` returns `multi_executor` | -| 3.12 - 3.13 | OWN_GIL subinterpreters | True parallelism, `py:execution_mode()` returns `subinterp` | -| 3.13t | Free-threaded | No GIL, `py:execution_mode()` returns `free_threaded` | -| 3.14+ | SHARED_GIL subinterpreters | Subinterpreters with shared GIL for C extension compatibility | +| Python Version | Support | Notes | +|---------------|---------|-------| +| 3.9 - 3.11 | Full | Worker mode with dedicated pthread per context | +| 3.12 - 3.13 | Full | Worker mode (default) or owngil mode | +| 3.14+ | Full | Worker mode (default) or owngil mode with true parallelism | +| 3.13t | Full | Worker mode (free-threaded builds supported) | -**Python 3.14 Support**: Full support for Python 3.14 including: -- SHARED_GIL subinterpreter mode for C extension compatibility -- Proper `sys.path` initialization in subinterpreters -- All asyncio features work correctly +**Python 3.14+ OWN_GIL Support**: For true parallelism, use owngil mode: +```erlang +application:set_env(erlang_python, context_mode, owngil). +``` +Each context gets a subinterpreter with its own GIL, enabling parallel Python execution. **FreeBSD Support**: Improved fd handling on FreeBSD/kqueue platforms: - Automatic fd duplication in `py_reactor_context` to prevent fd stealing errors @@ -83,11 +181,10 @@ The most significant change in v2.0 is the new execution model. On Python 3.12+, Check which mode is active: ```erlang -%% Check execution mode +%% Check execution mode (v3.0+) py:execution_mode(). 
-%% => subinterp (Python 3.12+ with OWN_GIL) -%% => free_threaded (Python 3.13t with --disable-gil) -%% => multi_executor (Python < 3.12) +%% => worker (default, dedicated pthread per context) +%% => owngil (dedicated pthread + subinterpreter with own GIL) %% Check if subinterpreters are supported py:subinterp_supported(). diff --git a/docs/scalability.md b/docs/scalability.md index 2985a20..cefe59f 100644 --- a/docs/scalability.md +++ b/docs/scalability.md @@ -4,30 +4,24 @@ This guide covers the scalability features of erlang_python, including execution ## Execution Modes -erlang_python automatically detects the optimal execution mode based on your Python version: +erlang_python supports two execution modes: ```erlang %% Check current execution mode py:execution_mode(). -%% => free_threaded | worker | owngil | multi_executor - -%% Check number of executor threads -py:num_executors(). -%% => 4 (default) +%% => worker | owngil ``` ### Mode Comparison -| Mode | Python Version | Parallelism | GIL Behavior | Best For | -|------|----------------|-------------|--------------|----------| -| **free_threaded** | 3.13+ (nogil build) | True N-way | None | Maximum throughput | -| **owngil** | 3.14+ | True N-way | Per-interpreter (dedicated thread) | CPU-bound parallel | -| **worker** | 3.12+ | GIL contention | Shared GIL | Default, compatibility | -| **multi_executor** | < 3.12 | GIL contention | Shared, round-robin | I/O-bound, legacy | +| Mode | Description | Parallelism | GIL Behavior | Best For | +|------|-------------|-------------|--------------|----------| +| **worker** | Dedicated pthread per context | GIL contention | Shared GIL | Default, maximum compatibility | +| **owngil** | Dedicated pthread + subinterpreter | True N-way | Per-interpreter GIL | CPU-bound parallel (Python 3.14+) | -### Free-Threaded Mode (Python 3.13+) +### Worker Mode (Default) -When running on a free-threaded Python build (compiled with `--disable-gil`), erlang_python executes Python calls 
directly without any executor routing. This provides maximum parallelism for CPU-bound workloads. +Each context gets a dedicated pthread that handles all Python operations. This provides stable thread affinity, which is critical for libraries like numpy, torch, and tensorflow that maintain thread-local state. ### OWN_GIL Mode (Python 3.12+) @@ -67,18 +61,6 @@ ok = py_nif:context_exec(CtxRef, <<"x = 42">>, Env), **See also:** [OWN_GIL Internals](owngil_internals.md) for architecture details. -### Sub-interpreter Mode (Python 3.12+) - -Uses Python's sub-interpreter feature with a shared GIL pool. Multiple contexts share the GIL but have isolated namespaces. Best for high call frequency with low latency. - -**Architecture:** -- Pool of pre-created subinterpreters with shared GIL -- Execution on dirty schedulers with `PyThreadState_Swap` -- Lower latency (~2.5μs) but no true parallelism -- Best throughput for short operations - -**Note:** Each sub-interpreter has isolated state. Use the [Shared State](#shared-state) API to share data between workers. - **Explicit Context Selection:** ```erlang %% Get a specific context by index (1-based) @@ -89,58 +71,29 @@ Ctx = py:context(1), {ok, Result} = py:call(math, sqrt, [16]). ``` -### Multi-Executor Mode (Python < 3.12) - -Runs N executor threads that share the GIL. Requests are distributed round-robin across executors. Good for I/O-bound workloads where Python releases the GIL during I/O operations. - -**Thread Affinity:** In MULTI_EXECUTOR mode, both workers and contexts are assigned -a fixed executor thread. This ensures libraries with thread-local state (numpy, torch, -tensorflow) always run on the same OS thread, preventing segfaults and state corruption. 
- ## Choosing the Right Mode -### Mode Comparison - -| Aspect | Free-Threaded | OWN_GIL | Worker | Multi-Executor | -|--------|---------------|---------|--------|----------------| -| **Parallelism** | True N-way | True N-way | GIL contention | GIL contention | -| **State Isolation** | Shared | Isolated | Shared | Shared | -| **Memory Overhead** | Low | Higher (per-interp) | Low | Low | -| **Module Compatibility** | Limited | Most modules | All modules | All modules | -| **Python Version** | 3.13+ (nogil) | 3.14+ | 3.12+ | < 3.12 | - ### When to Use Each Mode -**Use Free-Threaded (Python 3.13t) when:** -- You need maximum parallelism with shared state -- Your libraries are GIL-free compatible -- You're running CPU-bound workloads -- Memory efficiency is important +**Use Worker Mode (default) when:** +- You need maximum module compatibility +- Running libraries like numpy, torch, tensorflow +- High call frequency with low latency +- Shared state between contexts is needed -**Use OWN_GIL (Python 3.14+) when:** +**Use OWN_GIL Mode when:** - You need true CPU parallelism across Python contexts - Running long computations (ML inference, data processing) - Workload benefits from multiple independent Python interpreters - You can tolerate higher per-call latency for better throughput -**Use Worker (Python 3.12+, default) when:** -- You need high call frequency with low latency -- Maximum module compatibility is required -- Shared state between contexts is needed -- Running libraries that don't support subinterpreters (torch, etc.) 
- -**Use Multi-Executor (Python < 3.12) when:** -- Running on older Python versions -- Your workload is I/O-bound (GIL released during I/O) -- Thread affinity for numpy/torch is needed - ### Pros and Cons **Worker Mode Pros:** - Maximum module compatibility (all C extensions work) +- Stable thread affinity for numpy/torch/tensorflow - Low memory overhead (single interpreter) - Shared state between contexts -- Default mode for Python 3.12+ **Worker Mode Cons:** - GIL contention limits parallelism @@ -156,17 +109,6 @@ tensorflow) always run on the same OS thread, preventing segfaults and state cor - Some C extensions don't support subinterpreters - Requires Python 3.14+ -**Free-Threaded Mode Pros:** -- True parallelism with shared state -- Lower memory overhead than OWN_GIL -- Simplest mental model (like regular threading) - -**Free-Threaded Mode Cons:** -- Requires Python 3.13+ built with `--disable-gil` -- Many C extensions not yet compatible -- Shared state requires careful synchronization -- Still experimental - ## Subinterpreter Architecture ### Design Overview @@ -308,14 +250,13 @@ This allows your application to implement backpressure or shed load gracefully. %% Default: erlang:system_info(schedulers) * 2 + 1 {max_concurrent, 50}, - %% Number of executor threads (multi_executor mode only) - %% Default: 4 - {num_executors, 8}, + %% Context mode: worker | owngil + %% Default: worker + {context_mode, worker}, - %% Worker pool sizes - {num_workers, 4}, - {num_async_workers, 2}, - {num_subinterp_workers, 4} + %% Number of contexts + %% Default: erlang:system_info(schedulers) + {num_contexts, 8} ]} ]. 
``` @@ -460,9 +401,9 @@ free_threaded ### For I/O-Bound Workloads -- Multi-executor mode works well (GIL released during I/O) -- Increase `num_executors` to handle more concurrent I/O +- Worker mode works well (GIL released during I/O) - Use asyncio integration for async I/O +- Increase `num_contexts` for more concurrent I/O capacity ### For Mixed Workloads @@ -481,8 +422,7 @@ io:format("Python load: ~.1f%~n", [Utilization]). %% Execution mode info Mode = py:execution_mode(), -Executors = py:num_executors(), -io:format("Mode: ~p, Executors: ~p~n", [Mode, Executors]). +io:format("Mode: ~p~n", [Mode]). %% Memory stats {ok, Stats} = py:memory_stats(), diff --git a/src/erlang_python.app.src b/src/erlang_python.app.src index 9378a24..ae6e135 100644 --- a/src/erlang_python.app.src +++ b/src/erlang_python.app.src @@ -1,6 +1,6 @@ {application, erlang_python, [ {description, "Execute Python applications from Erlang using dirty NIFs"}, - {vsn, "2.3.1"}, + {vsn, "3.0.0"}, {registered, [py_pool]}, {mod, {erlang_python_app, []}}, {applications, [ diff --git a/src/py.erl b/src/py.erl index 387cecf..8f719a8 100644 --- a/src/py.erl +++ b/src/py.erl @@ -107,7 +107,6 @@ venv_info/0, %% Execution info execution_mode/0, - num_executors/0, %% Shared state (accessible from Python workers) state_fetch/1, state_store/2, @@ -1257,30 +1256,22 @@ ensure_binary(S) -> %% @doc Get the current execution mode. %% Returns one of: -%% - `free_threaded': Python 3.13+ with no GIL (Py_GIL_DISABLED) -%% - `worker': Contexts use main interpreter namespaces (default) -%% - `owngil': Contexts use dedicated threads with own GIL (Python 3.14+) -%% - `multi_executor': Traditional Python with N executor threads (Python < 3.12) --spec execution_mode() -> free_threaded | worker | owngil | multi_executor. +%% - `worker': Contexts use dedicated pthread per context (default). +%% Provides stable thread affinity for numpy/torch/tensorflow compatibility. 
+%% - `owngil': Contexts use dedicated pthread + subinterpreter with own GIL.
+%%   Enables true parallelism (Python 3.14+ with subinterpreter support).
+%%
+%% The mode is determined by the `context_mode' application config:
+%% ```
+%% application:set_env(erlang_python, context_mode, owngil).
+%% '''
+-spec execution_mode() -> worker | owngil.
 execution_mode() ->
-    case py_nif:execution_mode() of
-        free_threaded -> free_threaded;
-        multi_executor -> multi_executor;
-        subinterp ->
-            %% Check actual context_mode config
-            case application:get_env(erlang_python, context_mode, worker) of
-                owngil -> owngil;
-                _ -> worker
-            end
+    case application:get_env(erlang_python, context_mode, worker) of
+        owngil -> owngil;
+        _ -> worker
     end.
 
-%% @doc Get the number of executor threads.
-%% For `multi_executor' mode, this is the number of executor threads.
-%% For other modes, returns 1.
--spec num_executors() -> pos_integer().
-num_executors() ->
-    py_nif:num_executors().
-
 %%% ============================================================================
 %%% Shared State
 %%% ============================================================================
diff --git a/src/py_context.erl b/src/py_context.erl
index 9c342b6..5c5dbad 100644
--- a/src/py_context.erl
+++ b/src/py_context.erl
@@ -563,11 +563,12 @@ loop(#state{ref = Ref, interp_id = InterpId} = State) ->
             loop(State);
         {exec, From, MRef, Code} ->
-            Result = py_nif:context_exec(Ref, Code),
+            Result = handle_exec_with_async(Ref, Code),
             From ! {MRef, Result},
             loop(State);
 
         %% Exec with process-local environment (worker mode)
+        %% Note: Uses blocking dispatch since async+env isn't implemented yet.
         {exec, From, MRef, Code, EnvRef} ->
             Result = py_nif:context_exec(Ref, Code, EnvRef),
             From ! {MRef, Result},
             loop(State);
@@ -723,7 +724,23 @@ handle_blocking_callback(Ref, FuncName, Args) ->
 
 %% @private
 %% Handle call with potential suspension for callbacks
+%% Uses async dispatch to avoid blocking dirty schedulers when possible.
handle_call_with_suspension(Ref, Module, Func, Args, Kwargs) -> + RequestId = make_ref(), + case py_nif:context_call_async(Ref, self(), RequestId, Module, Func, Args, Kwargs) of + {enqueued, RequestId} -> + %% Async dispatch succeeded - wait for result message + wait_for_async_result(Ref, RequestId); + {error, async_requires_worker_thread} -> + %% Fall back to blocking call for non-worker-thread contexts + handle_call_blocking(Ref, Module, Func, Args, Kwargs); + {error, Reason} -> + {error, Reason} + end. + +%% @private +%% Blocking call handler (used when async is not available) +handle_call_blocking(Ref, Module, Func, Args, Kwargs) -> case py_nif:context_call(Ref, Module, Func, Args, Kwargs) of {suspended, _CallbackId, StateRef, {FuncName, CallbackArgs}} -> %% Callback needed - handle it with recursive receive @@ -740,7 +757,36 @@ handle_call_with_suspension(Ref, Module, Func, Args, Kwargs) -> %% @private %% Handle eval with potential suspension for callbacks +%% Uses async dispatch to avoid blocking dirty schedulers when possible. handle_eval_with_suspension(Ref, Code, Locals) -> + RequestId = make_ref(), + case py_nif:context_eval_async(Ref, self(), RequestId, Code, Locals) of + {enqueued, RequestId} -> + %% Async dispatch succeeded - wait for result message + wait_for_async_result(Ref, RequestId); + {error, async_requires_worker_thread} -> + %% Fall back to blocking call for non-worker-thread contexts + handle_eval_blocking(Ref, Code, Locals); + {error, Reason} -> + {error, Reason} + end. + +%% @private +%% Handle exec with async dispatch +handle_exec_with_async(Ref, Code) -> + RequestId = make_ref(), + case py_nif:context_exec_async(Ref, self(), RequestId, Code) of + {enqueued, RequestId} -> + wait_for_async_result(Ref, RequestId); + {error, async_requires_worker_thread} -> + py_nif:context_exec(Ref, Code); + {error, Reason} -> + {error, Reason} + end. 
+ +%% @private +%% Blocking eval handler (used when async is not available) +handle_eval_blocking(Ref, Code, Locals) -> case py_nif:context_eval(Ref, Code, Locals) of {suspended, _CallbackId, StateRef, {FuncName, CallbackArgs}} -> %% Callback needed - handle it with recursive receive @@ -755,8 +801,31 @@ handle_eval_with_suspension(Ref, Code, Locals) -> Result end. +%% @private +%% Wait for async result from worker thread +%% The worker thread sends {py_result, RequestId, Result} when done. +wait_for_async_result(Ref, RequestId) -> + receive + {py_result, RequestId, Result} -> + process_async_result(Ref, Result) + after 300000 -> %% 5 minute timeout + {error, async_timeout} + end. + +%% @private +%% Process the result from async dispatch +%% Handles suspension, schedule markers, and normal results. +process_async_result(Ref, {suspended, _CallbackId, StateRef, {FuncName, CallbackArgs}}) -> + CallbackResult = handle_callback_with_nested_receive(Ref, FuncName, CallbackArgs), + resume_and_continue(Ref, StateRef, CallbackResult); +process_async_result(Ref, {schedule, CallbackName, CallbackArgs}) -> + handle_schedule(Ref, CallbackName, CallbackArgs); +process_async_result(_Ref, Result) -> + Result. + %% @private %% Handle call with process-local environment +%% Note: Uses blocking dispatch since async+env isn't implemented yet. handle_call_with_suspension_and_env(Ref, Module, Func, Args, Kwargs, EnvRef) -> case py_nif:context_call(Ref, Module, Func, Args, Kwargs, EnvRef) of {suspended, _CallbackId, StateRef, {FuncName, CallbackArgs}} -> @@ -770,6 +839,7 @@ handle_call_with_suspension_and_env(Ref, Module, Func, Args, Kwargs, EnvRef) -> %% @private %% Handle eval with process-local environment +%% Note: Uses blocking dispatch since async+env isn't implemented yet. 
handle_eval_with_suspension_and_env(Ref, Code, Locals, EnvRef) -> case py_nif:context_eval(Ref, Code, Locals, EnvRef) of {suspended, _CallbackId, StateRef, {FuncName, CallbackArgs}} -> diff --git a/src/py_nif.erl b/src/py_nif.erl index 2625ea2..42084b0 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -82,7 +82,6 @@ owngil_apply_paths/3, %% Execution mode info execution_mode/0, - num_executors/0, %% Thread worker support (ThreadPoolExecutor) thread_worker_set_coordinator/1, thread_worker_write/2, @@ -176,6 +175,10 @@ context_eval/4, context_exec/2, context_exec/3, + %% Async dispatch (non-blocking) + context_call_async/7, + context_eval_async/5, + context_exec_async/4, context_call_method/4, create_local_env/1, context_to_term/1, @@ -653,22 +656,19 @@ owngil_apply_paths(_WorkerId, _HandleId, _Paths) -> %%% Execution Mode Info %%% ============================================================================ -%% @doc Get the current execution mode. -%% Returns one of: free_threaded | subinterp | multi_executor +%% @doc Get Python capability (internal use). +%% Returns the detected Python runtime capability: %% - free_threaded: Python 3.13+ with no GIL (Py_GIL_DISABLED) -%% - subinterp: Python 3.12+ with per-interpreter GIL -%% - multi_executor: Traditional Python with N executor threads +%% - subinterp: Python 3.12+ with per-interpreter GIL support +%% - multi_executor: Traditional Python with executor threads +%% +%% For public execution mode, use py:execution_mode/0 which returns +%% `worker | owngil' based on the application configuration. +%% @private -spec execution_mode() -> free_threaded | subinterp | multi_executor. execution_mode() -> ?NIF_STUB. -%% @doc Get the number of executor threads. -%% For multi_executor mode, this is the number of executor threads. -%% For other modes, returns 1. --spec num_executors() -> pos_integer(). -num_executors() -> - ?NIF_STUB. 
- %%% ============================================================================ %%% Thread Worker Support (ThreadPoolExecutor) %%% ============================================================================ @@ -1337,6 +1337,58 @@ context_exec(_ContextRef, _Code) -> context_exec(_ContextRef, _Code, _EnvRef) -> ?NIF_STUB. +%% @doc Async call - enqueue and return immediately. +%% +%% Dispatches a Python function call to the worker thread and returns +%% immediately with {enqueued, RequestId}. The worker thread will send +%% {py_result, RequestId, Result} to CallerPid when done. +%% +%% @param ContextRef Context reference +%% @param CallerPid PID to send result to +%% @param RequestId Request ID for correlation +%% @param Module Python module name +%% @param Func Function name +%% @param Args List of arguments +%% @param Kwargs Keyword arguments map +%% @returns {enqueued, RequestId} | {error, Reason} +-spec context_call_async(reference(), pid(), term(), binary(), binary(), list(), map()) -> + {enqueued, term()} | {error, term()}. +context_call_async(_ContextRef, _CallerPid, _RequestId, _Module, _Func, _Args, _Kwargs) -> + ?NIF_STUB. + +%% @doc Async eval - enqueue and return immediately. +%% +%% Dispatches a Python eval to the worker thread and returns immediately +%% with {enqueued, RequestId}. The worker thread will send +%% {py_result, RequestId, Result} to CallerPid when done. +%% +%% @param ContextRef Context reference +%% @param CallerPid PID to send result to +%% @param RequestId Request ID for correlation +%% @param Code Python expression to evaluate +%% @param Locals Local variables map +%% @returns {enqueued, RequestId} | {error, Reason} +-spec context_eval_async(reference(), pid(), term(), binary(), map()) -> + {enqueued, term()} | {error, term()}. +context_eval_async(_ContextRef, _CallerPid, _RequestId, _Code, _Locals) -> + ?NIF_STUB. + +%% @doc Async exec - enqueue and return immediately. 
+%% +%% Dispatches Python code execution to the worker thread and returns +%% immediately with {enqueued, RequestId}. The worker thread will send +%% {py_result, RequestId, Result} to CallerPid when done. +%% +%% @param ContextRef Context reference +%% @param CallerPid PID to send result to +%% @param RequestId Request ID for correlation +%% @param Code Python code to execute +%% @returns {enqueued, RequestId} | {error, Reason} +-spec context_exec_async(reference(), pid(), term(), binary()) -> + {enqueued, term()} | {error, term()}. +context_exec_async(_ContextRef, _CallerPid, _RequestId, _Code) -> + ?NIF_STUB. + %% @doc Call a method on a Python object in a context. %% %% NO MUTEX - caller must ensure exclusive access (process ownership). diff --git a/test/py_SUITE.erl b/test/py_SUITE.erl index dc41700..d3e4f5d 100644 --- a/test/py_SUITE.erl +++ b/test/py_SUITE.erl @@ -41,7 +41,6 @@ test_venv_pth/1, %% New scalability tests test_execution_mode/1, - test_num_executors/1, test_semaphore_basic/1, test_semaphore_acquire_release/1, test_semaphore_concurrent/1, @@ -101,7 +100,6 @@ all() -> test_venv_pth, %% Scalability tests test_execution_mode, - test_num_executors, test_semaphore_basic, test_semaphore_acquire_release, test_semaphore_concurrent, @@ -733,15 +731,7 @@ test_execution_mode(_Config) -> %% Test that execution_mode returns a valid mode Mode = py:execution_mode(), ct:pal("Execution mode: ~p~n", [Mode]), - true = lists:member(Mode, [free_threaded, subinterp, multi_executor]), - ok. - -test_num_executors(_Config) -> - %% Test that num_executors returns a positive integer - Num = py:num_executors(), - ct:pal("Number of executors: ~p~n", [Num]), - true = is_integer(Num), - true = Num > 0, + true = lists:member(Mode, [worker, owngil]), ok. test_semaphore_basic(_Config) ->