diff --git a/CHANGELOG.md b/CHANGELOG.md index cf70ec7..e589160 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,67 @@ ## Unreleased +### Fixed + +- **NIF robustness hardening** - `make_py_error` no longer passes a NULL message/type + to `enif_make_string`/`enif_make_atom` when a Python exception's text isn't + UTF-8-encodable; `binary_to_string` rejects names/code containing an embedded NUL + (which would silently truncate a module/function/attr/code string) rather than + truncating; a leaked `split` method object in the reactor buffer is released; and a + stray debug `fprintf` on the normal worker send path is removed. + +### Security + +- **No shell for venv/installer commands** - `py:ensure_venv` and dependency + installation now run the executables via `open_port({spawn_executable, ...})` with an + argument list instead of building a shell string for `os:cmd`. Venv paths, requirement + files, and extras are passed literally, so shell metacharacters can't be injected. For + `uv`, `VIRTUAL_ENV` is passed via the port `{env, ...}` option rather than a shell prefix. +- **Bounded shared state + safe stream/log builders** - `py_state` gained an optional + `max_state_entries` cap (default `infinity`, unchanged behavior) enforced with atomic + admission so Python-driven `state_set` can't exhaust node memory, and its size counter + is protected from corruption. The `py:stream` and logging helpers that build Python + source now strictly validate module/function/kwarg names as identifiers (rejecting + injection at positions where quoting is meaningless) and escape string-literal values + including control characters. +- **Validated event-loop fd handles** - The asyncio reader/writer integration no longer + hands Python a raw `fd_resource` pointer as an integer key. Each handle is an opaque id + validated against a registry on every use, so a stale, duplicate, or fabricated id is a + safe no-op (or clean error) instead of a double-free or arbitrary-pointer dereference + that crashed the node. `fd_read`/`fd_write` also moved to dirty IO schedulers. +- **OWN_GIL worker robustness** (Python 3.14+) - A per-request allocation failure in + a subinterpreter worker no longer `break`s (and permanently kills) the worker command + loop; it returns an error and keeps serving. The `owngil_*` dispatch NIFs now run on + dirty IO schedulers and use non-blocking, deadline-bounded pipe reads and writes, so a + stalled or dead worker can't wedge a scheduler forever. The internal `SuspensionRequired` + exception is now looked up per-interpreter (like `ProcessError`), avoiding cross- + interpreter object use under OWN_GIL. +- **Callback suspend/resume lifetime hardening** - The worker resource is now kept + alive for the lifetime of a suspended callback (it could previously be GC'd mid- + suspension, causing a use-after-free on resume). A resume frees any prior result + before storing a new one (no leak/double-replay on a duplicate resume), the + pending-callback thread-local is cleared at the worker request boundary, and the + callback-response pipe writes run on dirty schedulers with non-blocking, deadline- + bounded writes so a stalled reader or large payload can't wedge a scheduler or + desync the framed protocol. +- **Zero-copy buffer pinning** - `py_buffer` no longer relocates (and frees) its + storage while a Python `memoryview` points into it. A write that would grow the + buffer while a view is held now returns an error instead of dangling the view into + freed memory (a use-after-free that crashed the whole node). +- **Bounded recursion in type conversion** - The Erlang<->Python converters now cap + nesting depth, so a deeply nested term (or Python structure) returns a clean error + instead of overflowing the C stack and crashing the whole node. +- **NULL-checked tuple allocation** - Argument-tuple allocations in the call/eval paths + are checked before use, and the Python->Erlang map conversion is bounded against + mid-iteration dict mutation, closing two ways an allocation failure or re-entrant + `__str__` could corrupt memory. +- **Safe term decoding at the NIF boundary** - All `enif_binary_to_term` calls now + pass `ERL_NIF_BIN2TERM_SAFE`, preventing attacker-influenced data (notably a Python + `"__etf__:"` callback result) from minting new, non-GC'd atoms and exhausting + the atom table. Local-node pids/refs and already-existing atoms still round-trip + unchanged; only brand-new atoms, remote-node pids/refs, and external funs in + Python-supplied payloads are now rejected. + ### Changed - **Support Erlang/OTP 28 and 29** - Validated builds and the full Common Test diff --git a/c_src/py_buffer.c b/c_src/py_buffer.c index cb55935..f21f25e 100644 --- a/c_src/py_buffer.c +++ b/c_src/py_buffer.c @@ -152,6 +152,14 @@ int py_buffer_write(py_buffer_resource_t *buf, const unsigned char *data, size_t /* Check if we need to grow the buffer */ size_t required = buf->write_pos + size; if (required > buf->capacity) { + /* A live Python memoryview (from PyBuffer_getbuffer) holds a raw pointer + * into buf->data. Relocating/freeing the buffer now would leave that + * pointer dangling -> use-after-free that crashes the whole node. Refuse + * to grow while any view is pinned; the caller gets a write error. */ + if (buf->view_count > 0) { + pthread_mutex_unlock(&buf->mutex); + return -1; + } /* Calculate new capacity */ size_t new_capacity = buf->capacity; while (new_capacity < required) { diff --git a/c_src/py_callback.c b/c_src/py_callback.c index a8d0589..4aa87df 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -147,6 +147,33 @@ static PyObject *get_current_process_error(void) { return exc_class; } +/** + * Get the SuspensionRequired exception class from the current interpreter's + * erlang module. Under OWN_GIL subinterpreters each interpreter has its own + * erlang module/class, so raising the process-global object (which belongs to + * whichever interpreter initialized last) is cross-interpreter UB. Mirrors + * get_current_process_error(). + */ +static PyObject *get_current_suspension_required(void) { + PyObject *erlang_module = PyImport_ImportModule("erlang"); + if (erlang_module == NULL) { + PyErr_Clear(); + return SuspensionRequiredException; /* Fallback to global */ + } + + PyObject *exc_class = PyObject_GetAttrString(erlang_module, "SuspensionRequired"); + Py_DECREF(erlang_module); + + if (exc_class == NULL) { + PyErr_Clear(); + return SuspensionRequiredException; /* Fallback to global */ + } + + /* See get_current_process_error: decref and rely on the module keeping it alive. */ + Py_DECREF(exc_class); + return exc_class; +} + /* ============================================================================ * Callback Name Registry * @@ -399,6 +426,14 @@ static suspended_state_t *create_suspended_state_ex( } else { state->worker = source->data.existing->worker; } + /* Keep the worker resource alive for as long as the suspended state exists. + * Without this the worker can be GC'd while a callback is suspended, and + * nif_resume_callback_dirty would dereference a freed worker (use-after-free + * with the GIL held). Mirrors the enif_keep_resource(ctx) on the context path; + * suspended_state_destructor releases it. */ + if (state->worker != NULL) { + enif_keep_resource(state->worker); + } state->callback_id = PyLong_AsUnsignedLongLong(callback_id_obj); @@ -977,7 +1012,7 @@ static PyObject *decode_etf_string(const char *str, Py_ssize_t len) { /* Decode the ETF binary to an Erlang term */ ERL_NIF_TERM term; - if (enif_binary_to_term(tmp_env, (unsigned char *)bin_data, bin_len, &term, 0) == 0) { + if (enif_binary_to_term(tmp_env, (unsigned char *)bin_data, bin_len, &term, ERL_NIF_BIN2TERM_SAFE) == 0) { /* Decoding failed */ enif_free_env(tmp_env); Py_DECREF(decoded); @@ -2109,7 +2144,7 @@ static PyObject *erlang_call_impl(PyObject *self, PyObject *args) { Py_XSETREF(tl_pending_args, call_args); /* Raise exception to abort Python execution */ - PyErr_SetString(SuspensionRequiredException, "callback pending"); + PyErr_SetString(get_current_suspension_required(), "callback pending"); return NULL; } @@ -2859,7 +2894,7 @@ static PyObject *erlang_channel_try_receive_impl(PyObject *self, PyObject *args) } ERL_NIF_TERM term; - if (enif_binary_to_term(tmp_env, data, size, &term, 0) == 0) { + if (enif_binary_to_term(tmp_env, data, size, &term, ERL_NIF_BIN2TERM_SAFE) == 0) { enif_free(data); enif_free_env(tmp_env); PyErr_SetString(PyExc_RuntimeError, "failed to decode term"); @@ -2939,7 +2974,7 @@ static PyObject *erlang_channel_receive_impl(PyObject *self, PyObject *args) { } ERL_NIF_TERM term; - if (enif_binary_to_term(tmp_env, data, size, &term, 0) == 0) { + if (enif_binary_to_term(tmp_env, data, size, &term, ERL_NIF_BIN2TERM_SAFE) == 0) { enif_free(data); enif_free_env(tmp_env); PyErr_SetString(PyExc_RuntimeError, "failed to decode term"); @@ -3251,7 +3286,7 @@ static PyObject *erlang_channel_wait_impl(PyObject *self, PyObject *args) { } ERL_NIF_TERM term; - if (enif_binary_to_term(tmp_env, data, msg_size, &term, 0) == 0) { + if (enif_binary_to_term(tmp_env, data, msg_size, &term, ERL_NIF_BIN2TERM_SAFE) == 0) { enif_free(data); enif_free_env(tmp_env); PyErr_SetString(PyExc_RuntimeError, "failed to decode term"); @@ -4316,7 +4351,14 @@ static ERL_NIF_TERM nif_resume_callback(ErlNifEnv *env, int argc, const ERL_NIF_ /* Store the result in the suspended state */ pthread_mutex_lock(&state->mutex); - /* Copy result data */ + /* Copy result data. Free any prior result first: a duplicate/raced resume + * would otherwise leak the previous buffer. (has_result is not a one-shot + * flag -- it toggles during nested replay -- so result_data is the real + * pending-result indicator.) */ + if (state->result_data != NULL) { + enif_free(state->result_data); + state->result_data = NULL; + } state->result_data = enif_alloc(result_bin.size); if (state->result_data == NULL) { pthread_mutex_unlock(&state->mutex); @@ -4364,6 +4406,12 @@ static ERL_NIF_TERM nif_resume_callback_dirty(ErlNifEnv *env, int argc, const ER return make_error(env, "no_result"); } + /* The worker is kept alive for the lifetime of the suspended state, but + * guard rather than dereference NULL in the replay below. */ + if (state->worker == NULL) { + return make_error(env, "no_worker"); + } + /* Set up thread-local state for replay */ tl_current_worker = state->worker; tl_callback_env = env; @@ -4430,6 +4478,11 @@ static ERL_NIF_TERM nif_resume_callback_dirty(ErlNifEnv *env, int argc, const ER } PyObject *args = PyTuple_New(args_len); + if (args == NULL) { + Py_DECREF(func); + result = make_error(env, "alloc_failed"); + goto call_cleanup; + } ERL_NIF_TERM head, tail = state->orig_args; for (unsigned int i = 0; i < args_len; i++) { enif_get_list_cell(state->orig_env, tail, &head, &tail); diff --git a/c_src/py_channel.c b/c_src/py_channel.c index 9c38604..ecfba9f 100644 --- a/c_src/py_channel.c +++ b/c_src/py_channel.c @@ -555,7 +555,7 @@ static ERL_NIF_TERM nif_channel_receive(ErlNifEnv *env, int argc, const ERL_NIF_ if (result == 0) { /* Data available - convert back to term */ ERL_NIF_TERM term; - if (enif_binary_to_term(env, data, size, &term, 0) == 0) { + if (enif_binary_to_term(env, data, size, &term, ERL_NIF_BIN2TERM_SAFE) == 0) { enif_free(data); return make_error(env, "binary_to_term_failed"); } @@ -590,7 +590,7 @@ static ERL_NIF_TERM nif_channel_try_receive(ErlNifEnv *env, int argc, const ERL_ if (result == 0) { /* Data available - convert back to term */ ERL_NIF_TERM term; - if (enif_binary_to_term(env, data, size, &term, 0) == 0) { + if (enif_binary_to_term(env, data, size, &term, ERL_NIF_BIN2TERM_SAFE) == 0) { enif_free(data); return make_error(env, "binary_to_term_failed"); } @@ -760,7 +760,7 @@ ERL_NIF_TERM nif_channel_wait(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[ /* Convert back to term */ ERL_NIF_TERM term; - if (enif_binary_to_term(env, data, msg_size, &term, 0) == 0) { + if (enif_binary_to_term(env, data, msg_size, &term, ERL_NIF_BIN2TERM_SAFE) == 0) { enif_free(data); return make_error(env, "binary_to_term_failed"); } diff --git a/c_src/py_convert.c b/c_src/py_convert.c index 4961aa4..cbd9bab 100644 --- a/c_src/py_convert.c +++ b/c_src/py_convert.c @@ -55,6 +55,15 @@ /* Capsule name for shared dict references */ #define SHARED_DICT_CAPSULE_NAME "py_shared_dict" +/* Maximum nesting depth for recursive term<->Python conversion. Guards the C + * stack against unbounded recursion on deeply nested (possibly attacker-supplied) + * data, which would otherwise overflow the dirty-scheduler stack and crash the + * whole node. Far deeper than any realistic data structure. */ +#define PY_CONVERT_MAX_DEPTH 200 + +static ERL_NIF_TERM py_to_term_d(ErlNifEnv *env, PyObject *obj, int depth); +static PyObject *term_to_py_d(ErlNifEnv *env, ERL_NIF_TERM term, int depth); + /** * @brief PyCapsule destructor for channel references * @@ -164,6 +173,13 @@ static inline bool is_numpy_ndarray(PyObject *obj) { * @see term_to_py() for the reverse conversion */ ERL_NIF_TERM py_to_term(ErlNifEnv *env, PyObject *obj) { + return py_to_term_d(env, obj, 0); +} + +static ERL_NIF_TERM py_to_term_d(ErlNifEnv *env, PyObject *obj, int depth) { + if (depth > PY_CONVERT_MAX_DEPTH) { + return ATOM_ERROR; + } /* * Type check ordering optimized for common workloads: * 1. Strings (most common in HTTP headers, bodies, JSON) @@ -218,15 +234,19 @@ ERL_NIF_TERM py_to_term(ErlNifEnv *env, PyObject *obj) { PyObject *key, *value; Py_ssize_t pos = 0; Py_ssize_t i = 0; - while (PyDict_Next(obj, &pos, &key, &value)) { - keys[i] = py_to_term(env, key); - values[i] = py_to_term(env, value); + while (i < size && PyDict_Next(obj, &pos, &key, &value)) { + keys[i] = py_to_term_d(env, key, depth + 1); + values[i] = py_to_term_d(env, value, depth + 1); i++; } - /* Build map from arrays - more efficient than iterative puts */ + /* Build map from arrays - more efficient than iterative puts. + * Use the actual filled count `i`, not the sampled `size`: converting a + * key/value can run user code (__str__/.tolist()) that mutates the dict, + * so the real pair count may differ. The loop is bounded by `size` so we + * never write past the arrays. */ ERL_NIF_TERM map; - int result = enif_make_map_from_arrays(env, keys, values, size, &map); + int result = enif_make_map_from_arrays(env, keys, values, i, &map); if (keys != stack_keys) enif_free(keys); if (values != stack_values) enif_free(values); @@ -287,7 +307,7 @@ ERL_NIF_TERM py_to_term(ErlNifEnv *env, PyObject *obj) { ERL_NIF_TERM list = enif_make_list(env, 0); /* Start with empty list */ for (Py_ssize_t i = len - 1; i >= 0; i--) { PyObject *item = PyList_GetItem(obj, i); /* Borrowed ref */ - ERL_NIF_TERM term = py_to_term(env, item); + ERL_NIF_TERM term = py_to_term_d(env, item, depth + 1); list = enif_make_list_cell(env, term, list); } return list; @@ -344,7 +364,7 @@ ERL_NIF_TERM py_to_term(ErlNifEnv *env, PyObject *obj) { for (Py_ssize_t i = 0; i < len; i++) { PyObject *item = PyTuple_GetItem(obj, i); /* Borrowed ref */ - items[i] = py_to_term(env, item); + items[i] = py_to_term_d(env, item, depth + 1); } ERL_NIF_TERM result = enif_make_tuple_from_array(env, items, len); @@ -365,7 +385,7 @@ ERL_NIF_TERM py_to_term(ErlNifEnv *env, PyObject *obj) { if (Py_IS_TYPE(obj, &ErlangRefType)) { ErlangRefObject *ref_obj = (ErlangRefObject *)obj; ERL_NIF_TERM result; - if (enif_binary_to_term(env, ref_obj->data, ref_obj->size, &result, 0) > 0) { + if (enif_binary_to_term(env, ref_obj->data, ref_obj->size, &result, ERL_NIF_BIN2TERM_SAFE) > 0) { return result; } /* Failed to deserialize - return undefined */ @@ -388,7 +408,7 @@ ERL_NIF_TERM py_to_term(ErlNifEnv *env, PyObject *obj) { if (is_numpy_ndarray(obj)) { PyObject *tolist = PyObject_CallMethod(obj, "tolist", NULL); if (tolist != NULL) { - ERL_NIF_TERM result = py_to_term(env, tolist); + ERL_NIF_TERM result = py_to_term_d(env, tolist, depth + 1); Py_DECREF(tolist); return result; } @@ -487,6 +507,15 @@ ERL_NIF_TERM py_to_term(ErlNifEnv *env, PyObject *obj) { * @see py_to_term() for the reverse conversion */ static PyObject *term_to_py(ErlNifEnv *env, ERL_NIF_TERM term) { + return term_to_py_d(env, term, 0); +} + +static PyObject *term_to_py_d(ErlNifEnv *env, ERL_NIF_TERM term, int depth) { + if (depth > PY_CONVERT_MAX_DEPTH) { + PyErr_SetString(PyExc_RecursionError, "Erlang term nesting too deep"); + return NULL; + } + double d_val; ErlNifBinary bin; unsigned int list_len; @@ -581,7 +610,7 @@ static PyObject *term_to_py(ErlNifEnv *env, ERL_NIF_TERM term) { ERL_NIF_TERM head, tail = term; for (unsigned int i = 0; i < list_len; i++) { enif_get_list_cell(env, tail, &head, &tail); - PyObject *item = term_to_py(env, head); + PyObject *item = term_to_py_d(env, head, depth + 1); if (item == NULL) { Py_DECREF(list); return NULL; @@ -598,7 +627,7 @@ static PyObject *term_to_py(ErlNifEnv *env, ERL_NIF_TERM term) { return NULL; } for (int i = 0; i < arity; i++) { - PyObject *item = term_to_py(env, tuple[i]); + PyObject *item = term_to_py_d(env, tuple[i], depth + 1); if (item == NULL) { Py_DECREF(py_tuple); return NULL; @@ -619,8 +648,8 @@ static PyObject *term_to_py(ErlNifEnv *env, ERL_NIF_TERM term) { enif_map_iterator_create(env, term, &iter, ERL_NIF_MAP_ITERATOR_FIRST); while (enif_map_iterator_get_pair(env, &iter, &key, &value)) { - PyObject *py_key = term_to_py(env, key); - PyObject *py_value = term_to_py(env, value); + PyObject *py_key = term_to_py_d(env, key, depth + 1); + PyObject *py_value = term_to_py_d(env, value, depth + 1); if (py_key == NULL || py_value == NULL) { Py_XDECREF(py_key); Py_XDECREF(py_value); @@ -796,13 +825,22 @@ static ERL_NIF_TERM make_py_error(ErlNifEnv *env) { enif_make_tuple2(env, ATOM_STOP_ITERATION, ATOM_NONE)); } - /* Get exception message */ + /* Get exception message. PyUnicode_AsUTF8 can return NULL (e.g. a str with + * lone surrogates); never pass NULL to enif_make_string. */ PyObject *str = PyObject_Str(value); - const char *err_msg = str ? PyUnicode_AsUTF8(str) : "unknown"; + const char *err_msg = (str != NULL) ? PyUnicode_AsUTF8(str) : NULL; + if (err_msg == NULL) { + PyErr_Clear(); + err_msg = "unknown"; + } - /* Get exception type name */ - PyObject *type_name = PyObject_GetAttrString(type, "__name__"); - const char *type_str = type_name ? PyUnicode_AsUTF8(type_name) : "Exception"; + /* Get exception type name (type itself may be NULL after PyErr_Fetch). */ + PyObject *type_name = (type != NULL) ? PyObject_GetAttrString(type, "__name__") : NULL; + const char *type_str = (type_name != NULL) ? PyUnicode_AsUTF8(type_name) : NULL; + if (type_str == NULL) { + PyErr_Clear(); + type_str = "Exception"; + } /* Build error tuple: {error, {TypeAtom, MessageString}} */ ERL_NIF_TERM error_tuple = enif_make_tuple2(env, diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index bbb796a..5ed6573 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -2101,6 +2101,11 @@ ERL_NIF_TERM nif_event_loop_run_async(ErlNifEnv *env, int argc, } PyObject *args = PyTuple_New(args_len); + if (args == NULL) { + Py_DECREF(func); + result = make_error(env, "alloc_failed"); + goto cleanup; + } ERL_NIF_TERM head, tail = argv[5]; for (unsigned int i = 0; i < args_len; i++) { enif_get_list_cell(env, tail, &head, &tail); @@ -2838,7 +2843,7 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, ERL_NIF_TERM task_term; if (enif_binary_to_term(term_env, task_bin.data, task_bin.size, - &task_term, 0) == 0) { + &task_term, ERL_NIF_BIN2TERM_SAFE) == 0) { return_pooled_env(loop, term_env); /* Dequeue and skip this malformed task */ enif_ioq_deq(loop->task_queue, iov[0].iov_len, NULL); @@ -3109,6 +3114,11 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, } PyObject *args = PyTuple_New(args_len); + if (args == NULL) { + Py_DECREF(func); + return_pooled_env(loop, term_env); + continue; + } ERL_NIF_TERM head, tail = tuple_elems[4]; bool args_ok = true; for (unsigned int i = 0; i < args_len && args_ok; i++) { @@ -6506,6 +6516,83 @@ static PyObject *py_get_isolation_mode(PyObject *self, PyObject *args) { return PyUnicode_FromString("global"); } +/* ============================================================================ + * Validating fd-handle registry + * + * Python receives an opaque integer id for each fd resource, never a raw + * pointer. Every consumer validates the id against this table, so a stale, + * duplicate, or fabricated id is a safe no-op instead of a use-after-free or + * arbitrary-pointer dereference. The producer's resource reference lives in the + * table entry: fd_reg_get hands out a temporary keep, fd_reg_take transfers the + * reference to the caller to release. + * ============================================================================ */ +typedef struct { uint64_t id; fd_resource_t *res; } fd_reg_entry_t; +static fd_reg_entry_t *g_fd_reg = NULL; +static size_t g_fd_reg_len = 0; +static size_t g_fd_reg_cap = 0; +static uint64_t g_fd_reg_next_id = 1; +static pthread_mutex_t g_fd_reg_mutex = PTHREAD_MUTEX_INITIALIZER; + +/* Store res and return its opaque id (0 on allocation failure). The caller's + * existing resource reference becomes owned by the table entry. */ +static uint64_t fd_reg_add(fd_resource_t *res) { + pthread_mutex_lock(&g_fd_reg_mutex); + if (g_fd_reg_len == g_fd_reg_cap) { + size_t ncap = g_fd_reg_cap ? g_fd_reg_cap * 2 : 16; + fd_reg_entry_t *n = enif_realloc(g_fd_reg, ncap * sizeof(*n)); + if (n == NULL) { + pthread_mutex_unlock(&g_fd_reg_mutex); + return 0; + } + g_fd_reg = n; + g_fd_reg_cap = ncap; + } + uint64_t id = g_fd_reg_next_id++; + if (id == 0) id = g_fd_reg_next_id++; /* never hand out 0 */ + g_fd_reg[g_fd_reg_len].id = id; + g_fd_reg[g_fd_reg_len].res = res; + g_fd_reg_len++; + pthread_mutex_unlock(&g_fd_reg_mutex); + return id; +} + +/* Look up id; on hit, keep the resource and return it (caller must release). + * Returns NULL for an unknown/stale/fabricated id. */ +static fd_resource_t *fd_reg_get(uint64_t id) { + fd_resource_t *res = NULL; + if (id != 0) { + pthread_mutex_lock(&g_fd_reg_mutex); + for (size_t i = 0; i < g_fd_reg_len; i++) { + if (g_fd_reg[i].id == id) { + res = g_fd_reg[i].res; + enif_keep_resource(res); + break; + } + } + pthread_mutex_unlock(&g_fd_reg_mutex); + } + return res; +} + +/* Remove id; on hit, return the resource (caller owns the entry's reference and + * must release it). Returns NULL for an unknown/stale/duplicate id. */ +static fd_resource_t *fd_reg_take(uint64_t id) { + fd_resource_t *res = NULL; + if (id != 0) { + pthread_mutex_lock(&g_fd_reg_mutex); + for (size_t i = 0; i < g_fd_reg_len; i++) { + if (g_fd_reg[i].id == id) { + res = g_fd_reg[i].res; + g_fd_reg[i] = g_fd_reg[g_fd_reg_len - 1]; /* swap with last */ + g_fd_reg_len--; + break; + } + } + pthread_mutex_unlock(&g_fd_reg_mutex); + } + return res; +} + /* Python function: _add_reader(fd, callback_id) -> fd_key */ static PyObject *py_add_reader(PyObject *self, PyObject *args) { (void)self; @@ -6557,8 +6644,17 @@ static PyObject *py_add_reader(PyObject *self, PyObject *args) { } /* Return a key that can be used to remove the reader */ - unsigned long long key = (unsigned long long)(uintptr_t)fd_res; - return PyLong_FromUnsignedLongLong(key); + /* Hand Python an opaque, validated id instead of a raw pointer. The + * producer's resource reference is taken over by the registry entry. */ + uint64_t id = fd_reg_add(fd_res); + if (id == 0) { + enif_select(loop->msg_env, (ErlNifEvent)fd, ERL_NIF_SELECT_STOP, + fd_res, NULL, ATOM_UNDEFINED); + enif_release_resource(fd_res); + PyErr_SetString(PyExc_MemoryError, "fd registry full"); + return NULL; + } + return PyLong_FromUnsignedLongLong((unsigned long long)id); } /* Python function: _remove_reader(fd_key) -> None */ @@ -6571,11 +6667,13 @@ static PyObject *py_remove_reader(PyObject *self, PyObject *args) { } /* Use per-interpreter event loop lookup - but still allow cleanup even if loop is gone */ - fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; - if (fd_res != NULL && fd_res->loop != NULL) { - enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, - ERL_NIF_SELECT_STOP, fd_res, NULL, ATOM_UNDEFINED); - fd_res->reader_active = false; + fd_resource_t *fd_res = fd_reg_take(fd_key); + if (fd_res != NULL) { + if (fd_res->loop != NULL) { + enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, + ERL_NIF_SELECT_STOP, fd_res, NULL, ATOM_UNDEFINED); + fd_res->reader_active = false; + } enif_release_resource(fd_res); } @@ -6633,8 +6731,17 @@ static PyObject *py_add_writer(PyObject *self, PyObject *args) { } /* Return a key that can be used to remove the writer */ - unsigned long long key = (unsigned long long)(uintptr_t)fd_res; - return PyLong_FromUnsignedLongLong(key); + /* Hand Python an opaque, validated id instead of a raw pointer. The + * producer's resource reference is taken over by the registry entry. */ + uint64_t id = fd_reg_add(fd_res); + if (id == 0) { + enif_select(loop->msg_env, (ErlNifEvent)fd, ERL_NIF_SELECT_STOP, + fd_res, NULL, ATOM_UNDEFINED); + enif_release_resource(fd_res); + PyErr_SetString(PyExc_MemoryError, "fd registry full"); + return NULL; + } + return PyLong_FromUnsignedLongLong((unsigned long long)id); } /* Python function: _remove_writer(fd_key) -> None */ @@ -6647,11 +6754,13 @@ static PyObject *py_remove_writer(PyObject *self, PyObject *args) { } /* Use fd_res->loop directly - allows cleanup even if interpreter's loop is gone */ - fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; - if (fd_res != NULL && fd_res->loop != NULL) { - enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, - ERL_NIF_SELECT_STOP, fd_res, NULL, ATOM_UNDEFINED); - fd_res->writer_active = false; + fd_resource_t *fd_res = fd_reg_take(fd_key); + if (fd_res != NULL) { + if (fd_res->loop != NULL) { + enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, + ERL_NIF_SELECT_STOP, fd_res, NULL, ATOM_UNDEFINED); + fd_res->writer_active = false; + } enif_release_resource(fd_res); } @@ -7381,8 +7490,17 @@ static PyObject *py_add_reader_for(PyObject *self, PyObject *args) { return NULL; } - unsigned long long key = (unsigned long long)(uintptr_t)fd_res; - return PyLong_FromUnsignedLongLong(key); + /* Hand Python an opaque, validated id instead of a raw pointer. The + * producer's resource reference is taken over by the registry entry. */ + uint64_t id = fd_reg_add(fd_res); + if (id == 0) { + enif_select(loop->msg_env, (ErlNifEvent)fd, ERL_NIF_SELECT_STOP, + fd_res, NULL, ATOM_UNDEFINED); + enif_release_resource(fd_res); + PyErr_SetString(PyExc_MemoryError, "fd registry full"); + return NULL; + } + return PyLong_FromUnsignedLongLong((unsigned long long)id); } /* Python function: _remove_reader_for(capsule, fd_key) -> None */ @@ -7401,11 +7519,13 @@ static PyObject *py_remove_reader_for(PyObject *self, PyObject *args) { return NULL; } - fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; - if (fd_res != NULL && fd_res->loop != NULL) { - enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, - ERL_NIF_SELECT_STOP, fd_res, NULL, ATOM_UNDEFINED); - fd_res->reader_active = false; + fd_resource_t *fd_res = fd_reg_take(fd_key); + if (fd_res != NULL) { + if (fd_res->loop != NULL) { + enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, + ERL_NIF_SELECT_STOP, fd_res, NULL, ATOM_UNDEFINED); + fd_res->reader_active = false; + } enif_release_resource(fd_res); } @@ -7462,8 +7582,17 @@ static PyObject *py_add_writer_for(PyObject *self, PyObject *args) { return NULL; } - unsigned long long key = (unsigned long long)(uintptr_t)fd_res; - return PyLong_FromUnsignedLongLong(key); + /* Hand Python an opaque, validated id instead of a raw pointer. The + * producer's resource reference is taken over by the registry entry. */ + uint64_t id = fd_reg_add(fd_res); + if (id == 0) { + enif_select(loop->msg_env, (ErlNifEvent)fd, ERL_NIF_SELECT_STOP, + fd_res, NULL, ATOM_UNDEFINED); + enif_release_resource(fd_res); + PyErr_SetString(PyExc_MemoryError, "fd registry full"); + return NULL; + } + return PyLong_FromUnsignedLongLong((unsigned long long)id); } /* Python function: _remove_writer_for(capsule, fd_key) -> None */ @@ -7482,11 +7611,13 @@ static PyObject *py_remove_writer_for(PyObject *self, PyObject *args) { return NULL; } - fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; - if (fd_res != NULL && fd_res->loop != NULL) { - enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, - ERL_NIF_SELECT_STOP, fd_res, NULL, ATOM_UNDEFINED); - fd_res->writer_active = false; + fd_resource_t *fd_res = fd_reg_take(fd_key); + if (fd_res != NULL) { + if (fd_res->loop != NULL) { + enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, + ERL_NIF_SELECT_STOP, fd_res, NULL, ATOM_UNDEFINED); + fd_res->writer_active = false; + } enif_release_resource(fd_res); } @@ -7506,8 +7637,9 @@ static PyObject *py_update_fd_read(PyObject *self, PyObject *args) { return NULL; } - fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; + fd_resource_t *fd_res = fd_reg_get(fd_key); if (fd_res == NULL || fd_res->loop == NULL) { + if (fd_res) enif_release_resource(fd_res); PyErr_SetString(PyExc_ValueError, "Invalid fd resource"); return NULL; } @@ -7521,6 +7653,7 @@ static PyObject *py_update_fd_read(PyObject *self, PyObject *args) { enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, ERL_NIF_SELECT_READ, fd_res, target_pid, ATOM_UNDEFINED); + enif_release_resource(fd_res); Py_RETURN_NONE; } @@ -7537,8 +7670,9 @@ static PyObject *py_update_fd_write(PyObject *self, PyObject *args) { return NULL; } - fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; + fd_resource_t *fd_res = fd_reg_get(fd_key); if (fd_res == NULL || fd_res->loop == NULL) { + if (fd_res) enif_release_resource(fd_res); PyErr_SetString(PyExc_ValueError, "Invalid fd resource"); return NULL; } @@ -7552,6 +7686,7 @@ static PyObject *py_update_fd_write(PyObject *self, PyObject *args) { enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, ERL_NIF_SELECT_WRITE, fd_res, target_pid, ATOM_UNDEFINED); + enif_release_resource(fd_res); Py_RETURN_NONE; } @@ -7567,9 +7702,10 @@ static PyObject *py_clear_fd_read(PyObject *self, PyObject *args) { return NULL; } - fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; + fd_resource_t *fd_res = fd_reg_get(fd_key); if (fd_res == NULL || fd_res->loop == NULL) { - Py_RETURN_NONE; /* Already cleaned up */ + if (fd_res) enif_release_resource(fd_res); + Py_RETURN_NONE; /* Already cleaned up / invalid id */ } if (fd_res->reader_active) { @@ -7580,6 +7716,7 @@ static PyObject *py_clear_fd_read(PyObject *self, PyObject *args) { fd_res->read_callback_id = 0; } + enif_release_resource(fd_res); Py_RETURN_NONE; } @@ -7595,9 +7732,10 @@ static PyObject *py_clear_fd_write(PyObject *self, PyObject *args) { return NULL; } - fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; + fd_resource_t *fd_res = fd_reg_get(fd_key); if (fd_res == NULL || fd_res->loop == NULL) { - Py_RETURN_NONE; /* Already cleaned up */ + if (fd_res) enif_release_resource(fd_res); + Py_RETURN_NONE; /* Already cleaned up / invalid id */ } if (fd_res->writer_active) { @@ -7608,6 +7746,7 @@ static PyObject *py_clear_fd_write(PyObject *self, PyObject *args) { fd_res->write_callback_id = 0; } + enif_release_resource(fd_res); Py_RETURN_NONE; } @@ -7623,10 +7762,12 @@ static PyObject *py_release_fd_resource(PyObject *self, PyObject *args) { return NULL; } - fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; - if (fd_res != NULL && fd_res->loop != NULL) { - enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, - ERL_NIF_SELECT_STOP, fd_res, NULL, ATOM_UNDEFINED); + fd_resource_t *fd_res = fd_reg_take(fd_key); + if (fd_res != NULL) { + if (fd_res->loop != NULL) { + enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, + ERL_NIF_SELECT_STOP, fd_res, NULL, ATOM_UNDEFINED); + } enif_release_resource(fd_res); } diff --git a/c_src/py_exec.c b/c_src/py_exec.c index cd8a32b..8672664 100644 --- a/c_src/py_exec.c +++ b/c_src/py_exec.c @@ -248,6 +248,11 @@ static void process_request(py_request_t *req) { } PyObject *args = PyTuple_New(args_len); + if (args == NULL) { + Py_DECREF(func); + req->result = make_error(env, "alloc_failed"); + goto call_cleanup; + } ERL_NIF_TERM head, tail = req->args_term; for (unsigned int i = 0; i < args_len; i++) { enif_get_list_cell(env, tail, &head, &tail); @@ -299,11 +304,14 @@ static void process_request(py_request_t *req) { suspended_state_t *suspended = create_suspended_state(env, exc_args, req); Py_DECREF(exc_args); if (suspended == NULL) { - tl_pending_callback = false; - Py_CLEAR(tl_pending_args); + clear_pending_callback_tls(); req->result = make_error(env, "create_suspended_state_failed"); } else { req->result = build_suspended_result(env, suspended); + /* func_name/args are copied into the suspended state; clear + * the pending-callback TLS so a later request on this reused + * worker thread doesn't trip the stale-TLS entry invariant. */ + clear_pending_callback_tls(); } } } else { @@ -403,11 +411,11 @@ static void process_request(py_request_t *req) { suspended_state_t *suspended = create_suspended_state(env, exc_args, req); Py_DECREF(exc_args); if (suspended == NULL) { - tl_pending_callback = false; - Py_CLEAR(tl_pending_args); + clear_pending_callback_tls(); req->result = make_error(env, "create_suspended_state_failed"); } else { req->result = build_suspended_result(env, suspended); + clear_pending_callback_tls(); } } } else { diff --git a/c_src/py_nif.c b/c_src/py_nif.c index a2e638c..0b33bca 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -513,11 +513,18 @@ static void suspended_state_destructor(ErlNifEnv *env, void *obj) { (void)env; suspended_state_t *state = (suspended_state_t *)obj; + /* Release the worker resource kept alive in create_suspended_state_ex. */ + if (state->worker != NULL) { + enif_release_resource(state->worker); + state->worker = NULL; + } + /* Clean up Python objects if Python is still initialized. * suspended_state_t is used with the worker-based API which runs in * the main interpreter, so we always use PyGILState_Ensure. */ if (runtime_is_running() && state->callback_args != NULL) { if (PyGILState_GetThisThreadState() != NULL || PyGILState_Check()) { + Py_XDECREF(state->callback_args); state->callback_args = NULL; } else { PyGILState_STATE gstate = PyGILState_Ensure(); @@ -1700,6 +1707,11 @@ static ERL_NIF_TERM nif_set_callback_handler(ErlNifEnv *env, int argc, const ERL if (pipe(worker->callback_pipe) < 0) { return make_error(env, "pipe_failed"); } + /* Non-blocking write end so write_all_with_deadline can bound the write. */ + { + int wfl = fcntl(worker->callback_pipe[1], F_GETFL, 0); + if (wfl >= 0) (void)fcntl(worker->callback_pipe[1], F_SETFL, wfl | O_NONBLOCK); + } worker->has_callback_handler = true; @@ -1708,6 +1720,14 @@ static ERL_NIF_TERM nif_set_callback_handler(ErlNifEnv *env, int argc, const ERL enif_make_int(env, worker->callback_pipe[1])); } +/* Bound for callback-response pipe writes: a stalled reader must not block a + * dirty scheduler forever (the pipe write ends are set non-blocking). */ +#define CALLBACK_RESPONSE_IO_TIMEOUT_MS 30000 + +/* Bound for OWN_GIL dispatch pipe I/O so a stalled/dead worker thread can't + * block the dispatching dirty scheduler forever. */ +#define OWNGIL_IO_TIMEOUT_MS 30000 + static ERL_NIF_TERM nif_send_callback_response(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { (void)argc; int fd; @@ -1721,15 +1741,16 @@ static ERL_NIF_TERM nif_send_callback_response(ErlNifEnv *env, int argc, const E return make_error(env, "invalid_response"); } - /* Write length then data */ + /* Write length then data with a timed, non-blocking writer (the pipe write + * end is O_NONBLOCK) so a stalled reader or a large payload can't block a + * dirty scheduler forever or desync the length-framed protocol on EINTR. */ uint32_t len = (uint32_t)response.size; - ssize_t n = write(fd, &len, sizeof(len)); - if (n != sizeof(len)) { + if (write_all_with_deadline(fd, &len, sizeof(len), + CALLBACK_RESPONSE_IO_TIMEOUT_MS) != WRITE_OK) { return make_error(env, "write_length_failed"); } - - n = write(fd, response.data, response.size); - if (n != (ssize_t)response.size) { + if (write_all_with_deadline(fd, response.data, response.size, + CALLBACK_RESPONSE_IO_TIMEOUT_MS) != WRITE_OK) { return make_error(env, "write_data_failed"); } @@ -2030,6 +2051,14 @@ static void owngil_execute_call(py_context_t *ctx) { } PyObject *args = PyTuple_New(args_len); + if (args == NULL) { + Py_DECREF(func); + ctx->response_term = enif_make_tuple2(ctx->shared_env, + enif_make_atom(ctx->shared_env, "error"), + enif_make_atom(ctx->shared_env, "arg_conversion_failed")); + ctx->response_ok = false; + return; + } ERL_NIF_TERM head, tail = args_term; for (unsigned int i = 0; i < args_len; i++) { enif_get_list_cell(ctx->shared_env, tail, &head, &tail); @@ -2729,6 +2758,15 @@ static void owngil_execute_call_with_env(py_context_t *ctx) { } PyObject *args = PyTuple_New(args_len); + if (args == NULL) { + Py_DECREF(func); + tl_current_local_env = prev_local_env; + ctx->response_term = enif_make_tuple2(ctx->shared_env, + enif_make_atom(ctx->shared_env, "error"), + enif_make_atom(ctx->shared_env, "arg_conversion_failed")); + ctx->response_ok = false; + return; + } ERL_NIF_TERM head, tail = args_term; for (unsigned int i = 0; i < args_len; i++) { enif_get_list_cell(ctx->shared_env, tail, &head, &tail); @@ -4685,6 +4723,11 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T enif_release_resource(ctx); return make_error(env, "pipe_create_failed"); } + /* Non-blocking write end so write_all_with_deadline can bound the write. */ + { + int wfl = fcntl(ctx->callback_pipe[1], F_GETFL, 0); + if (wfl >= 0) (void)fcntl(ctx->callback_pipe[1], F_SETFL, wfl | O_NONBLOCK); + } #ifdef HAVE_SUBINTERPRETERS ctx->uses_own_gil = false; @@ -4975,6 +5018,11 @@ static ERL_NIF_TERM nif_context_call(ErlNifEnv *env, int argc, const ERL_NIF_TER } PyObject *args = PyTuple_New(args_len); + if (args == NULL) { + Py_DECREF(func); + result = make_error(env, "alloc_failed"); + goto cleanup; + } ERL_NIF_TERM head, tail = argv[3]; for (unsigned int i = 0; i < args_len; i++) { enif_get_list_cell(env, tail, &head, &tail); @@ -6244,6 +6292,11 @@ static ERL_NIF_TERM nif_context_call_with_env(ErlNifEnv *env, int argc, const ER } PyObject *args = PyTuple_New(args_len); + if (args == NULL) { + Py_DECREF(func); + result = make_error(env, "alloc_failed"); + goto cleanup; + } ERL_NIF_TERM head, tail = argv[3]; for (unsigned int i = 0; i < args_len; i++) { enif_get_list_cell(env, tail, &head, &tail); @@ -6402,6 +6455,11 @@ static ERL_NIF_TERM nif_context_call_method(ErlNifEnv *env, int argc, const ERL_ } PyObject *args = PyTuple_New(args_len); + if (args == NULL) { + Py_DECREF(method); + result = make_error(env, "alloc_failed"); + goto cleanup; + } ERL_NIF_TERM head, tail = argv[3]; for (unsigned int i = 0; i < args_len; i++) { enif_get_list_cell(env, tail, &head, &tail); @@ -6547,15 +6605,17 @@ static ERL_NIF_TERM nif_context_write_callback_response(ErlNifEnv *env, int argc return make_error(env, "pipe_not_initialized"); } - /* Write length prefix (4 bytes, native endianness - must match read_length_prefixed_data) */ + /* Write length prefix + data with a timed, non-blocking writer (the pipe + * write end is O_NONBLOCK) so a stalled reader or large payload can't block a + * dirty scheduler forever or desync the framed protocol. 4-byte native-endian + * length must match read_length_prefixed_data. */ uint32_t len = (uint32_t)data.size; - ssize_t written = write(ctx->callback_pipe[1], &len, sizeof(len)); - if (written != sizeof(len)) { + if (write_all_with_deadline(ctx->callback_pipe[1], &len, sizeof(len), + CALLBACK_RESPONSE_IO_TIMEOUT_MS) != WRITE_OK) { return make_error(env, "write_failed"); } - - written = write(ctx->callback_pipe[1], data.data, data.size); - if (written != (ssize_t)data.size) { + if (write_all_with_deadline(ctx->callback_pipe[1], data.data, data.size, + CALLBACK_RESPONSE_IO_TIMEOUT_MS) != WRITE_OK) { return make_error(env, "write_failed"); } @@ -6607,7 +6667,13 @@ static ERL_NIF_TERM nif_context_resume(ErlNifEnv *env, int argc, const ERL_NIF_T return make_error(env, "context_mismatch"); } - /* Store the callback result */ + /* Store the callback result. Free any prior result first to avoid leaking it + * on a duplicate/raced resume (result_data, not the toggling has_result flag, + * is the real pending-result indicator). */ + if (state->result_data != NULL) { + enif_free(state->result_data); + state->result_data = NULL; + } state->result_data = enif_alloc(result_bin.size); if (state->result_data == NULL) { return make_error(env, "alloc_failed"); @@ -6686,6 +6752,13 @@ static ERL_NIF_TERM nif_context_resume(ErlNifEnv *env, int argc, const ERL_NIF_T } PyObject *args = PyTuple_New(args_len); + if (args == NULL) { + Py_DECREF(func); + enif_free(module_name); + enif_free(func_name); + result = make_error(env, "alloc_failed"); + goto cleanup; + } ERL_NIF_TERM head, tail = state->orig_args; for (unsigned int i = 0; i < args_len; i++) { enif_get_list_cell(state->orig_env, tail, &head, &tail); @@ -7060,6 +7133,11 @@ static ERL_NIF_TERM nif_ref_call_method(ErlNifEnv *env, int argc, const ERL_NIF_ } PyObject *args = PyTuple_New(args_len); + if (args == NULL) { + Py_DECREF(method); + result = make_error(env, "alloc_failed"); + goto cleanup; + } ERL_NIF_TERM head, tail = argv[2]; for (unsigned int i = 0; i < args_len; i++) { enif_get_list_cell(env, tail, &head, &tail); @@ -7218,16 +7296,19 @@ static ERL_NIF_TERM nif_owngil_create_session(ErlNifEnv *env, int argc, .payload_len = 0, }; - /* Write header */ - if (write(w->cmd_pipe[1], &header, sizeof(header)) != sizeof(header)) { + /* Write header (non-blocking write end + deadline so a stalled/dead worker + * can't block this dirty scheduler forever). */ + if (write_all_with_deadline(w->cmd_pipe[1], &header, sizeof(header), + OWNGIL_IO_TIMEOUT_MS) != WRITE_OK) { pthread_mutex_unlock(&w->dispatch_mutex); return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "write_failed")); } - /* Wait for response */ + /* Wait for response, bounded by a deadline. */ owngil_header_t resp; - if (read(w->result_pipe[0], &resp, sizeof(resp)) != sizeof(resp)) { + if (read_with_timeout(w->result_pipe[0], &resp, sizeof(resp), + OWNGIL_IO_TIMEOUT_MS) != (ssize_t)sizeof(resp)) { pthread_mutex_unlock(&w->dispatch_mutex); return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "read_failed")); @@ -7311,9 +7392,11 @@ static ERL_NIF_TERM nif_owngil_submit_task(ErlNifEnv *env, int argc, .payload_len = payload_bin.size, }; - /* Write header and payload */ - if (write(w->cmd_pipe[1], &header, sizeof(header)) != sizeof(header) || - write(w->cmd_pipe[1], payload_bin.data, payload_bin.size) != (ssize_t)payload_bin.size) { + /* Write header and payload (non-blocking write end + deadline). */ + if (write_all_with_deadline(w->cmd_pipe[1], &header, sizeof(header), + OWNGIL_IO_TIMEOUT_MS) != WRITE_OK || + write_all_with_deadline(w->cmd_pipe[1], payload_bin.data, payload_bin.size, + OWNGIL_IO_TIMEOUT_MS) != WRITE_OK) { pthread_mutex_unlock(&w->dispatch_mutex); enif_release_binary(&payload_bin); return enif_make_tuple2(env, ATOM_ERROR, @@ -7369,11 +7452,13 @@ static ERL_NIF_TERM nif_owngil_destroy_session(ErlNifEnv *env, int argc, .payload_len = 0, }; - /* Write header */ - if (write(w->cmd_pipe[1], &header, sizeof(header)) == sizeof(header)) { - /* Wait for response */ + /* Write header (best-effort, bounded). */ + if (write_all_with_deadline(w->cmd_pipe[1], &header, sizeof(header), + OWNGIL_IO_TIMEOUT_MS) == WRITE_OK) { + /* Wait for response (best-effort, bounded). */ owngil_header_t resp; - read(w->result_pipe[0], &resp, sizeof(resp)); + (void)read_with_timeout(w->result_pipe[0], &resp, sizeof(resp), + OWNGIL_IO_TIMEOUT_MS); } pthread_mutex_unlock(&w->dispatch_mutex); @@ -7430,12 +7515,15 @@ static ERL_NIF_TERM nif_owngil_apply_imports(ErlNifEnv *env, int argc, .payload_len = payload_bin.size, }; - /* Write header and payload */ - if (write(w->cmd_pipe[1], &header, sizeof(header)) == sizeof(header)) { - write(w->cmd_pipe[1], payload_bin.data, payload_bin.size); - /* Wait for response */ + /* Write header and payload (best-effort, bounded). */ + if (write_all_with_deadline(w->cmd_pipe[1], &header, sizeof(header), + OWNGIL_IO_TIMEOUT_MS) == WRITE_OK) { + (void)write_all_with_deadline(w->cmd_pipe[1], payload_bin.data, payload_bin.size, + OWNGIL_IO_TIMEOUT_MS); + /* Wait for response (best-effort, bounded). */ owngil_header_t resp; - read(w->result_pipe[0], &resp, sizeof(resp)); + (void)read_with_timeout(w->result_pipe[0], &resp, sizeof(resp), + OWNGIL_IO_TIMEOUT_MS); } enif_release_binary(&payload_bin); @@ -7493,12 +7581,15 @@ static ERL_NIF_TERM nif_owngil_apply_paths(ErlNifEnv *env, int argc, .payload_len = payload_bin.size, }; - /* Write header and payload */ - if (write(w->cmd_pipe[1], &header, sizeof(header)) == sizeof(header)) { - write(w->cmd_pipe[1], payload_bin.data, payload_bin.size); - /* Wait for response */ + /* Write header and payload (best-effort, bounded). */ + if (write_all_with_deadline(w->cmd_pipe[1], &header, sizeof(header), + OWNGIL_IO_TIMEOUT_MS) == WRITE_OK) { + (void)write_all_with_deadline(w->cmd_pipe[1], payload_bin.data, payload_bin.size, + OWNGIL_IO_TIMEOUT_MS); + /* Wait for response (best-effort, bounded). */ owngil_header_t resp; - read(w->result_pipe[0], &resp, sizeof(resp)); + (void)read_with_timeout(w->result_pipe[0], &resp, sizeof(resp), + OWNGIL_IO_TIMEOUT_MS); } enif_release_binary(&payload_bin); @@ -7768,7 +7859,7 @@ static ErlNifFunc nif_funcs[] = { /* Callback support */ {"set_callback_handler", 2, nif_set_callback_handler, 0}, - {"send_callback_response", 2, nif_send_callback_response, 0}, + {"send_callback_response", 2, nif_send_callback_response, ERL_NIF_DIRTY_JOB_IO_BOUND}, {"resume_callback", 2, nif_resume_callback, 0}, /* Async worker management */ @@ -7792,11 +7883,11 @@ static ErlNifFunc nif_funcs[] = { {"subinterp_thread_pool_stats", 0, nif_subinterp_thread_pool_stats, 0}, /* OWN_GIL session management for event loop pool */ - {"owngil_create_session", 1, nif_owngil_create_session, 0}, - {"owngil_submit_task", 7, nif_owngil_submit_task, 0}, - {"owngil_destroy_session", 2, nif_owngil_destroy_session, 0}, - {"owngil_apply_imports", 3, nif_owngil_apply_imports, 0}, - {"owngil_apply_paths", 3, nif_owngil_apply_paths, 0}, + {"owngil_create_session", 1, nif_owngil_create_session, ERL_NIF_DIRTY_JOB_IO_BOUND}, + {"owngil_submit_task", 7, nif_owngil_submit_task, ERL_NIF_DIRTY_JOB_IO_BOUND}, + {"owngil_destroy_session", 2, nif_owngil_destroy_session, ERL_NIF_DIRTY_JOB_IO_BOUND}, + {"owngil_apply_imports", 3, nif_owngil_apply_imports, ERL_NIF_DIRTY_JOB_IO_BOUND}, + {"owngil_apply_paths", 3, nif_owngil_apply_paths, ERL_NIF_DIRTY_JOB_IO_BOUND}, /* Execution mode info */ {"execution_mode", 0, nif_execution_mode, 0}, @@ -7917,7 +8008,7 @@ static ErlNifFunc nif_funcs[] = { {"context_interp_id", 1, nif_context_interp_id, 0}, {"context_set_callback_handler", 2, nif_context_set_callback_handler, 0}, {"context_get_callback_pipe", 1, nif_context_get_callback_pipe, 0}, - {"context_write_callback_response", 2, nif_context_write_callback_response, 0}, + {"context_write_callback_response", 2, nif_context_write_callback_response, ERL_NIF_DIRTY_JOB_IO_BOUND}, {"context_resume", 3, nif_context_resume, ERL_NIF_DIRTY_JOB_CPU_BOUND}, {"context_cancel_resume", 2, nif_context_cancel_resume, 0}, {"context_get_event_loop", 1, nif_context_get_event_loop, 0}, @@ -7941,8 +8032,8 @@ static ErlNifFunc nif_funcs[] = { {"reactor_close_fd", 2, nif_reactor_close_fd, 0}, /* Direct FD operations */ - {"fd_read", 2, nif_fd_read, 0}, - {"fd_write", 2, nif_fd_write, 0}, + {"fd_read", 2, nif_fd_read, ERL_NIF_DIRTY_JOB_IO_BOUND}, + {"fd_write", 2, nif_fd_write, ERL_NIF_DIRTY_JOB_IO_BOUND}, {"fd_select_read", 1, nif_fd_select_read, 0}, {"fd_select_write", 1, nif_fd_select_write, 0}, {"fd_close", 1, nif_fd_close, 0}, diff --git a/c_src/py_nif.h b/c_src/py_nif.h index 066024e..8977b8f 100644 --- a/c_src/py_nif.h +++ b/c_src/py_nif.h @@ -1674,6 +1674,12 @@ static ERL_NIF_TERM make_py_error(ErlNifEnv *env); * @warning Caller must call enif_free() on the returned string */ static char *binary_to_string(const ErlNifBinary *bin) { + /* Reject embedded NULs: the result is used as a C string, so a NUL would + * silently truncate a module/func/attr/code name (a name-smuggling vector). + * Callers already treat a NULL return as an error. */ + if (bin->size > 0 && memchr(bin->data, '\0', bin->size) != NULL) { + return NULL; + } char *str = enif_alloc(bin->size + 1); if (str != NULL) { memcpy(str, bin->data, bin->size); diff --git a/c_src/py_reactor_buffer.c b/c_src/py_reactor_buffer.c index 9535747..fa3fd63 100644 --- a/c_src/py_reactor_buffer.c +++ b/c_src/py_reactor_buffer.c @@ -620,8 +620,13 @@ static PyObject *ReactorBuffer_split(ReactorBufferObject *self, PyObject *args, return NULL; } - PyObject *result = PyObject_Call( - PyObject_GetAttrString(bytes_obj, "split"), args, kwargs); + PyObject *split_meth = PyObject_GetAttrString(bytes_obj, "split"); + if (split_meth == NULL) { + Py_DECREF(bytes_obj); + return NULL; + } + PyObject *result = PyObject_Call(split_meth, args, kwargs); + Py_DECREF(split_meth); Py_DECREF(bytes_obj); return result; } diff --git a/c_src/py_subinterp_thread.c b/c_src/py_subinterp_thread.c index 72a1552..ec5ba39 100644 --- a/c_src/py_subinterp_thread.c +++ b/c_src/py_subinterp_thread.c @@ -107,6 +107,13 @@ int subinterp_thread_pool_init(int num_workers) { pthread_mutex_destroy(&w->ns_mutex); goto cleanup_workers; } + /* Non-blocking cmd_pipe write end so the dispatch NIFs can bound their + * writes with write_all_with_deadline (no dirty-scheduler stall on a full + * pipe). The worker thread reads cmd_pipe[0] blocking, which is fine. */ + { + int wfl = fcntl(w->cmd_pipe[1], F_GETFL, 0); + if (wfl >= 0) (void)fcntl(w->cmd_pipe[1], F_SETFL, wfl | O_NONBLOCK); + } if (pipe(w->result_pipe) < 0) { fprintf(stderr, "subinterp_thread_pool_init: failed to create result_pipe for worker %d: %s\n", i, strerror(errno)); @@ -430,7 +437,7 @@ static void *worker_thread_main(void *arg) { if (tmp_env != NULL) { ERL_NIF_TERM imports_list; if (enif_binary_to_term(tmp_env, payload, header.payload_len, - &imports_list, 0) != 0) { + &imports_list, ERL_NIF_BIN2TERM_SAFE) != 0) { ERL_NIF_TERM head, tail = imports_list; int arity; const ERL_NIF_TERM *tuple; @@ -482,7 +489,7 @@ static void *worker_thread_main(void *arg) { if (tmp_env != NULL) { ERL_NIF_TERM paths_list; if (enif_binary_to_term(tmp_env, payload, header.payload_len, - &paths_list, 0) != 0) { + &paths_list, ERL_NIF_BIN2TERM_SAFE) != 0) { PyObject *sys_path = PySys_GetObject("path"); if (sys_path != NULL && PyList_Check(sys_path)) { ERL_NIF_TERM head, tail = paths_list; @@ -549,7 +556,7 @@ static void *worker_thread_main(void *arg) { if (tmp_env != NULL && header.payload_len > 0) { if (enif_binary_to_term(tmp_env, payload, header.payload_len, - &payload_term, 0) != 0) { + &payload_term, ERL_NIF_BIN2TERM_SAFE) != 0) { if (enif_get_tuple(tmp_env, payload_term, &arity, &elements)) { /* Execute based on request type */ PyObject *result = NULL; @@ -562,7 +569,12 @@ static void *worker_thread_main(void *arg) { if ((owns_globals && globals == NULL) || (owns_locals && locals == NULL)) { if (owns_globals) Py_XDECREF(globals); if (owns_locals) Py_XDECREF(locals); - break; + /* Per-request dict allocation failure: respond with an + * error and keep serving. This previously `break`ed the + * worker command loop, permanently killing the thread (and + * leaving the GIL held), wedging every session routed to it. */ + resp_header.msg_type = MSG_ERROR; + goto send_response; } switch (header.req_type) { @@ -933,6 +945,7 @@ static void *worker_thread_main(void *arg) { } } + send_response: if (tmp_env) { enif_free_env(tmp_env); } diff --git a/c_src/py_worker_pool.c b/c_src/py_worker_pool.c index e364179..7452c19 100644 --- a/c_src/py_worker_pool.c +++ b/c_src/py_worker_pool.c @@ -219,9 +219,9 @@ static void py_pool_send_response(py_pool_request_t *req, ERL_NIF_TERM result) { * Set to NULL to prevent double-free in py_pool_request_free. */ req->msg_env = NULL; } else { + /* enif_send fails normally when the caller has already died; the + * g_responses_failed counter records it (no stderr spam). */ atomic_fetch_add(&g_responses_failed, 1); - fprintf(stderr, "[DEBUG] enif_send FAILED for req_id=%llu\n", - (unsigned long long)req->request_id); /* On failure, msg_env is still valid and will be freed in request_free */ } } @@ -579,6 +579,9 @@ static void *py_pool_worker_thread(void *arg) { * ============================================================================ */ static int py_pool_init(int num_workers) { + /* Init/shutdown are serialized by the single Erlang gen_server that owns the + * pool, so this check-then-init runs without a concurrent caller and needs no + * extra lock. */ if (g_pool.initialized) { return 0; /* Already initialized */ } diff --git a/src/py.erl b/src/py.erl index 84fa976..31d88ef 100644 --- a/src/py.erl +++ b/src/py.erl @@ -413,8 +413,8 @@ stream(Module, Func, Args, Kwargs) when map_size(Kwargs) == 0 -> stream(Module, Func, Args, Kwargs) -> %% With kwargs - use eval approach Ctx = py_context_router:get_context(), - ModuleBin = ensure_binary(Module), - FuncBin = ensure_binary(Func), + ModuleBin = valid_py_module(ensure_binary(Module)), + FuncBin = valid_py_ident(ensure_binary(Func)), KwargsCode = format_kwargs(Kwargs), ArgsCode = format_args(Args), Code = iolist_to_binary([ @@ -445,8 +445,8 @@ format_args(Args) -> %% @private Format a single argument format_arg(A) when is_integer(A) -> integer_to_binary(A); format_arg(A) when is_float(A) -> float_to_binary(A); -format_arg(A) when is_binary(A) -> <<"'", A/binary, "'">>; -format_arg(A) when is_atom(A) -> <<"'", (atom_to_binary(A))/binary, "'">>; +format_arg(A) when is_binary(A) -> <<"'", (escape_py_literal(A))/binary, "'">>; +format_arg(A) when is_atom(A) -> <<"'", (escape_py_literal(atom_to_binary(A)))/binary, "'">>; format_arg(A) when is_list(A) -> iolist_to_binary([<<"[">>, format_args(A), <<"]">>]); format_arg(_) -> <<"None">>. @@ -454,7 +454,7 @@ format_arg(_) -> <<"None">>. format_kwargs(Kwargs) when map_size(Kwargs) == 0 -> <<>>; format_kwargs(Kwargs) -> KwList = maps:fold(fun(K, V, Acc) -> - KB = if is_atom(K) -> atom_to_binary(K); is_binary(K) -> K end, + KB = valid_py_ident(if is_atom(K) -> atom_to_binary(K); is_binary(K) -> K end), [<>, lists:join(<<", ">>, KwList)]). @@ -543,7 +543,9 @@ stream_start(Module, Func, Args, Opts) -> {ok, Ref}. %% @private Run the streaming via Python code -stream_run_python(ModuleBin, FuncBin, RefHash) -> +stream_run_python(ModuleBin0, FuncBin0, RefHash) -> + ModuleBin = valid_py_module(ModuleBin0), + FuncBin = valid_py_ident(FuncBin0), RefHashBin = integer_to_binary(RefHash), %% Build Python code that streams values using callbacks Code = iolist_to_binary([ @@ -898,14 +900,13 @@ create_venv(Path, Opts) -> undefined -> get_python_executable(); P -> P end, - Cmd = case Installer of + case Installer of uv -> %% uv venv is faster, use --python to match the running interpreter - io_lib:format("uv venv --python ~s ~s", [quote(Python), quote(Path)]); + run_cmd(uv_exe(), ["venv", "--python", Python, Path], []); pip -> - io_lib:format("~s -m venv ~s", [quote(Python), quote(Path)]) - end, - run_cmd(lists:flatten(Cmd)). + run_cmd(Python, ["-m", "venv", Path], []) + end. %% @private Get the Python executable path %% When embedded, sys.executable returns the embedding app (beam.smp) @@ -925,30 +926,28 @@ get_python_executable() -> -spec install_deps(string(), string(), list()) -> ok | {error, term()}. install_deps(Path, RequirementsFile, Opts) -> Installer = detect_installer(Opts), - PipPath = pip_path(Path, Installer), + {Exe, BaseArgs, PortOpts} = pip_command(Path, Installer), Extras = proplists:get_value(extras, Opts, []), - %% Determine file type and build install command - Cmd = case filename:extension(RequirementsFile) of + %% Determine file type and build the install argument list (no shell). + Args = case filename:extension(RequirementsFile) of ".txt" -> - %% requirements.txt - io_lib:format("~s install -r ~s", [PipPath, quote(RequirementsFile)]); + BaseArgs ++ ["install", "-r", RequirementsFile]; ".toml" -> - %% pyproject.toml - install as editable + %% pyproject.toml - install as editable. %% filename:dirname returns "." for files without directory component InstallPath = filename:dirname(RequirementsFile), case Extras of [] -> - io_lib:format("~s install -e ~s", [PipPath, quote(InstallPath)]); + BaseArgs ++ ["install", "-e", InstallPath]; _ -> ExtrasStr = string:join(Extras, ","), - io_lib:format("~s install -e \"~s[~s]\"", [PipPath, InstallPath, ExtrasStr]) + BaseArgs ++ ["install", "-e", InstallPath ++ "[" ++ ExtrasStr ++ "]"] end; _ -> - %% Assume requirements.txt format - io_lib:format("~s install -r ~s", [PipPath, quote(RequirementsFile)]) + BaseArgs ++ ["install", "-r", RequirementsFile] end, - run_cmd(lists:flatten(Cmd)). + run_cmd(Exe, Args, PortOpts). %% @private Detect which installer to use (uv or pip) -spec detect_installer(list()) -> uv | pip. @@ -963,40 +962,74 @@ detect_installer(Opts) -> Installer end. -%% @private Get pip/uv pip command path --spec pip_path(string(), uv | pip) -> string(). -pip_path(VenvPath, uv) -> - %% uv pip uses venv from env var or --python flag - "VIRTUAL_ENV=" ++ quote(VenvPath) ++ " uv pip"; -pip_path(VenvPath, pip) -> - %% Use pip from the venv - case os:type() of +%% @private Resolve the installer into {Executable, BaseArgs, PortOpts}. +%% For uv the venv is selected via the VIRTUAL_ENV port env option (not a shell +%% prefix); for pip we use the venv's own pip binary. +-spec pip_command(string(), uv | pip) -> {string(), [string()], list()}. +pip_command(VenvPath, uv) -> + {uv_exe(), ["pip"], [{env, [{"VIRTUAL_ENV", VenvPath}]}]}; +pip_command(VenvPath, pip) -> + PipExe = case os:type() of {win32, _} -> filename:join([VenvPath, "Scripts", "pip"]); _ -> filename:join([VenvPath, "bin", "pip"]) + end, + {PipExe, [], []}. + +%% @private Full path to the uv executable (falls back to the bare name). +-spec uv_exe() -> string(). +uv_exe() -> + case os:find_executable("uv") of + false -> "uv"; + P -> P + end. + +%% @private Run an executable with an argv list (no shell) and return ok or error. +-spec run_cmd(string(), [string()], list()) -> ok | {error, term()}. +run_cmd(Exe, Args, ExtraOpts) -> + case resolve_exe(Exe) of + {error, _} = Err -> + Err; + ExeFull -> + try open_port({spawn_executable, ExeFull}, + [exit_status, stderr_to_stdout, binary, {args, Args} | ExtraOpts]) of + Port -> collect_port(Port, []) + catch + error:Reason -> {error, {spawn_failed, Exe, Reason}} + end end. -%% @private Quote a path for shell --spec quote(string()) -> string(). -quote(S) -> - "'" ++ S ++ "'". - -%% @private Run a shell command and return ok or error --spec run_cmd(string()) -> ok | {error, term()}. -run_cmd(Cmd) -> - %% Use os:cmd but check for errors - Result = os:cmd(Cmd ++ " 2>&1; echo \"::exitcode::$?\""), - %% Parse exit code from end of output - case string:split(Result, "::exitcode::", trailing) of - [Output, ExitCodeStr] -> - case string:trim(ExitCodeStr) of - "0" -> ok; - Code -> {error, {exit_code, list_to_integer(Code), string:trim(Output)}} +%% @private Resolve an executable name/path to a full path (spawn_executable does +%% not search PATH). +-spec resolve_exe(string()) -> string() | {error, term()}. +resolve_exe(Exe) -> + case filename:pathtype(Exe) of + absolute -> + case filelib:is_file(Exe) of + true -> Exe; + false -> {error, {executable_not_found, Exe}} end; _ -> - %% Fallback - assume success if no error marker - ok + case os:find_executable(Exe) of + false -> {error, {executable_not_found, Exe}}; + Found -> Found + end + end. + +%% @private Collect a spawned port's output and exit status. +-spec collect_port(port(), [binary()]) -> ok | {error, term()}. +collect_port(Port, Acc) -> + receive + {Port, {data, Data}} -> + collect_port(Port, [Data | Acc]); + {Port, {exit_status, 0}} -> + ok; + {Port, {exit_status, Code}} -> + {error, {exit_code, Code, iolist_to_binary(lists:reverse(Acc))}} + after 300000 -> + try port_close(Port) catch _:_ -> ok end, + {error, timeout} end. %% @private Convert to string @@ -1070,6 +1103,51 @@ escape_python_string(Str) -> (C) -> [C] end, Str). +%% @private Escape a binary for safe embedding inside a single-quoted Python +%% string literal: quote, backslash, and newline/CR/tab/other control bytes that +%% would otherwise break out of or corrupt the literal. +escape_py_literal(Bin) when is_binary(Bin) -> + << <<(escape_py_byte(B))/binary>> || <> <= Bin >>. + +escape_py_byte($') -> <<"\\'">>; +escape_py_byte($\\) -> <<"\\\\">>; +escape_py_byte($\n) -> <<"\\n">>; +escape_py_byte($\r) -> <<"\\r">>; +escape_py_byte($\t) -> <<"\\t">>; +escape_py_byte(B) when B < 16#20; B =:= 16#7f -> + list_to_binary(io_lib:format("\\x~2.16.0b", [B])); +escape_py_byte(B) -> <>. + +%% @private Validate a Python identifier ([A-Za-z_][A-Za-z0-9_]*). Crashes on a +%% non-conforming value so an attacker-controlled module/func/kwarg name can't +%% inject code at an identifier position (where quoting is meaningless). +valid_py_ident(Bin) when is_binary(Bin), byte_size(Bin) > 0 -> + case ident_ok(Bin, first) of + true -> Bin; + false -> error({invalid_python_identifier, Bin}) + end; +valid_py_ident(Other) -> + error({invalid_python_identifier, Other}). + +%% @private Validate a dotted Python module path (each segment an identifier). +valid_py_module(Bin) when is_binary(Bin), byte_size(Bin) > 0 -> + Segments = binary:split(Bin, <<".">>, [global]), + lists:foreach(fun valid_py_ident/1, Segments), + Bin; +valid_py_module(Other) -> + error({invalid_python_identifier, Other}). + +ident_ok(<<>>, first) -> false; %% empty segment (leading/trailing/double dot) +ident_ok(<<>>, rest) -> true; +ident_ok(<>, first) + when (C >= $A andalso C =< $Z); (C >= $a andalso C =< $z); C =:= $_ -> + ident_ok(Rest, rest); +ident_ok(<>, rest) + when (C >= $A andalso C =< $Z); (C >= $a andalso C =< $z); + (C >= $0 andalso C =< $9); C =:= $_ -> + ident_ok(Rest, rest); +ident_ok(_, _) -> false. + %% @doc Deactivate the current virtual environment. %% Restores sys.path to its original state. -spec deactivate_venv() -> ok | {error, term()}. @@ -1262,13 +1340,13 @@ configure_logging(Opts) -> iolist_to_binary([ "__import__('erlang').setup_logging(", integer_to_binary(LevelInt), - ", '", F, "')" + ", '", escape_py_literal(F), "')" ]); F when is_list(F) -> iolist_to_binary([ "__import__('erlang').setup_logging(", integer_to_binary(LevelInt), - ", '", F, "')" + ", '", escape_py_literal(iolist_to_binary(F)), "')" ]) end, case eval(Code) of diff --git a/src/py_state.erl b/src/py_state.erl index 3ebffed..ce71b7c 100644 --- a/src/py_state.erl +++ b/src/py_state.erl @@ -72,6 +72,11 @@ -define(TABLE, py_state). +%% Reserved sentinel row holding the live entry count, used by the optional size +%% cap. User keys are rejected from this slot so callers (Erlang or Python) can't +%% corrupt the accounting. +-define(SIZE_KEY, '$py_state_size$'). + %%% ============================================================================ %%% API %%% ============================================================================ @@ -112,6 +117,7 @@ register_callbacks() -> %% @doc Fetch a value from the shared state. -spec fetch(Key :: term()) -> {ok, term()} | {error, not_found}. +fetch(?SIZE_KEY) -> {error, not_found}; fetch(Key) -> case ets:lookup(?TABLE, Key) of [{_, Value}] -> {ok, Value}; @@ -119,21 +125,62 @@ fetch(Key) -> end. %% @doc Store a value in the shared state. --spec store(Key :: term(), Value :: term()) -> ok. +-spec store(Key :: term(), Value :: term()) -> ok | {error, full | reserved_key}. +store(?SIZE_KEY, _Value) -> + {error, reserved_key}; store(Key, Value) -> - ets:insert(?TABLE, {Key, Value}), - ok. + case max_entries() of + infinity -> + ets:insert(?TABLE, {Key, Value}), + ok; + Max -> + %% Atomic admission: only genuinely new keys consume capacity, and the + %% reserve/rollback uses ets:update_counter so there is no TOCTOU race + %% on the public, write-concurrent table. Overwrites don't change count. + case ets:insert_new(?TABLE, {Key, Value}) of + true -> + Count = ets:update_counter(?TABLE, ?SIZE_KEY, {2, 1}, {?SIZE_KEY, 0}), + case Count > Max of + true -> + ets:delete(?TABLE, Key), + ets:update_counter(?TABLE, ?SIZE_KEY, {2, -1}, {?SIZE_KEY, 0}), + {error, full}; + false -> + ok + end; + false -> + ets:insert(?TABLE, {Key, Value}), + ok + end + end. %% @doc Remove a key from the shared state. -spec remove(Key :: term()) -> ok. +remove(?SIZE_KEY) -> + ok; remove(Key) -> - ets:delete(?TABLE, Key), - ok. + case max_entries() of + infinity -> + ets:delete(?TABLE, Key), + ok; + _Max -> + %% Decrement only when a real user key was actually present (ets:take so + %% a missing-key remove can't drift the counter negative). + case ets:take(?TABLE, Key) of + [_] -> + ets:update_counter(?TABLE, ?SIZE_KEY, {2, -1}, {?SIZE_KEY, 0}), + ok; + [] -> + ok + end + end. %% @doc Get all keys in the shared state. -spec keys() -> [term()]. keys() -> - ets:foldl(fun({K, _}, Acc) -> [K | Acc] end, [], ?TABLE). + ets:foldl(fun({?SIZE_KEY, _}, Acc) -> Acc; + ({K, _}, Acc) -> [K | Acc] + end, [], ?TABLE). %% @doc Clear all entries from the shared state. -spec clear() -> ok. @@ -148,6 +195,8 @@ incr(Key) -> %% @doc Atomically increment a counter by Amount. Initializes to Amount if not exists. -spec incr(Key :: term(), Amount :: integer()) -> integer(). +incr(?SIZE_KEY, _Amount) -> + error(reserved_key); incr(Key, Amount) -> try ets:update_counter(?TABLE, Key, {2, Amount}) @@ -168,6 +217,12 @@ decr(Key) -> decr(Key, Amount) -> incr(Key, -Amount). +%% @private Configured entry cap. `infinity' (the default) preserves the previous +%% unbounded behavior; set application env `max_state_entries' to a positive +%% integer to bound memory growth from Python-driven state_set calls. +max_entries() -> + application:get_env(erlang_python, max_state_entries, infinity). + %%% ============================================================================ %%% Callback wrappers (for Python access) %%% ============================================================================ @@ -181,8 +236,10 @@ state_get_callback([Key]) -> %% @private state_set_callback([Key, Value]) -> - store(Key, Value), - none. + case store(Key, Value) of + ok -> none; + {error, Reason} -> {error, Reason} + end. %% @private state_delete_callback([Key]) -> diff --git a/test/py_SUITE.erl b/test/py_SUITE.erl index d4b0f0a..5163d7e 100644 --- a/test/py_SUITE.erl +++ b/test/py_SUITE.erl @@ -21,6 +21,8 @@ test_spawn_call/1, test_type_conversions/1, test_nested_types/1, + test_conversion_depth_guard/1, + test_embedded_nul_name_rejected/1, test_timeout/1, test_special_floats/1, test_streaming/1, @@ -82,6 +84,8 @@ all() -> test_spawn_call, test_type_conversions, test_nested_types, + test_conversion_depth_guard, + test_embedded_nul_name_rejected, test_timeout, test_special_floats, test_streaming, @@ -312,6 +316,28 @@ test_nested_types(_Config) -> ok. +%% @doc A term nested far deeper than the converter's depth cap must fail cleanly +%% (RecursionError -> arg conversion error) rather than overflow the C stack and +%% crash the node. Regression for the recursion guard in py_convert. +test_conversion_depth_guard(_Config) -> + %% 1M deep: without the guard this overflows the C stack and crashes the + %% node; with it, conversion bails at PY_CONVERT_MAX_DEPTH and returns an error. + Deep = lists:foldl(fun(_, Acc) -> [Acc] end, [], lists:seq(1, 1000000)), + {error, _} = py:call(math, sqrt, [Deep]), + %% Node/context is still alive and usable afterwards. + {ok, 4.0} = py:call(math, sqrt, [16]), + ok. + +%% @doc A code string with an embedded NUL must be rejected, not silently +%% truncated at the NUL (which would run something other than intended). Exercises +%% the binary_to_string NUL guard shared by all name/code decoding. +test_embedded_nul_name_rejected(_Config) -> + {error, _} = py:eval(<<"1", 0, "+1">>), + {error, _} = py:exec(<<"x = 1", 0, "; y = 2">>), + %% Node still alive and clean code still works. + {ok, 2} = py:eval(<<"1+1">>), + ok. + test_timeout(_Config) -> %% Test that timeout works - use time.sleep which guarantees delay %% time.sleep(1) will definitely exceed 100ms timeout diff --git a/test/py_buffer_SUITE.erl b/test/py_buffer_SUITE.erl index 6a4a490..46baa48 100644 --- a/test/py_buffer_SUITE.erl +++ b/test/py_buffer_SUITE.erl @@ -22,6 +22,7 @@ seek_tell_test/1, find_test/1, memoryview_test/1, + buffer_grow_pinned_test/1, iterator_test/1, closed_buffer_test/1, empty_buffer_test/1, @@ -40,6 +41,7 @@ all() -> [ seek_tell_test, find_test, memoryview_test, + buffer_grow_pinned_test, iterator_test, closed_buffer_test, empty_buffer_test, @@ -306,6 +308,32 @@ def read_buffer(buf): ok. +%% @doc A buffer write that must grow the storage is refused while a Python +%% memoryview pins it (relocating the storage would dangle the view -> UAF). +%% Regression for the zero-copy view_count guard in py_buffer_write. +buffer_grow_pinned_test(_Config) -> + {ok, Buf} = py_buffer:new(8), %% capacity 8 bytes + ok = py_buffer:write(Buf, <<"ab">>), %% write_pos 2, within capacity + Ctx = py:context(1), + + %% Python holds a memoryview pinning the buffer (kept in a persistent global). + {ok, <<"pinned">>} = py:eval(Ctx, + <<"globals().setdefault('_keep', []).append(memoryview(buf)) or 'pinned'">>, + #{<<"buf">> => Buf}), + + %% Growing the buffer now would relocate buf->data and dangle the memoryview, + %% so the write is refused (no crash) while the view is pinned. + {error, _} = py_buffer:write(Buf, binary:copy(<<"x">>, 1000)), + + %% Release the view; the same write now succeeds. + {ok, <<"released">>} = py:eval(Ctx, <<"_keep[0].release() or 'released'">>), + ok = py_buffer:write(Buf, binary:copy(<<"x">>, 1000)), + + %% Context still alive and usable. + {ok, 2} = py:eval(Ctx, <<"1+1">>), + ok = py_buffer:close(Buf), + ok. + %% @doc Test that buffer resources are properly garbage collected %% Verifies reference counting between Erlang and Python gc_refcount_test(_Config) -> diff --git a/test/py_fd_ops_SUITE.erl b/test/py_fd_ops_SUITE.erl index d305274..c1c7c55 100644 --- a/test/py_fd_ops_SUITE.erl +++ b/test/py_fd_ops_SUITE.erl @@ -28,7 +28,8 @@ fd_read_write_test/1, fd_close_test/1, fd_select_test/1, - dup_fd_test/1 + dup_fd_test/1, + fd_registry_invalid_id_test/1 ]). all() -> @@ -37,7 +38,8 @@ all() -> fd_read_write_test, fd_close_test, fd_select_test, - dup_fd_test + dup_fd_test, + fd_registry_invalid_id_test ]. init_per_suite(Config) -> @@ -176,3 +178,18 @@ dup_fd_test(_Config) -> ok = py_nif:fd_close(DupFd2), ok = py_nif:fd_close(Fd1), ok. + +%% @doc A stale, duplicate, or fabricated fd id must be a safe no-op (or clean +%% error), not a raw-pointer dereference. Regression for the validating +%% fd-handle registry: pre-fix the id was cast straight to a pointer and +%% dereferenced, so a bogus id crashed the node. +fd_registry_invalid_id_test(_Config) -> + {ok, none} = py:eval(<<"__import__('py_event_loop')._remove_reader(999999999)">>), + {ok, none} = py:eval(<<"__import__('py_event_loop')._remove_writer(123456789)">>), + {ok, none} = py:eval(<<"__import__('py_event_loop')._release_fd_resource(42)">>), + {ok, none} = py:eval(<<"__import__('py_event_loop')._clear_fd_read(7)">>), + %% _update_fd_* rejects an unknown id with a clean ValueError, not a crash. + {error, _} = py:eval(<<"__import__('py_event_loop')._update_fd_read(54321, 1)">>), + %% Node still alive and usable. + {ok, 2} = py:eval(<<"1+1">>), + ok. diff --git a/test/py_owngil_features_SUITE.erl b/test/py_owngil_features_SUITE.erl index 2a43c30..c611953 100644 --- a/test/py_owngil_features_SUITE.erl +++ b/test/py_owngil_features_SUITE.erl @@ -47,6 +47,7 @@ owngil_reentrant_basic_test/1, owngil_reentrant_nested_test/1, owngil_reentrant_concurrent_test/1, + owngil_reentrant_multi_stress_test/1, owngil_reentrant_complex_types_test/1, owngil_reentrant_thread_callback_test/1, owngil_reentrant_try_except_test/1 @@ -156,6 +157,7 @@ groups() -> owngil_reentrant_basic_test, owngil_reentrant_nested_test, owngil_reentrant_concurrent_test, + owngil_reentrant_multi_stress_test, owngil_reentrant_complex_types_test, owngil_reentrant_thread_callback_test, owngil_reentrant_try_except_test @@ -705,6 +707,36 @@ owngil_reentrant_nested_test(_Config) -> py_context:stop(Ctx). +%% @doc Stress the per-interpreter exception machinery (H8) and the bounded +%% OWN_GIL dispatch (H7): many reentrant suspend/resume cycles across several +%% subinterpreter contexts concurrently. A cross-interpreter exception object or +%% a dispatch read/write desync would corrupt or crash the node under this load. +owngil_reentrant_multi_stress_test(_Config) -> + py:register_function(owngil_level, fun([Level, N]) -> + case Level >= N of + true -> Level; + false -> + Code = iolist_to_binary(io_lib:format( + "__import__('erlang').call('owngil_level', ~p, ~p)", [Level + 1, N])), + {ok, R} = py:eval(Code), + R + end + end), + %% Kept light so it stays fast and non-flaky on slow CI hosts while still + %% driving concurrent reentrant suspend/resume across distinct subinterpreters. + Parent = self(), + Pids = [spawn(fun() -> + {ok, Ctx} = py_context:start_link(CtxId, owngil), + Ok = lists:all(fun(_) -> + {ok, 4} =:= py_context:eval(Ctx, + <<"__import__('erlang').call('owngil_level', 1, 4)">>, #{}) + end, lists:seq(1, 5)), + py_context:stop(Ctx), + Parent ! {self(), Ok} + end) || CtxId <- lists:seq(1, 2)], + [receive {P, true} -> ok after 120000 -> error({stress_failed, P}) end || P <- Pids], + ok. + %% @doc Concurrent callbacks from multiple owngil contexts owngil_reentrant_concurrent_test(_Config) -> NumContexts = 4, diff --git a/test/py_reentrant_SUITE.erl b/test/py_reentrant_SUITE.erl index 9d3818a..64666f6 100644 --- a/test/py_reentrant_SUITE.erl +++ b/test/py_reentrant_SUITE.erl @@ -24,7 +24,9 @@ test_call_from_non_worker_thread/1, test_callback_with_try_except/1, test_async_call/1, - test_callback_name_registry/1 + test_callback_name_registry/1, + test_etf_decode_safe/1, + test_reentrant_resume_stress/1 ]). all() -> @@ -38,7 +40,9 @@ all() -> test_call_from_non_worker_thread, test_callback_with_try_except, test_async_call, - test_callback_name_registry + test_callback_name_registry, + test_etf_decode_safe, + test_reentrant_resume_stress ]. init_per_suite(Config) -> @@ -65,6 +69,9 @@ end_per_testcase(_TestCase, _Config) -> try py:unregister_function(subtract_five) catch _:_ -> ok end, try py:unregister_function(async_multiply) catch _:_ -> ok end, try py:unregister_function(test_registry_func) catch _:_ -> ok end, + try py:unregister_function(etf_probe_ok) catch _:_ -> ok end, + try py:unregister_function(etf_probe_novel) catch _:_ -> ok end, + try py:unregister_function(rs_double) catch _:_ -> ok end, ok. %%% ============================================================================ @@ -89,6 +96,77 @@ test_basic_reentrant(_Config) -> ok. +%% @doc Many reentrant suspend/resume cycles must all succeed with no leak, +%% stale-TLS invariant trip, or crash. Regression for the suspend/resume lifetime +%% fixes (worker keep, TLS clear, result_data free, hardened callback-pipe writes); +%% exercises the callback pipe and context resume under repetition. +test_reentrant_resume_stress(_Config) -> + py:register_function(rs_double, fun([X]) -> + {ok, R} = py:eval(iolist_to_binary(io_lib:format("~p * 2", [X]))), + R + end), + Got = [begin + Code = iolist_to_binary( + io_lib:format("__import__('erlang').call('rs_double', ~p) + 1", [N])), + {ok, V} = py:eval(Code), + V + end || N <- lists:seq(1, 100)], + Got = [N * 2 + 1 || N <- lists:seq(1, 100)], + py:unregister_function(rs_double), + ok. + +%% @doc Regression for the binary_to_term SAFE-flag hardening (atom exhaustion). +%% A `__etf__:` callback result that encodes a brand-new atom must be rejected (not +%% decoded) so it cannot mint non-GC'd atoms, while a valid existing-atom payload +%% still round-trips through the same enif_binary_to_term path. +test_etf_decode_safe(_Config) -> + %% Positive: an EXISTING atom encoded via __etf__: still decodes under SAFE, + %% proving the decode path runs and the change is non-breaking. + OkMarker = etf_marker(term_to_binary(ok)), + py:register_function(etf_probe_ok, fun(_) -> OkMarker end), + {ok, GotOk} = py:eval(<<"__import__('erlang').call('etf_probe_ok', [])">>), + true = (GotOk =/= OkMarker), %% decoded to the term, not passed through verbatim + py:unregister_function(etf_probe_ok), + + %% Negative: many DISTINCT brand-new atoms encoded via __etf__: must all be + %% rejected. Pre-fix (flags=0) each would create a permanent atom and come back + %% as that atom; with ERL_NIF_BIN2TERM_SAFE the raw marker is returned unchanged. + Before = erlang:system_info(atom_count), + N = 50, + lists:foreach( + fun(I) -> + Name = "zzqx_etf_safe_" ++ integer_to_list(I), + Marker = etf_marker(novel_atom_etf(Name)), + py:register_function(etf_probe_novel, fun(_) -> Marker end), + {ok, Got} = py:eval(<<"__import__('erlang').call('etf_probe_novel', [])">>), + Marker = Got, %% rejected: returned verbatim, never decoded to the atom + assert_atom_absent(Name) + end, + lists:seq(1, N) + ), + py:unregister_function(etf_probe_novel), + After = erlang:system_info(atom_count), + %% The N distinct novel atoms were not created (slack for unrelated atoms). + true = (After - Before) < (N div 2), + ok. + +%% @private Build a "__etf__:" marker the C side base64-decodes and feeds to +%% enif_binary_to_term (see term_to_python_repr/1 in py_context for the real encoder). +etf_marker(Etf) -> + <<"__etf__:", (base64:encode(Etf))/binary>>. + +%% @private Hand-craft the ETF for a brand-new atom WITHOUT creating it in this VM +%% (an atom literal would create it). SMALL_ATOM_UTF8_EXT: <<131, 119, Len, Name>>. +novel_atom_etf(Name) -> + Bin = list_to_binary(Name), + <<131, 119, (byte_size(Bin)), Bin/binary>>. + +%% @private Fail if Name was created as an atom. +assert_atom_absent(Name) -> + try list_to_existing_atom(Name) of + _ -> error({atom_created, Name}) + catch error:badarg -> ok end. + %% @doc Test deeply nested callbacks (3+ levels). %% Tests Erlang→Python→Erlang→Python... nesting test_nested_callbacks(_Config) -> diff --git a/test/py_state_SUITE.erl b/test/py_state_SUITE.erl new file mode 100644 index 0000000..068fc0a --- /dev/null +++ b/test/py_state_SUITE.erl @@ -0,0 +1,91 @@ +%%% @doc Common Test suite for py_state shared-state store, focused on the +%%% optional entry cap and its accounting (memory-exhaustion resistance). +-module(py_state_SUITE). + +-include_lib("common_test/include/ct.hrl"). + +-export([ + all/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2 +]). + +-export([ + cap_disabled_test/1, + cap_enforced_test/1, + cap_accounting_test/1, + sentinel_protected_test/1 +]). + +-define(SIZE_KEY, '$py_state_size$'). + +all() -> + [cap_disabled_test, cap_enforced_test, cap_accounting_test, sentinel_protected_test]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(erlang_python), + Config. + +end_per_suite(_Config) -> + application:stop(erlang_python), + ok. + +init_per_testcase(_TC, Config) -> + application:unset_env(erlang_python, max_state_entries), + py_state:clear(), + Config. + +end_per_testcase(_TC, _Config) -> + application:unset_env(erlang_python, max_state_entries), + py_state:clear(), + ok. + +%% @doc Default (infinity) preserves the previous unbounded behavior. +cap_disabled_test(_Config) -> + [ok = py_state:store(N, N) || N <- lists:seq(1, 1000)], + {ok, 500} = py_state:fetch(500), + ok. + +%% @doc A finite cap rejects new keys beyond the limit; overwrites don't consume +%% capacity. +cap_enforced_test(_Config) -> + application:set_env(erlang_python, max_state_entries, 5), + [ok = py_state:store({k, N}, N) || N <- lists:seq(1, 5)], + {error, full} = py_state:store({k, 6}, 6), + %% Overwriting an existing key does NOT consume capacity. + ok = py_state:store({k, 3}, 30), + {ok, 30} = py_state:fetch({k, 3}), + {error, full} = py_state:store({k, 7}, 7), + ok. + +%% @doc Removing frees capacity; removing a missing key must not drift the count +%% (which would let the cap be bypassed or never trip). +cap_accounting_test(_Config) -> + application:set_env(erlang_python, max_state_entries, 3), + ok = py_state:store(a, 1), + ok = py_state:store(b, 2), + ok = py_state:store(c, 3), + {error, full} = py_state:store(d, 4), + %% Remove a missing key repeatedly: no phantom capacity freed. + [py_state:remove(missing) || _ <- lists:seq(1, 10)], + {error, full} = py_state:store(d, 4), + %% Remove a real key: frees exactly one slot. + ok = py_state:remove(a), + ok = py_state:store(d, 4), + {error, full} = py_state:store(e, 5), + ok. + +%% @doc The internal size sentinel can't be written or counter-corrupted by a +%% caller, and is hidden from keys/0 and fetch/1. +sentinel_protected_test(_Config) -> + application:set_env(erlang_python, max_state_entries, 2), + {error, reserved_key} = py_state:store(?SIZE_KEY, 9999), + {'EXIT', _} = (catch py_state:incr(?SIZE_KEY, 100)), + ok = py_state:store(x, 1), + ok = py_state:store(y, 2), + {error, full} = py_state:store(z, 3), + {error, not_found} = py_state:fetch(?SIZE_KEY), + false = lists:member(?SIZE_KEY, py_state:keys()), + ok. diff --git a/test/py_stream_SUITE.erl b/test/py_stream_SUITE.erl index 282a164..ff4dfb1 100644 --- a/test/py_stream_SUITE.erl +++ b/test/py_stream_SUITE.erl @@ -17,7 +17,8 @@ test_stream_cancel/1, test_stream_error/1, test_stream_empty/1, - test_stream_large/1 + test_stream_large/1, + test_stream_rejects_injection/1 ]). all() -> @@ -29,7 +30,8 @@ all() -> test_stream_cancel, test_stream_error, test_stream_empty, - test_stream_large + test_stream_large, + test_stream_rejects_injection ]. init_per_suite(Config) -> @@ -40,6 +42,18 @@ end_per_suite(_Config) -> ok = application:stop(erlang_python), ok. +%% @doc A module/func name (or kwarg key) that isn't a valid Python identifier +%% must be rejected, not interpolated into the generated source where it could +%% inject code. Regression for the stream source-builder hardening. +test_stream_rejects_injection(_Config) -> + {'EXIT', {{invalid_python_identifier, _}, _}} = + (catch py:stream(<<"os'); __import__('os').system('x">>, <<"walk">>, [], #{k => 1})), + {'EXIT', {{invalid_python_identifier, _}, _}} = + (catch py:stream(<<"math">>, <<"sqrt'); evil(">>, [], #{k => 1})), + {'EXIT', {{invalid_python_identifier, _}, _}} = + (catch py:stream(<<"math">>, <<"sqrt">>, [], #{<<"bad key)">> => 1})), + ok. + %% Helper to collect all stream events collect_stream(Ref) -> collect_stream(Ref, [], 5000). diff --git a/test/py_venv_SUITE.erl b/test/py_venv_SUITE.erl index b23cef1..fa20239 100644 --- a/test/py_venv_SUITE.erl +++ b/test/py_venv_SUITE.erl @@ -36,7 +36,8 @@ test_ensure_venv_force_recreate/1, test_activate_venv/1, test_deactivate_venv/1, - test_venv_info/1 + test_venv_info/1, + test_venv_path_metacharacters/1 ]). all() -> @@ -51,7 +52,8 @@ groups() -> test_ensure_venv_force_recreate, test_activate_venv, test_deactivate_venv, - test_venv_info + test_venv_info, + test_venv_path_metacharacters ]}]. init_per_suite(Config) -> @@ -123,6 +125,21 @@ test_ensure_venv_creates_venv(Config) -> true = maps:get(<<"active">>, Info), ok. +%% @doc A venv path containing a shell metacharacter (a single quote) must be +%% treated literally by spawn_executable, not interpreted by a shell. Pre-fix +%% this went through os:cmd with quote/1, where the embedded quote broke out of +%% the quoting (and could inject). (A space is avoided here only because it +%% breaks the venv's own pip shebang, which is unrelated to the shell fix.) +test_venv_path_metacharacters(Config) -> + TempDir = ?config(temp_dir, Config), + VenvPath = filename:join(TempDir, "venv'q$x"), + ReqFile = filename:join(TempDir, "requirements_meta.txt"), + ok = file:write_file(ReqFile, <<"# empty\n">>), + %% Created and used at the exact literal path (a shell would have broken it). + ok = py:ensure_venv(VenvPath, ReqFile, [{installer, pip}]), + true = filelib:is_file(filename:join(VenvPath, "pyvenv.cfg")), + ok. + test_ensure_venv_activates_existing(Config) -> TempDir = ?config(temp_dir, Config), VenvPath = filename:join(TempDir, "venv"),