diff --git a/README.md b/README.md index fb0a8d0..f55dc61 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ This catalog is the foundation for generating language bindings (Python, Java, R - [Getting started](#getting-started) - [Output format](#output-format) - [Adding metadata](#adding-metadata) +- [Runtime server](#runtime-server) ## How it works @@ -83,3 +84,34 @@ A typical function entry looks like this: ## Adding metadata Manual annotations (ownership rules, additional documentation, deprecation flags, etc.) live in `meta/meos-meta.json`. The merger applies them on top of the libclang-parsed structure when generating the final catalog. + +## Runtime server + +The same enriched catalog also drives a runtime HTTP server — the projection +that **executes** rather than just describes: + +```bash +python run.py # produce the enriched catalog +python serve.py # 127.0.0.1:8080 (StubEngine) +MEOS_LIBRARY_PATH=/path/libmeos.so python serve.py # real engine +``` + +Each *stateless-exposable* function is served as `POST /{function}`: +validate the JSON body, `decode` each serialized string to an opaque handle, +`invoke` the function, `encode` the result, reply `{"result": …}` (`204` +void, `400 {"error","code"}` on failure). All MEOS work sits behind a +pluggable `Engine`: `CtypesEngine` (`dlopen` a built `libmeos`, every opaque +value an anonymous `void *`) or `StubEngine` (no build needed; routing and +validation still work). + +Built from the live MobilityDB `master` catalog this is **1963 operations** +(91% of the public API; internal `meos_internal*.h` policy-excluded); +generation, routing, validation and dispatch are exercised end-to-end over +real HTTP (`tests/test_server.py`), and the full stack is validated against +an installed `/usr/local/lib/libmeos.so` — `POST /temporal_copy` → +`200 {"result":"{t@…, f@…}"}` (decode → invoke → `temporal_out(maxdd=15)`), +`floatset_value_n` n=2 → `2.5` via a byref out-parameter, a malformed body +→ `400` with the real MEOS message, and the server survives it +(`tests/test_engine_integration.py`, skipped unless `MEOS_LIBRARY_PATH` is +set). See [`docs/server.md`](docs/server.md). Stdlib only (`http.server`); +no new dependencies. diff --git a/docs/server.md b/docs/server.md new file mode 100644 index 0000000..63c0f8a --- /dev/null +++ b/docs/server.md @@ -0,0 +1,104 @@ +# Runtime server + +`server/` is the projection that **executes**. It builds its entire routing, +request-validation and dispatch table from the *enriched* catalog +(`network` / `wire` — see [`enrichment.md`](enrichment.md)), the same single +source the OpenAPI and MCP generators consume. + +```bash +python run.py # enriched catalog +python serve.py # serve on 127.0.0.1:8080 +MEOS_LIBRARY_PATH=/path/libmeos.so python serve.py 0.0.0.0 9000 +``` + +Per `POST /{function}` it runs the universal pipeline the `wire` model +implies: + +1. validate the JSON body against the parameter model; +2. `engine.decode` each serialized string → opaque handle; +3. `engine.invoke` the function with scalars + handles; +4. `engine.encode` an opaque result → string; +5. reply `{"result": …}` · `204` for void · `400 {"error","code"}` on a + MEOS/validation error · `404` for an unknown operation. + +`GET /healthz` reports engine and operation count. Stdlib `http.server` +only (no new dependencies) — a reference/embeddable server, not a tuned +production stack. + +## The engine seam + +All MEOS work is behind `server/engine.py`: + +| Engine | Use | +|---|---| +| `CtypesEngine` | **Real.** `dlopen`s a built `libmeos` and calls `x-meos.decode` / function / `x-meos.encode` by symbol. Every opaque value is an anonymous `void *` — no struct layout is ever needed, because the catalog already reduced every exposable function to *scalars + decode/encode of opaque pointers*. Selected when `MEOS_LIBRARY_PATH` is set. | +| `StubEngine` | No MEOS build: routes/validation/error-mapping run; MEOS calls return deterministic placeholders. Default, and what makes the server runnable/testable without a compiled MEOS. | + +## What is validated + +- **Generation, routing, validation, dispatch, error mapping** — built from + the live MobilityDB `master` catalog (**1963 operations** = 91% of the + public API; internal `meos_internal*.h` is policy-excluded; 0 malformed), + exercised end-to-end over real HTTP sockets with a recording engine + (`tests/test_server.py`). +- **Real `libmeos` end-to-end** — against an installed + `/usr/local/lib/libmeos.so` (`tests/test_engine_integration.py`, + skipped unless `MEOS_LIBRARY_PATH` is set): + + ``` + POST /temporal_copy {"temp":"{t@2000-01-01, f@2000-01-03, t@2000-01-05}"} + -> 200 {"result":"{t@2000-01-01 00:00:00+01, f@..., t@...}"} + # full path: decode(tbool_in) -> invoke -> encode(temporal_out, maxdd=15) + POST /temporal_num_instants {"temp":"{t@2000-01-01, f@2000-01-03, ...}"} + -> 200 {"result": 3} + POST /temporal_num_instants {"temp":"garbage"} + -> 400 {"error":"Missing delimeter character '@': garbage","code":22} + GET /healthz (after the bad request) -> 200 # process survived + + floatset_value_n(decode "{1.0, 2.5, 3.0}", n=2, double *result) + -> present=True, result=2.5 # scalar value via byref out-parameter + -> n=99: present=False # -> HTTP 204 (no value) + geoset_value_n(decode "{Point(1 1), Point(2 2)}", n=1, GSERIALIZED **result) + -> present=True, encode(geo_as_ewkt) -> "POINT(1 1)" # opaque out-param + temporal_merge_array(["t@2000-01-01","f@2000-01-03"]) # JSON list + -> decode each -> C array -> 200 {"result":"{t@..., f@...}"} + temporal_sequences("{[t@..., f@...], [t@...]}") # Elem **+count + -> byref count -> 200 {"result":["...","..."]} # JSON array + ``` + + The whole pipeline runs on real MEOS, including the **generic + `temporal_out`** with its `maxdd` aux defaulted — `test_engine_ + integration` round-trips a `tbool` *and* a `tfloat` through it, proving + it serialises any subtype. A malformed input becomes a `400` (the + installed non-fatal error handler) instead of `exit()`ing the server. +- `CtypesEngine` marshalling (including aux args) is additionally + unit-tested against a fake library. + +## Limitations / roadmap + +- **Polymorphic decoding is subtype-narrow (input side only).** Coverage + is **1963/2161 = 91% of the public API**: formatting aux args are + defaulted (generic `temporal_out`, `*_out(.., maxdd)`), scalar *and* + opaque out-parameter accessors are projected through their byref result, + input-array builders take a JSON list, array returns (`Elem **`+count) + become a JSON array, and the internal + `meos_internal*.h` programmer API is + policy-excluded. The remaining limitation: polymorphic types whose only + generic decoder needs a semantic type tag (`temporal_in(str, meosType)`, + the typed-set decoders) decode a serialized **argument** with a typed + helper (`tbool_in`, a `bigint`-set parser). A mismatched subtype yields a + clean MEOS `400`, never a crash or wrong answer. Carrying the subtype on + the wire for universal decoding is the remaining future work toward full + parity; the residual non-exposable set is otherwise genuinely + non-stateless (array/multi-out builders, `Datum`-internal, plumbing). +- Response is wrapped `{"result": …}` (matches the MCP `outputSchema` + envelope); an unwrapped mode matching the bare OpenAPI 200 schema is a + trivial follow-up. +- `CtypesEngine` integer width per function uses `c_long`; functions needing + exact `int32`/`size_t` widths may need per-function refinement (an + enrichment-side concern). +- Stdlib server is single-process; production deployment (ASGI/WSGI, + concurrency, auth) is intentionally out of scope — the value here is the + *correct-by-construction* contract execution, not the transport. +- Memory ownership: results are encoded then dropped; wiring MEOS + `pfree`/free of returned pointers into `CtypesEngine` is a follow-up. diff --git a/serve.py b/serve.py new file mode 100644 index 0000000..6e48a87 --- /dev/null +++ b/serve.py @@ -0,0 +1,48 @@ +# Run the contract-driven MEOS HTTP server. +# +# Usage: +# python run.py # produce the enriched catalog +# python serve.py # serve output/meos-idl.json on :8080 +# python serve.py catalog.json 0.0.0.0 9000 +# +# Engine: set MEOS_LIBRARY_PATH=/path/to/libmeos.so for the real ctypes +# engine; otherwise a non-computing StubEngine is used (routes/validation +# work, MEOS calls return placeholders). + +import json +import sys +from pathlib import Path + +from server.app import make_server +from server.engine import from_env + +IN_PATH = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("output/meos-idl.json") +HOST = sys.argv[2] if len(sys.argv) > 2 else "127.0.0.1" +PORT = int(sys.argv[3]) if len(sys.argv) > 3 else 8080 + + +def main() -> None: + if not IN_PATH.exists(): + sys.exit(f"Catalog not found: {IN_PATH} — run `python run.py` first.") + catalog = json.loads(IN_PATH.read_text()) + if not any("network" in f for f in catalog.get("functions", [])): + sys.exit(f"{IN_PATH} is not enriched (no `network` fields).") + + engine = from_env() + srv = make_server(catalog, engine, HOST, PORT) + n = sum(1 for f in catalog["functions"] + if f.get("network", {}).get("exposable")) + print(f"MEOS server on http://{HOST}:{PORT} " + f"({n} operations, engine={getattr(engine, 'name', '?')})", + file=sys.stderr) + try: + srv.serve_forever() + except KeyboardInterrupt: + pass + finally: + srv.server_close() + engine.close() + + +if __name__ == "__main__": + main() diff --git a/server/app.py b/server/app.py new file mode 100644 index 0000000..cc3732e --- /dev/null +++ b/server/app.py @@ -0,0 +1,248 @@ +"""Contract-driven runtime HTTP server. + +Builds its entire routing + request-validation + dispatch table from the +**enriched catalog** (`network` / `wire`), the same single source the +OpenAPI and MCP generators consume — the server is just another projection, +the one that *executes*. + +For every stateless-exposable function it exposes ``POST /{function}``: + +1. validate the JSON body against the `wire` parameter model; +2. ``engine.decode`` each serialized string into an opaque handle; +3. ``engine.invoke`` the function with the (scalars + handles); +4. ``engine.encode`` an opaque result back to a string; +5. reply ``{"result": …}`` (``204`` for void, ``400 {"error","code"}`` on a + MEOS/validation error, ``404`` unknown route). + +Stdlib only (`http.server`) — a reference/embeddable server, not a tuned +production stack. The MEOS work is entirely behind the ``Engine`` seam. +""" + +from __future__ import annotations + +import json +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +from server.engine import Engine, MeosError + +_JSON_PYTYPE = { + "integer": int, "number": (int, float), "boolean": bool, "string": str, +} +_RESULT_TAG = {"integer": "int", "number": "double", + "boolean": "bool", "string": "str"} + + +def build_enum_map(catalog: dict) -> dict: + return { + e["name"]: {v["name"]: v["value"] for v in e.get("values", [])} + for e in catalog.get("enums", []) + } + + +def build_routes(catalog: dict) -> dict: + """`/{fn}` → route descriptor, for every exposable function.""" + routes = {} + for fn in catalog.get("functions", []): + if not fn.get("network", {}).get("exposable"): + continue + w = fn["wire"] + params = [] + for p in w["params"]: + if p["kind"] == "array": + params.append({ + "name": p["name"], "kind": "array", + "count_param": p["count_param"], + "element": p["element"], + }) + else: + params.append({ + "name": p["name"], "kind": p["kind"], + "json": p.get("json"), "enum": p.get("enum"), + "decode": p.get("decode"), + "decode_aux": p.get("decode_aux", []), + "cType": p.get("cType"), + }) + r = w["result"] + routes["/" + fn["name"]] = { + "name": fn["name"], + "category": fn.get("category"), + "params": params, + "result": {"kind": r["kind"], "json": r.get("json"), + "encode": r.get("encode"), + "encode_aux": r.get("encode_aux", []), + "from_outparam": r.get("from_outparam"), + "out_ctype": r.get("out_ctype"), + "presence_return": r.get("presence_return", False), + "element": r.get("element"), + "count_outparam": r.get("count_outparam")}, + } + return routes + + +def _validate(body: dict, route: dict, enums: dict) -> None: + if not isinstance(body, dict): + raise ValueError("request body must be a JSON object") + names = {p["name"] for p in route["params"]} + extra = set(body) - names + if extra: + raise ValueError(f"unexpected field(s): {', '.join(sorted(extra))}") + for p in route["params"]: + if p["name"] not in body: + raise ValueError(f"missing required field: {p['name']}") + v = body[p["name"]] + if p["kind"] == "array": + if not isinstance(v, list) or not all( + isinstance(x, str) for x in v): + raise ValueError( + f"{p['name']} must be an array of strings") + elif p["kind"] == "serialized": + if not isinstance(v, str): + raise ValueError(f"{p['name']} must be a string") + elif p["enum"]: + if v not in enums.get(p["enum"], {}): + raise ValueError( + f"{p['name']} must be one of " + f"{sorted(enums.get(p['enum'], {}))}") + else: + ok = _JSON_PYTYPE.get(p["json"], object) + # bool is an int subclass — reject it for numeric fields. + if (p["json"] in ("integer", "number") and isinstance(v, bool)) \ + or not isinstance(v, ok): + raise ValueError( + f"{p['name']} must be of type {p['json']}") + + +_KIND_TAG = {"integer": "int", "number": "double", + "boolean": "bool", "string": "str"} + + +def _aux_args(specs: list) -> list: + """Catalog aux specs -> engine ``(tag, value)`` pairs (server defaults).""" + out = [] + for a in specs: + tag = _KIND_TAG.get(a["kind"], "str") + val = a["default"] + if tag == "bool": + val = 1 if val else 0 + out.append((tag, val)) + return out + + +def _dispatch(body: dict, route: dict, engine: Engine, enums: dict): + args = [] + for p in route["params"]: + v = body[p["name"]] + if p["kind"] == "array": + el = p["element"] + handles = [engine.decode(el["decode"], s, + _aux_args(el.get("decode_aux", []))) + for s in v] + args.append(("ptrarray", handles)) # Elem **arr + args.append(("int", len(handles))) # the implicit count + continue + if p["kind"] == "serialized": + args.append(("ptr", engine.decode( + p["decode"], v, _aux_args(p.get("decode_aux", []))))) + elif p["enum"]: + args.append(("enum", enums[p["enum"]][v])) + elif p["json"] == "integer": + args.append(("int", int(v))) + elif p["json"] == "number": + args.append(("double", float(v))) + elif p["json"] == "boolean": + args.append(("bool", 1 if v else 0)) + else: + args.append(("str", v)) + + res = route["result"] + if res.get("from_outparam"): + # bool f(.., T *result): the value comes back through the trailing + # out-parameter; the C return is a presence flag (void = always). + present, val = engine.invoke_outparam( + route["name"], args, res["out_ctype"], res["presence_return"]) + if not present: + return None + if res["kind"] == "serialized": # opaque out-param -> encode + return {"result": engine.encode( + res["encode"], val, _aux_args(res.get("encode_aux", [])))} + return {"result": bool(val) if res.get("json") == "boolean" else val} + + if res["kind"] == "array": + # Elem **f(.., int *count): MEOS returns a fresh array + byref count. + el = res["element"] + ptrs = engine.invoke_array(route["name"], args) + return {"result": [engine.encode(el["encode"], p, + _aux_args(el.get("encode_aux", []))) + for p in ptrs]} + + rtag = "void" if res["kind"] == "void" else ( + "ptr" if res["kind"] == "serialized" + else _RESULT_TAG.get(res["json"], "str")) + + out = engine.invoke(route["name"], args, rtag) + if res["kind"] == "void": + return None + if res["kind"] == "serialized": + return {"result": engine.encode( + res["encode"], out, _aux_args(res.get("encode_aux", [])))} + if res["kind"] == "json" and res["json"] == "boolean": + return {"result": bool(out)} + return {"result": out} + + +def make_handler(routes: dict, engine: Engine, enums: dict): + class Handler(BaseHTTPRequestHandler): + protocol_version = "HTTP/1.1" + + def log_message(self, *a): # silence default stderr spam + pass + + def _send(self, code: int, payload): + data = b"" if payload is None else json.dumps(payload).encode() + self.send_response(code) + if data: + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(data))) + self.end_headers() + if data: + self.wfile.write(data) + + def do_GET(self): + if self.path in ("/", "/healthz"): + return self._send(200, { + "service": "meos", "status": "ok", + "engine": getattr(engine, "name", "engine"), + "operations": len(routes), + }) + self._send(404, {"error": f"no such resource: {self.path}"}) + + def do_POST(self): + route = routes.get(self.path) + if route is None: + return self._send( + 404, {"error": f"no such operation: {self.path}"}) + try: + n = int(self.headers.get("Content-Length") or 0) + raw = self.rfile.read(n) if n else b"{}" + body = json.loads(raw or b"{}") + except (ValueError, json.JSONDecodeError): + return self._send(400, {"error": "invalid JSON body"}) + try: + _validate(body, route, enums) + result = _dispatch(body, route, engine, enums) + except MeosError as e: + return self._send(400, {"error": str(e), "code": e.code}) + except (ValueError, KeyError) as e: + return self._send(400, {"error": str(e)}) + if result is None: + return self._send(204, None) + self._send(200, result) + + return Handler + + +def make_server(catalog: dict, engine: Engine, host="127.0.0.1", port=8080): + routes = build_routes(catalog) + enums = build_enum_map(catalog) + handler = make_handler(routes, engine, enums) + return ThreadingHTTPServer((host, port), handler) diff --git a/server/engine.py b/server/engine.py new file mode 100644 index 0000000..1a2cb81 --- /dev/null +++ b/server/engine.py @@ -0,0 +1,259 @@ +"""MEOS execution engines. + +The runtime server is contract-driven: it builds routes/validation from the +enriched catalog and, per call, performs the universal pipeline the `wire` +model implies — *decode each serialized argument, invoke the function, encode +the result*. The actual MEOS calls happen behind the ``Engine`` seam. + +Two engines ship: + +- ``CtypesEngine`` — the real one: ``dlopen`` a built ``libmeos`` and call + ``x-meos.decode`` / the function / ``x-meos.encode`` by symbol. Requires a + compiled MEOS shared library (set ``MEOS_LIBRARY_PATH``); the marshalling + is unit-tested against a fake library but, by design, the literal native + linkage is environment-gated. +- ``StubEngine`` — no MEOS runtime: lets the server, routing, request + validation and error mapping run and be exercised without a build. It + returns deterministic placeholders and is **not** a computation engine. + +Engine contract (all tags are simple strings the app layer assigns from the +``wire`` model: ``ptr`` for a decoded opaque value, ``str``/``int``/ +``double``/``bool``/``enum`` for scalars, ``void`` for no result): + + decode(fn_name, value:str) -> handle (opaque) + invoke(fn_name, args:[(tag,val)], rt) -> handle|scalar|None + encode(fn_name, handle) -> str +""" + +from __future__ import annotations + + +class MeosError(Exception): + """A MEOS-level failure; mapped by the server to HTTP 400.""" + + def __init__(self, message: str, code: int = 1): + super().__init__(message) + self.code = code + + +class Engine: + # ``aux`` is a list of ``(tag, value)`` for the trailing formatting args + # of the in/out wrapper (e.g. ``[("int", 15)]`` for ``temporal_out``'s + # ``maxdd``), supplied from the catalog's ``decode_aux``/``encode_aux``. + def decode(self, fn_name: str, value: str, aux=()): + raise NotImplementedError + + def invoke(self, fn_name: str, args: list, result_tag: str): + raise NotImplementedError + + def encode(self, fn_name: str, handle, aux=()) -> str: + raise NotImplementedError + + def invoke_outparam(self, fn_name: str, args: list, out_ctype: str, + presence: bool): + """``bool f(.., T *result)`` — return ``(present, value)``.""" + raise NotImplementedError + + def invoke_array(self, fn_name: str, args: list): + """``Elem **f(.., int *count)`` — return a list of element handles.""" + raise NotImplementedError + + def close(self) -> None: + pass + + +class StubEngine(Engine): + """Non-computing placeholder so the server runs without a MEOS build.""" + + name = "stub" + + def decode(self, fn_name, value, aux=()): + return {"_stub_handle": fn_name, "value": value} + + def invoke(self, fn_name, args, result_tag): + if result_tag == "void": + return None + if result_tag == "ptr": + return {"_stub_handle": fn_name} + return {"str": "", "int": 0, "double": 0.0, + "bool": False, "enum": ""}.get(result_tag, "") + + def encode(self, fn_name, handle, aux=()): + return f"" + + def invoke_outparam(self, fn_name, args, out_ctype, presence): + return True, 0 + + def invoke_array(self, fn_name, args): + return [] + + +class CtypesEngine(Engine): + """Calls a built libmeos via ctypes, driven entirely by the wire model. + + Every opaque value is an anonymous ``void *`` — no struct layout is ever + needed, because the catalog already reduced every exposable function to + *scalars + decode/encode of opaque pointers*. + """ + + name = "ctypes" + + def __init__(self, library_path: str): + import ctypes + + self._ct = ctypes + self.lib = ctypes.CDLL(library_path) + self._argmap = { + "ptr": ctypes.c_void_p, "str": ctypes.c_char_p, + "int": ctypes.c_long, "double": ctypes.c_double, + "bool": ctypes.c_int, "enum": ctypes.c_int, + } + self._retmap = { + "ptr": ctypes.c_void_p, "str": ctypes.c_char_p, + "int": ctypes.c_long, "double": ctypes.c_double, + "bool": ctypes.c_int, + } + self._last_error = None + + if hasattr(self.lib, "meos_initialize"): + self.lib.meos_initialize.restype = None + try: + self.lib.meos_initialize() + except TypeError: + self.lib.meos_initialize(None) + + # MEOS's default error handler calls exit() — a single bad input + # would kill the whole server. Replace it with one that records the + # error so the request can be turned into a 400 instead. + if hasattr(self.lib, "meos_initialize_error_handler"): + handler_t = ctypes.CFUNCTYPE( + None, ctypes.c_int, ctypes.c_int, ctypes.c_char_p) + + def _on_error(level, code, msg): + self._last_error = ( + int(code), + msg.decode(errors="replace") if msg else "MEOS error") + + self._err_cb = handler_t(_on_error) # keep a ref alive + self.lib.meos_initialize_error_handler.argtypes = [handler_t] + self.lib.meos_initialize_error_handler.restype = None + self.lib.meos_initialize_error_handler(self._err_cb) + + def _sym(self, name: str): + try: + return getattr(self.lib, name) + except AttributeError as e: + raise MeosError(f"unknown MEOS symbol: {name}", 404) from e + + def _raise_if_error(self) -> None: + if self._last_error is not None: + code, msg = self._last_error + self._last_error = None + raise MeosError(msg, code) + + def _aux_ctypes(self, aux): + types = [self._argmap[t] for t, _ in aux] + vals = [v.encode() if t == "str" and isinstance(v, str) else v + for t, v in aux] + return types, vals + + def decode(self, fn_name, value, aux=()): + self._last_error = None + f = self._sym(fn_name) + atypes, avals = self._aux_ctypes(aux) + f.argtypes = [self._ct.c_char_p, *atypes] + f.restype = self._ct.c_void_p + h = f(value.encode(), *avals) + self._raise_if_error() + if not h: + raise MeosError(f"{fn_name} failed to parse input") + return h + + def invoke(self, fn_name, args, result_tag): + self._last_error = None + f = self._sym(fn_name) + ct = self._ct + argtypes, cargs = [], [] + for tag, val in args: + if tag == "ptrarray": # Elem **arr from a JSON list + arr = (ct.c_void_p * len(val))(*[ct.c_void_p(h) + for h in val]) + argtypes.append(ct.POINTER(ct.c_void_p)) + cargs.append(arr) + else: + argtypes.append(self._argmap[tag]) + cargs.append(val.encode() if tag == "str" + and isinstance(val, str) else val) + f.argtypes = argtypes + f.restype = self._retmap.get(result_tag) # None == void + r = f(*cargs) + self._raise_if_error() + return r + + def encode(self, fn_name, handle, aux=()): + self._last_error = None + f = self._sym(fn_name) + atypes, avals = self._aux_ctypes(aux) + f.argtypes = [self._ct.c_void_p, *atypes] + f.restype = self._ct.c_char_p + r = f(handle, *avals) + self._raise_if_error() + return r.decode() if r else None + + def _pointee_ctype(self, out_ctype: str): + ct = self._ct + base = " ".join( + out_ctype.replace("const", "").replace("*", " ").split()) + return { + "double": ct.c_double, "float": ct.c_float, + "long long": ct.c_longlong, "unsigned long": ct.c_ulong, + "long": ct.c_long, "unsigned int": ct.c_uint, + "short": ct.c_short, "unsigned char": ct.c_ubyte, + "signed char": ct.c_byte, + }.get(base, ct.c_int) + + def invoke_outparam(self, fn_name, args, out_ctype, presence): + self._last_error = None + f = self._sym(fn_name) + ct = self._ct + # T **result -> the slot holds an opaque pointer (c_void_p); + # T *result -> a scalar slot. + slot = (ct.c_void_p() if out_ctype.count("*") >= 2 + else self._pointee_ctype(out_ctype)()) + f.argtypes = [self._argmap[t] for t, _ in args] + [ct.POINTER( + type(slot))] + f.restype = ct.c_int if presence else None + cargs = [v.encode() if t == "str" and isinstance(v, str) else v + for t, v in args] + ret = f(*cargs, ct.byref(slot)) + self._raise_if_error() + present = bool(ret) if presence else True + return present, slot.value + + def invoke_array(self, fn_name, args): + # Elem **f(leading..., int *count): MEOS allocates the array and + # writes the length through the trailing byref count. + self._last_error = None + f = self._sym(fn_name) + ct = self._ct + n = ct.c_int() + f.argtypes = [self._argmap[t] for t, _ in args] + [ + ct.POINTER(ct.c_int)] + f.restype = ct.c_void_p + cargs = [v.encode() if t == "str" and isinstance(v, str) else v + for t, v in args] + ret = f(*cargs, ct.byref(n)) + self._raise_if_error() + count = n.value + if not ret or count <= 0: + return [] + arr = ct.cast(ret, ct.POINTER(ct.c_void_p)) + return [arr[i] for i in range(count)] + + +def from_env() -> Engine: + """``CtypesEngine`` if ``MEOS_LIBRARY_PATH`` is set, else ``StubEngine``.""" + import os + + path = os.environ.get("MEOS_LIBRARY_PATH") + return CtypesEngine(path) if path else StubEngine() diff --git a/tests/test_engine_integration.py b/tests/test_engine_integration.py new file mode 100644 index 0000000..249db5d --- /dev/null +++ b/tests/test_engine_integration.py @@ -0,0 +1,131 @@ +"""End-to-end integration test against a *built* libmeos. + +Skipped unless ``MEOS_LIBRARY_PATH`` points at a loadable MEOS shared +library — so CI without a MEOS build still passes. Run it with: + + MEOS_LIBRARY_PATH=/usr/local/lib/libmeos.so python3 tests/test_engine_integration.py + +It drives the exact path the server uses, including the catalog's +``in_aux``/``out_aux`` defaults (so the *generic* ``temporal_out(temp, +maxdd=15)`` is called correctly — proving it serialises any subtype), and +asserts that bad input raises ``MeosError`` instead of terminating the +process (MEOS's default handler calls ``exit()``). +""" + +import json +import os +import sys +import unittest +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from server.engine import CtypesEngine, MeosError + +_LIB = os.environ.get("MEOS_LIBRARY_PATH") +_HAVE = bool(_LIB) and Path(_LIB).exists() +_CATALOG = Path(__file__).resolve().parents[1] / "output" / "meos-idl.json" +_TBOOL = "{t@2000-01-01, f@2000-01-03, t@2000-01-05}" +_TFLOAT = "{1.5@2000-01-01, 3.5@2000-01-03}" + +_KIND_TAG = {"integer": "int", "number": "double", + "boolean": "bool", "string": "str"} + + +def _aux(specs): + return [(_KIND_TAG.get(a["kind"], "str"), a["default"]) for a in specs] + + +@unittest.skipUnless(_HAVE, "set MEOS_LIBRARY_PATH to a built libmeos.so") +class CtypesIntegrationTests(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.eng = CtypesEngine(_LIB) + te = (json.loads(_CATALOG.read_text()).get("typeEncodings", {}) + if _CATALOG.exists() else {}) + t = te.get("Temporal", {}) + cls.tin = t.get("in", "tbool_in") + cls.tout = t.get("out", "tbool_out") + cls.in_aux = _aux(t.get("in_aux", [])) + cls.out_aux = _aux(t.get("out_aux", [])) + + def test_catalog_selected_in_out(self): + # Decoding stays a typed wrapper (subtype-narrow); encoding is the + # generic temporal_out with a defaulted maxdd. + self.assertEqual(self.tin, "tbool_in") + self.assertEqual(self.tout, "temporal_out") + self.assertEqual(self.out_aux, [("int", 15)]) + + def test_decode_invoke_scalar(self): + h = self.eng.decode(self.tin, _TBOOL, self.in_aux) + self.assertTrue(h) + n = self.eng.invoke("temporal_num_instants", [("ptr", h)], "int") + self.assertEqual(n, 3) + + def test_generic_encoder_round_trips_any_subtype(self): + # The whole point of the gap fix: temporal_out(+maxdd) serialises + # a tbool AND a tfloat — a subtype-narrow tbool_out could not. + hb = self.eng.decode("tbool_in", _TBOOL) + ob = self.eng.encode(self.tout, hb, self.out_aux) + self.assertIn("@", ob) + hf = self.eng.decode("tfloat_in", _TFLOAT) + of = self.eng.encode(self.tout, hf, self.out_aux) + self.assertIn("@", of) + self.assertIn("1.5", of) + + def test_scalar_outparam_round_trip(self): + # bool floatset_value_n(const Set *, int n, double *result): + # the value comes back through the byref out-parameter. + h = self.eng.decode("floatset_in", "{1.0, 2.5, 3.0}") + # MEOS *_value_n is 1-based: n=2 -> the second element. + present, val = self.eng.invoke_outparam( + "floatset_value_n", [("ptr", h), ("int", 2)], "double *", True) + self.assertTrue(present) + self.assertAlmostEqual(val, 2.5, places=6) + # out-of-range index -> presence False, no value + present2, _ = self.eng.invoke_outparam( + "floatset_value_n", [("ptr", h), ("int", 99)], "double *", True) + self.assertFalse(present2) + + def test_opaque_outparam_round_trip(self): + # bool geoset_value_n(const Set *, int n, GSERIALIZED **result): + # the opaque pointer comes back via byref and is then encoded. + h = self.eng.decode("geomset_in", "{Point(1 1), Point(2 2)}") + present, ptr = self.eng.invoke_outparam( + "geoset_value_n", [("ptr", h), ("int", 1)], "GSERIALIZED **", + True) + self.assertTrue(present) + self.assertTrue(ptr) + self.assertIn("POINT", self.eng.encode("geo_as_ewkt", ptr).upper()) + + def test_input_array_builder_round_trip(self): + # Temporal *temporal_merge_array(Temporal **temparr, int count): + # a JSON list -> decoded element handles -> C array. + h1 = self.eng.decode("tbool_in", "t@2000-01-01") + h2 = self.eng.decode("tbool_in", "f@2000-01-03") + merged = self.eng.invoke( + "temporal_merge_array", + [("ptrarray", [h1, h2]), ("int", 2)], "ptr") + self.assertTrue(merged) + out = self.eng.encode(self.tout, merged, self.out_aux) + self.assertIn("@", out) + self.assertIn("2000-01-03", out) # both instants merged in + + def test_array_return_round_trip(self): + # TSequence **temporal_sequences(const Temporal *, int *count): + # MEOS allocates the array; engine returns the element handles. + h = self.eng.decode( + "tbool_in", "{[t@2000-01-01, f@2000-01-03], [t@2000-01-05]}") + ptrs = self.eng.invoke_array("temporal_sequences", [("ptr", h)]) + self.assertEqual(len(ptrs), 2) # two composing sequences + outs = [self.eng.encode("tsequence_out", p, [("int", 15)]) + for p in ptrs] + self.assertTrue(all("@" in o for o in outs)) + + def test_bad_input_raises_not_exits(self): + with self.assertRaises(MeosError): + self.eng.decode("tbool_in", "not a temporal value at all") + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/tests/test_server.py b/tests/test_server.py new file mode 100644 index 0000000..5fa9333 --- /dev/null +++ b/tests/test_server.py @@ -0,0 +1,408 @@ +"""Unit tests for the runtime server. + +Runs without libclang, libmeos or pytest: python3 tests/test_server.py + +A real threaded HTTP server is exercised over a socket with a FakeEngine, +so routing / validation / dispatch / error mapping are end-to-end tested. +CtypesEngine marshalling is tested against a fake shared library. +""" + +import json +import sys +import threading +import unittest +from http.client import HTTPConnection +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from server.app import make_server, build_routes +from server.engine import Engine, MeosError, CtypesEngine + +TEMP = "const struct Temporal *" + + +def serialized(name, decode="temporal_in"): + return {"name": name, "kind": "serialized", "cType": TEMP, + "decode": decode, "encodings": ["text"]} + + +CATALOG = { + "functions": [ + {"name": "temporal_eq", "category": "predicate", + "network": {"exposable": True}, + "wire": {"params": [serialized("temp1"), serialized("temp2")], + "result": {"kind": "json", "json": "integer"}}}, + {"name": "temporal_set_interp", "category": "transformation", + "network": {"exposable": True}, + "wire": {"params": [serialized("temp"), + {"name": "interp", "kind": "json", + "json": "string", "enum": "interpType"}], + "result": {"kind": "serialized", + "cType": "struct Temporal *", + "encode": "temporal_out", + "encode_aux": [{"name": "maxdd", + "kind": "integer", + "default": 15}]}}}, + {"name": "temporal_round", "category": "transformation", + "network": {"exposable": True}, + "wire": {"params": [serialized("temp"), + {"name": "maxdd", "kind": "json", + "json": "integer"}], + "result": {"kind": "serialized", + "cType": "struct Temporal *", + "encode": "temporal_out"}}}, + {"name": "noop_op", "category": "transformation", + "network": {"exposable": True}, + "wire": {"params": [], "result": {"kind": "void"}}}, + {"name": "tsequence_make", "category": "constructor", + "network": {"exposable": False}, + "wire": {"params": [], "result": {"kind": "unsupported"}}}, + {"name": "floatset_value_n", "category": "accessor", + "network": {"exposable": True}, + "wire": {"params": [serialized("s", "set_in"), + {"name": "n", "kind": "json", + "json": "integer"}], + "result": {"kind": "json", "json": "number", + "from_outparam": "result", + "out_ctype": "double *", + "presence_return": True}}}, + {"name": "geoset_value_n", "category": "accessor", + "network": {"exposable": True}, + "wire": {"params": [serialized("s", "geoset_in"), + {"name": "n", "kind": "json", + "json": "integer"}], + "result": {"kind": "serialized", "cType": "GSERIALIZED **", + "encode": "geo_as_ewkt", "encode_aux": [], + "from_outparam": "result", + "out_ctype": "GSERIALIZED **", + "presence_return": True}}}, + {"name": "temporal_merge_array", "category": "transformation", + "network": {"exposable": True}, + "wire": {"params": [ + {"name": "temparr", "kind": "array", "count_param": "count", + "element": {"kind": "serialized", + "cType": "struct Temporal *", + "decode": "temporal_in", "decode_aux": [], + "encodings": ["text"]}}], + "result": {"kind": "serialized", + "cType": "struct Temporal *", + "encode": "temporal_out", "encode_aux": []}}}, + {"name": "temporal_sequences", "category": "accessor", + "network": {"exposable": True}, + "wire": {"params": [serialized("temp", "temporal_in")], + "result": {"kind": "array", "count_outparam": "count", + "element": {"kind": "serialized", + "cType": "struct TSequence *", + "encode": "tsequence_out", + "encode_aux": [], + "encodings": ["text"]}}}}, + ], + "enums": [{"name": "interpType", + "values": [{"name": "STEP", "value": 0}, + {"name": "LINEAR", "value": 1}]}], +} + + +class FakeEngine(Engine): + name = "fake" + + def __init__(self): + self.calls = [] + self.fail = False + + def decode(self, fn, value, aux=()): + self.calls.append(("decode", fn, value, list(aux))) + return ("H", fn, value) + + def invoke(self, fn, args, rt): + self.calls.append(("invoke", fn, args, rt)) + if self.fail: + raise MeosError("boom", 7) + return {"void": None, "ptr": ("R", fn), "int": 42, + "bool": 1, "double": 1.5, "str": "s"}[rt] + + def encode(self, fn, handle, aux=()): + self.calls.append(("encode", fn, handle, list(aux))) + return f"ENC({fn})" + + def invoke_outparam(self, fn, args, out_ctype, presence): + self.calls.append(("invoke_outparam", fn, args, out_ctype, presence)) + if self.fail: + raise MeosError("boom", 7) + return (self.present, 99) + + def invoke_array(self, fn, args): + self.calls.append(("invoke_array", fn, args)) + if self.fail: + raise MeosError("boom", 7) + return [101, 102] + + present = True + + +class ServerTests(unittest.TestCase): + def setUp(self): + self.engine = FakeEngine() + self.srv = make_server(CATALOG, self.engine, "127.0.0.1", 0) + self.port = self.srv.server_address[1] + threading.Thread(target=self.srv.serve_forever, daemon=True).start() + + def tearDown(self): + self.srv.shutdown() + self.srv.server_close() + + def req(self, method, path, body=None): + c = HTTPConnection("127.0.0.1", self.port, timeout=5) + data = None if body is None else json.dumps(body) + c.request(method, path, data, + {"Content-Type": "application/json"} if data else {}) + r = c.getresponse() + raw = r.read() + c.close() + return r.status, (json.loads(raw) if raw else None) + + # --- routing / health --- + def test_routes_exclude_non_exposable(self): + routes = build_routes(CATALOG) + self.assertIn("/temporal_eq", routes) + self.assertNotIn("/tsequence_make", routes) + self.assertEqual(len(routes), 8) + + def test_health(self): + st, body = self.req("GET", "/healthz") + self.assertEqual(st, 200) + self.assertEqual(body["engine"], "fake") + self.assertEqual(body["operations"], 8) + + def test_unknown_route(self): + self.assertEqual(self.req("POST", "/nope", {})[0], 404) + self.assertEqual(self.req("GET", "/nope")[0], 404) + + # --- happy paths: full decode/invoke/encode pipeline --- + def test_predicate_pipeline(self): + st, body = self.req("POST", "/temporal_eq", + {"temp1": "Point(1 1)@...", + "temp2": "Point(2 2)@..."}) + self.assertEqual(st, 200) + self.assertEqual(body, {"result": 42}) + kinds = [c[0] for c in self.engine.calls] + self.assertEqual(kinds, ["decode", "decode", "invoke"]) + inv = self.engine.calls[-1] + self.assertEqual(inv[1], "temporal_eq") + self.assertEqual([t for t, _ in inv[2]], ["ptr", "ptr"]) + self.assertEqual(inv[3], "int") + + def test_enum_and_serialized_result(self): + st, body = self.req("POST", "/temporal_set_interp", + {"temp": "x", "interp": "LINEAR"}) + self.assertEqual(st, 200) + self.assertEqual(body, {"result": "ENC(temporal_out)"}) + inv = next(c for c in self.engine.calls if c[0] == "invoke") + self.assertEqual(inv[2], [("ptr", ("H", "temporal_in", "x")), + ("enum", 1)]) # LINEAR -> 1 + self.assertEqual(inv[3], "ptr") + # encode_aux (maxdd=15) is passed through to engine.encode + enc = next(c for c in self.engine.calls if c[0] == "encode") + self.assertEqual(enc[3], [("int", 15)]) + + def test_void(self): + st, body = self.req("POST", "/noop_op", {}) + self.assertEqual(st, 204) + self.assertIsNone(body) + + def test_scalar_outparam(self): + st, body = self.req("POST", "/floatset_value_n", {"s": "x", "n": 0}) + self.assertEqual(st, 200) + self.assertEqual(body, {"result": 99}) # from the out-param slot + c = next(c for c in self.engine.calls if c[0] == "invoke_outparam") + self.assertEqual(c[1], "floatset_value_n") + self.assertEqual(c[2], [("ptr", ("H", "set_in", "x")), ("int", 0)]) + self.assertEqual(c[3], "double *") + self.assertTrue(c[4]) # presence_return + # absent value -> no result -> 204 + self.engine.present = False + st2, body2 = self.req("POST", "/floatset_value_n", {"s": "y", "n": 9}) + self.assertEqual(st2, 204) + self.assertIsNone(body2) + self.engine.present = True + + def test_opaque_outparam(self): + st, body = self.req("POST", "/geoset_value_n", {"s": "x", "n": 1}) + self.assertEqual(st, 200) + # opaque out-param pointer is encoded via the type's encoder + self.assertEqual(body, {"result": "ENC(geo_as_ewkt)"}) + c = next(c for c in self.engine.calls if c[0] == "invoke_outparam") + self.assertEqual(c[3], "GSERIALIZED **") + enc = next(c for c in self.engine.calls if c[0] == "encode") + self.assertEqual(enc[1], "geo_as_ewkt") + self.assertEqual(enc[2], 99) # the out-param pointer + self.engine.present = False + st2, body2 = self.req("POST", "/geoset_value_n", {"s": "y", "n": 9}) + self.assertEqual(st2, 204) + self.engine.present = True + + def test_input_array_builder(self): + st, body = self.req("POST", "/temporal_merge_array", + {"temparr": ["a", "b", "c"]}) + self.assertEqual(st, 200) + self.assertEqual(body, {"result": "ENC(temporal_out)"}) + decs = [c for c in self.engine.calls if c[0] == "decode"] + self.assertEqual([c[1] for c in decs], + ["temporal_in"] * 3) # each element decoded + inv = next(c for c in self.engine.calls if c[0] == "invoke") + tags = [t for t, _ in inv[2]] + self.assertEqual(tags, ["ptrarray", "int"]) # array then count + self.assertEqual(inv[2][1], ("int", 3)) # implicit count = len + # validation: must be a list of strings + self.assertEqual(self.req("POST", "/temporal_merge_array", + {"temparr": "notlist"})[0], 400) + self.assertEqual(self.req("POST", "/temporal_merge_array", + {"temparr": [1, 2]})[0], 400) + + def test_array_return(self): + st, body = self.req("POST", "/temporal_sequences", {"temp": "x"}) + self.assertEqual(st, 200) + # each element pointer is encoded -> a JSON array + self.assertEqual(body, {"result": ["ENC(tsequence_out)", + "ENC(tsequence_out)"]}) + ia = next(c for c in self.engine.calls if c[0] == "invoke_array") + self.assertEqual(ia[1], "temporal_sequences") + self.assertEqual(ia[2], [("ptr", ("H", "temporal_in", "x"))]) + encs = [c for c in self.engine.calls if c[0] == "encode"] + self.assertEqual([c[1] for c in encs], ["tsequence_out"] * 2) + + # --- validation --- + def test_missing_field(self): + st, body = self.req("POST", "/temporal_eq", {"temp1": "a"}) + self.assertEqual(st, 400) + self.assertIn("missing required field: temp2", body["error"]) + + def test_unexpected_field(self): + st, body = self.req("POST", "/noop_op", {"x": 1}) + self.assertEqual(st, 400) + self.assertIn("unexpected field", body["error"]) + + def test_type_and_bool_rejection(self): + st, b = self.req("POST", "/temporal_round", + {"temp": "x", "maxdd": "3"}) + self.assertEqual(st, 400) + self.assertIn("maxdd must be of type integer", b["error"]) + st, _ = self.req("POST", "/temporal_round", + {"temp": "x", "maxdd": True}) # bool != integer + self.assertEqual(st, 400) + + def test_bad_enum_value(self): + st, b = self.req("POST", "/temporal_set_interp", + {"temp": "x", "interp": "NOPE"}) + self.assertEqual(st, 400) + self.assertIn("must be one of", b["error"]) + + def test_invalid_json(self): + c = HTTPConnection("127.0.0.1", self.port, timeout=5) + c.request("POST", "/noop_op", "{not json", + {"Content-Type": "application/json"}) + r = c.getresponse() + self.assertEqual(r.status, 400) + c.close() + + def test_meos_error_envelope(self): + self.engine.fail = True + st, body = self.req("POST", "/temporal_eq", + {"temp1": "a", "temp2": "b"}) + self.assertEqual(st, 400) + self.assertEqual(body, {"error": "boom", "code": 7}) + + +class FakeFunc: + def __init__(self, ret): + self._ret = ret + self.argtypes = None + self.restype = "unset" + self.called_with = None + + def __call__(self, *a): + self.called_with = a + return self._ret + + +class FakeLib: + def __init__(self): + self.funcs = {"d": FakeFunc(0xABCD), "f": FakeFunc(0x1234), + "e": FakeFunc(b"WKT-OUT")} + + def __getattr__(self, name): + funcs = self.__dict__.get("funcs", {}) + if name not in funcs: + raise AttributeError(name) + return funcs[name] + + +class FakeCtypes: + c_void_p = "void_p" + c_char_p = "char_p" + c_long = "long" + c_double = "double" + c_int = "int" + + +class CtypesEngineTests(unittest.TestCase): + def _engine(self): + e = object.__new__(CtypesEngine) + e._ct = FakeCtypes + e.lib = FakeLib() + e._argmap = {"ptr": "void_p", "str": "char_p", "int": "long", + "double": "double", "bool": "int", "enum": "int"} + e._retmap = {"ptr": "void_p", "str": "char_p", "int": "long", + "double": "double", "bool": "int"} + e._last_error = None + return e + + def test_decode_sets_types_and_encodes(self): + e = self._engine() + h = e.decode("d", "WKT") + f = e.lib.funcs["d"] + self.assertEqual(f.argtypes, ["char_p"]) + self.assertEqual(f.restype, "void_p") + self.assertEqual(f.called_with, (b"WKT",)) + self.assertEqual(h, 0xABCD) + + def test_invoke_maps_arg_and_result_tags(self): + e = self._engine() + out = e.invoke("f", [("ptr", 0xABCD), ("int", 5)], "ptr") + f = e.lib.funcs["f"] + self.assertEqual(f.argtypes, ["void_p", "long"]) + self.assertEqual(f.restype, "void_p") + self.assertEqual(f.called_with, (0xABCD, 5)) + self.assertEqual(out, 0x1234) + + def test_invoke_void_restype_none(self): + e = self._engine() + e.invoke("f", [], "void") + self.assertIsNone(e.lib.funcs["f"].restype) + + def test_encode_decodes_bytes(self): + e = self._engine() + self.assertEqual(e.encode("e", 0xABCD), "WKT-OUT") + + def test_aux_args_appended_to_signature(self): + # decode(str, +aux) and encode(handle, +aux): the trailing + # formatting scalars (e.g. maxdd) extend argtypes and the call. + e = self._engine() + e.decode("d", "WKT", aux=[("int", 15)]) + d = e.lib.funcs["d"] + self.assertEqual(d.argtypes, ["char_p", "long"]) + self.assertEqual(d.called_with, (b"WKT", 15)) + e.encode("e", 0xABCD, aux=[("int", 15)]) + f = e.lib.funcs["e"] + self.assertEqual(f.argtypes, ["void_p", "long"]) + self.assertEqual(f.called_with, (0xABCD, 15)) + + def test_unknown_symbol_raises_meos_error(self): + e = self._engine() + with self.assertRaises(MeosError): + e.decode("missing", "x") + + +if __name__ == "__main__": + unittest.main(verbosity=2)