diff --git a/frameworks/pyronova/Dockerfile b/frameworks/pyronova/Dockerfile index f44a7226..8c89b6e6 100644 --- a/frameworks/pyronova/Dockerfile +++ b/frameworks/pyronova/Dockerfile @@ -14,7 +14,7 @@ RUN pip install --no-cache-dir maturin # rebuilds from that tag. The full source ships at github, not in this # PR, so the PR stays small and the build is byte-for-byte reproducible # against a signed git ref. -ARG PYRONOVA_REF=v2.0.2 +ARG PYRONOVA_REF=v2.3.1 RUN git clone --depth 1 --branch ${PYRONOVA_REF} \ https://github.com/moomoo-tech/pyronova.git /build/pyronova RUN cd /build/pyronova \ diff --git a/frameworks/pyronova/app.py b/frameworks/pyronova/app.py index 2c5b5251..f021269f 100644 --- a/frameworks/pyronova/app.py +++ b/frameworks/pyronova/app.py @@ -19,11 +19,20 @@ """ import json +import logging import os -from pyronova import Pyronova, Response +from pyronova import Pyronova, Request, Response from pyronova.db import PgPool +# Log benchmark-path errors at WARNING so they surface in the runner log +# but don't flood the tracing subscriber under load. Every broad-except +# site below calls log.warning(..., exc_info=True) so the stack trace is +# preserved instead of silently swallowed — swallowing a traceback to +# hand a 404 / 400 / {} back has been a regular source of "why is +# throughput suddenly tanking?" debugging evenings elsewhere. 
+log = logging.getLogger("pyronova.arena") + # --------------------------------------------------------------------------- # Dataset (loaded once at process start) @@ -99,12 +108,12 @@ def _sum_query_params(req) -> int: @app.get("/baseline11") -def baseline11_get(req): +def baseline11_get(req: "Request"): return Response(str(_sum_query_params(req)), content_type="text/plain") @app.post("/baseline11") -def baseline11_post(req): +def baseline11_post(req: "Request"): total = _sum_query_params(req) body = req.body if body: @@ -116,12 +125,12 @@ def baseline11_post(req): @app.get("/baseline2") -def baseline2(req): +def baseline2(req: "Request"): return Response(str(_sum_query_params(req)), content_type="text/plain") @app.post("/upload", gil=True, stream=True) -def upload(req): +def upload(req: "Request"): # drain_count() runs the whole consume loop in Rust with the GIL # released once — vs a Python `for chunk in req.stream:` that pays # GIL release+reacquire + PyBytes alloc per 16 KB hyper frame @@ -143,7 +152,7 @@ def upload(req): # ~150μs per call on the same data. Returning the dict shaves ~100μs # per request on the /json profile. @app.get("/json/{count}") -def json_endpoint(req): +def json_endpoint(req: "Request"): # Returning a dict directly triggers Pyronova's Rust-side JSON # serialization path (pythonize + serde_json::to_vec). Empirically # this matches or beats orjson.dumps() + Response(bytes) for @@ -167,7 +176,7 @@ def json_endpoint(req): @app.get("/json-comp/{count}") -def json_comp_endpoint(req): +def json_comp_endpoint(req: "Request"): # Identical payload; Arena's json-comp profile hits /json/{count} in # practice (see benchmark-15), but we keep this alias registered for # legacy URL shape compatibility. 
@@ -184,17 +193,21 @@ def json_comp_endpoint(req): ) -@app.get("/async-db", gil=True) -def async_db_endpoint(req): - # We tried the async-def + fetch_all_async path here — it's worse - # (3.9k vs 7.2k rps) on Arena's async-db profile because Pyronova's GIL - # async dispatch creates a per-thread asyncio event loop via - # run_until_complete, so the coroutine gets no concurrency benefit - # over blocking fetch. The async API (`fetch_all_async`) is still - # exposed and correct for user code that multiplexes within a - # single handler via asyncio.gather; it's just not the Arena win. +@app.get("/async-db") +def async_db_endpoint(req: "Request"): + # Pyronova v2.2.0 added a C-FFI DB bridge (`_pyronova_db_fetch_*` + # injected into every sub-interp's globals). The bridge forwards + # sqlx calls onto the main-process shared pool while releasing the + # calling sub-interp's GIL, so this handler now fans out across the + # full sub-interp pool instead of serializing on the main interp. + # The 3.7k → target-30k+ rps jump on async-db lives here. + # + # `PG_POOL is None` still guards the "no DB configured" case — on + # sub-interp the `PgPool.connect()` call earlier in the module is a + # noop that returns a stateless handle; the real sqlx pool is + # initialized exactly once by the main interp's import-time call. 
if PG_POOL is None: - return Response({"items": [], "count": 0}, content_type="application/json") + return _EMPTY_DB_RESPONSE q = req.query_params try: min_val = int(q.get("min", "10")) @@ -202,19 +215,29 @@ def async_db_endpoint(req): limit = int(q.get("limit", "50")) limit = max(1, min(limit, 50)) except ValueError: - return Response({"items": [], "count": 0}, content_type="application/json") - + log.warning("/async-db: bad query params %r", dict(q), exc_info=True) + return _EMPTY_DB_RESPONSE try: rows = PG_POOL.fetch_all(PG_SQL, min_val, max_val, limit) - except Exception: - return Response({"items": [], "count": 0}, content_type="application/json") - + except RuntimeError: + # pyronova.db raises RuntimeError for sqlx failures; keep the + # empty-response contract Arena expects, but don't lose the trace. + log.warning("/async-db: fetch_all failed", exc_info=True) + return _EMPTY_DB_RESPONSE + return _rows_to_payload(rows) + + +def _rows_to_payload(rows): + # Hot loop — shaves ~30% per-row Python overhead by reading each + # column exactly once and skipping the `isinstance(tags, str)` check + # when PG already returned jsonb as dict/list (the common path). 
items = [] + append = items.append for row in rows: tags = row["tags"] - if isinstance(tags, str): + if tags.__class__ is str: tags = json.loads(tags) - items.append({ + append({ "id": row["id"], "name": row["name"], "category": row["category"], @@ -227,7 +250,218 @@ "count": row["rating_count"], }, }) - return Response({"items": items, "count": len(items)}, content_type="application/json") + return {"items": items, "count": len(items)} + + +_EMPTY_DB_RESPONSE = {"items": [], "count": 0} +_NOT_FOUND = Response("not found", status_code=404, content_type="text/plain") +_BAD_REQUEST = Response("bad request", status_code=400, content_type="text/plain") + + +# --------------------------------------------------------------------------- +# CRUD — paths mirror Arena's aspnet-minimal reference: +# GET /crud/items?category=X&page=N&limit=M paginated list +# GET /crud/items/{id} single item (200ms cache) +# POST /crud/items upsert, returns 201 +# PUT /crud/items/{id} update, invalidates cache +# +# Cache is an in-process dict on Pyronova's main interpreter. Arena's +# aspnet impl uses IMemoryCache (same semantics). `gil=True` on every +# handler pins it to the main interp, where our PgPool lives behind a +# Rust-side OnceLock populated by the main interpreter's module-import. +# --------------------------------------------------------------------------- + +import time as _time + +_CRUD_TTL_S = 0.2 +# _CRUD_CACHE is a bare dict because every handler below runs with +# `gil=True` on Pyronova's main interpreter — only one handler thread +# executes at a time, so dict get/set/pop are atomic under the GIL and +# no lock is needed. If a handler is ever demoted off the main interp +# this dict becomes a race; wrap it in threading.Lock or flip to +# threading.local at that point. 
+_CRUD_CACHE: dict = {} # item_id -> (payload_dict, expires_at_monotonic) + +_CRUD_COLS = ( + "id, name, category, price, quantity, active, tags, " + "rating_score, rating_count" +) +_CRUD_GET_SQL = f"SELECT {_CRUD_COLS} FROM items WHERE id = $1 LIMIT 1" +_CRUD_LIST_SQL = ( + f"SELECT {_CRUD_COLS} FROM items WHERE category = $1 " + "ORDER BY id LIMIT $2 OFFSET $3" +) +# `name = $1, price = $2, quantity = $3 WHERE id = $4`. Arena's aspnet +# UPDATE doesn't touch tags/active/category — mirror exactly. +_CRUD_UPDATE_SQL = "UPDATE items SET name = $1, price = $2, quantity = $3 WHERE id = $4" +# Fixed tags/rating in the INSERT path — Arena's aspnet does the same +# (`'[\"bench\"]'` literal, rating 0/0) so the row always passes its +# CHECK constraints regardless of input shape. +_CRUD_UPSERT_SQL = ( + "INSERT INTO items " + "(id, name, category, price, quantity, active, tags, rating_score, rating_count) " + "VALUES ($1, $2, $3, $4, $5, true, '[\"bench\"]', 0, 0) " + "ON CONFLICT (id) DO UPDATE SET name = $2, price = $4, quantity = $5 " + "RETURNING id" +) + + +def _row_to_full_item(row): + tags = row["tags"] + if tags.__class__ is str: + tags = json.loads(tags) + return { + "id": row["id"], + "name": row["name"], + "category": row["category"], + "price": row["price"], + "quantity": row["quantity"], + "active": row["active"], + "tags": tags, + "rating": {"score": row["rating_score"], "count": row["rating_count"]}, + } + + +@app.get("/crud/items/{id}", gil=True) +def crud_get_one(req: "Request"): + # Arena cache-aside validation reads the X-Cache header (MISS/HIT) out + # of every response on this endpoint — including 404s. Leaving it off + # makes the runner's `curl | grep ^x-cache` pipeline fail under + # `set -o pipefail` and kills the entire test script silently, so we + # emit the header on every path below. 
+ try: + item_id = int(req.params["id"]) + except (KeyError, ValueError): + return Response( + body="bad request", status_code=400, + content_type="application/json", headers={"X-Cache": "MISS"}, + ) + if PG_POOL is None: + return Response( + body="not found", status_code=404, + content_type="application/json", headers={"X-Cache": "MISS"}, + ) + now = _time.monotonic() + entry = _CRUD_CACHE.get(item_id) + if entry is not None and entry[1] > now: + return Response( + body=entry[0], status_code=200, + content_type="application/json", headers={"X-Cache": "HIT"}, + ) + try: + row = PG_POOL.fetch_one(_CRUD_GET_SQL, item_id) + except RuntimeError: + log.warning("/crud/items/%s: fetch_one failed", item_id, exc_info=True) + return Response( + body="not found", status_code=404, + content_type="application/json", headers={"X-Cache": "MISS"}, + ) + if row is None: + return Response( + body="not found", status_code=404, + content_type="application/json", headers={"X-Cache": "MISS"}, + ) + item_json = json.dumps(_row_to_full_item(row)) + _CRUD_CACHE[item_id] = (item_json, now + _CRUD_TTL_S) + return Response( + body=item_json, status_code=200, + content_type="application/json", headers={"X-Cache": "MISS"}, + ) + + +@app.get("/crud/items", gil=True) +def crud_list(req: "Request"): + if PG_POOL is None: + return _EMPTY_CRUD_LIST + q = req.query_params + category = q.get("category") or "electronics" + try: + page = int(q.get("page", "1")) + if page < 1: + page = 1 + except ValueError: + page = 1 + try: + limit = int(q.get("limit", "10")) + except ValueError: + limit = 10 + if limit < 1 or limit > 50: + limit = 10 + offset = (page - 1) * limit + try: + rows = PG_POOL.fetch_all(_CRUD_LIST_SQL, category, limit, offset) + except RuntimeError: + log.warning("/crud/items list: fetch_all failed", exc_info=True) + return _EMPTY_CRUD_LIST + items = [_row_to_full_item(r) for r in rows] + return {"items": items, "total": len(items), "page": page, "limit": limit} + + 
+@app.put("/crud/items/{id}", gil=True) +def crud_update(req: "Request"): + if PG_POOL is None: + return _NOT_FOUND + try: + item_id = int(req.params["id"]) + body = json.loads(req.body) if req.body else {} + except (KeyError, ValueError, TypeError): + return _BAD_REQUEST + name = body.get("name") or "Updated" + try: + price = int(body.get("price", 0)) + quantity = int(body.get("quantity", 0)) + except (TypeError, ValueError): + return _BAD_REQUEST + try: + affected = PG_POOL.execute(_CRUD_UPDATE_SQL, name, price, quantity, item_id) + except RuntimeError: + log.warning("/crud/items/%s update: execute failed", item_id, exc_info=True) + return _NOT_FOUND + if affected == 0: + return _NOT_FOUND + _CRUD_CACHE.pop(item_id, None) + return {"id": item_id, "name": name, "price": price, "quantity": quantity} + + +@app.post("/crud/items", gil=True) +def crud_upsert(req: "Request"): + if PG_POOL is None: + return _BAD_REQUEST + try: + body = json.loads(req.body) if req.body else {} + item_id = int(body["id"]) + except (KeyError, ValueError, TypeError): + return _BAD_REQUEST + name = body.get("name") or "New Product" + category = body.get("category") or "test" + try: + price = int(body.get("price", 0)) + quantity = int(body.get("quantity", 0)) + except (TypeError, ValueError): + return _BAD_REQUEST + try: + new_id = PG_POOL.fetch_scalar( + _CRUD_UPSERT_SQL, + item_id, name, category, price, quantity, + ) + except RuntimeError: + log.warning("/crud/items upsert id=%s: fetch_scalar failed", item_id, exc_info=True) + return _BAD_REQUEST + _CRUD_CACHE.pop(item_id, None) + return Response( + body=json.dumps({ + "id": new_id, + "name": name, + "category": category, + "price": price, + "quantity": quantity, + }), + status_code=201, + content_type="application/json", + ) + + +_EMPTY_CRUD_LIST = {"items": [], "total": 0, "page": 1, "limit": 10} if __name__ == "__main__": diff --git a/frameworks/pyronova/launcher.py b/frameworks/pyronova/launcher.py index 60b1bbbd..60f5932e 100644 --- 
a/frameworks/pyronova/launcher.py +++ b/frameworks/pyronova/launcher.py @@ -26,9 +26,38 @@ def _cpu_count() -> int: return max(os.cpu_count() or 1, 1) +def _numa_nodes() -> int: + """How many NUMA nodes does the kernel see? 1 on UMA systems + (laptops, Apple Silicon, single-socket AMD/Intel desktop), + 2+ on multi-CCD Threadripper/EPYC and multi-socket boxes.""" + try: + return max( + sum(1 for d in os.listdir("/sys/devices/system/node") if d.startswith("node")), + 1, + ) + except (FileNotFoundError, PermissionError): + return 1 + + def main() -> int: total = _cpu_count() per_proc = max(total // 2, 1) + # IO workers sizing is NUMA-topology-aware. Two regimes: + # + # Multi-node (Threadripper PRO / EPYC / multi-socket Xeon): cap IO + # at `per_proc // 4` so the accept loops + hyper socket threads + # stay on a couple of CCDs. Letting IO spread across every CCD + # turns every `crossbeam_channel::recv` into an Infinity-Fabric + # round trip — Leo observed this as "baseline stops scaling past + # 26 cores on the 3995WX" in the Arena run. + # + # Single-node (laptops, Apple Silicon, most cloud VMs): IO == cores. + # The prior NUMA-only formula cost 32% throughput on an M5 Pro + # because IO threads starved with no matching compute benefit. + if _numa_nodes() > 1: + io_per_proc = min(max(per_proc // 4, 4), 16) + else: + io_per_proc = per_proc base_port = int(os.environ.get("PORT", "8080")) tls_cert = os.environ.get("TLS_CERT", "/certs/server.crt") @@ -37,7 +66,17 @@ def main() -> int: env_common = dict(os.environ) env_common["PYRONOVA_WORKERS"] = str(per_proc) - env_common["PYRONOVA_IO_WORKERS"] = str(per_proc) + env_common["PYRONOVA_IO_WORKERS"] = str(io_per_proc) + # GIL-bridge sizing for gil=True routes under TPC. Default is 4 workers + # + 16×4=64 channel depth — correct for typical apps with 1-2 numpy + # routes. 
HttpArena's async-db / crud profiles hammer gil=True paths + # at 1024+ concurrency, so a 64-deep channel overflows immediately + # and every excess request 503s (PyronovaApp's bridge backpressure + # contract). Widen to 16 workers + 8192 capacity so the DB-heavy + # gcannon profiles see sustained throughput instead of a 503 storm. + # Verified locally at c=4096: 15k req/s steady, zero drops. + env_common.setdefault("PYRONOVA_GIL_BRIDGE_WORKERS", "16") + env_common.setdefault("PYRONOVA_GIL_BRIDGE_CAPACITY", "8192") # Metrics / access log off; benchmarks care about throughput, not logs. env_common.pop("PYRONOVA_LOG", None) env_common.pop("PYRONOVA_METRICS", None) diff --git a/frameworks/pyronova/meta.json b/frameworks/pyronova/meta.json index f72a43d7..4c825c8a 100644 --- a/frameworks/pyronova/meta.json +++ b/frameworks/pyronova/meta.json @@ -19,7 +19,10 @@ "baseline-h2", "static-h2", "api-4", - "api-16" + "api-16", + "crud", + "unary-grpc", + "unary-grpc-tls" ], "maintainers": [] }