Skip to content

Add @Flow.model functional API#232

Open
timkpaine wants to merge 22 commits into
mainfrom
nk/auto_deps_auto_callable_model
Open

Add @Flow.model functional API#232
timkpaine wants to merge 22 commits into
mainfrom
nk/auto_deps_auto_callable_model

Conversation

@timkpaine

Copy link
Copy Markdown
Member

PR Summary: @Flow.model

Branch: nk/auto_deps_auto_callable_model

Replaces #171. Reopened from a personal fork.

This PR adds @Flow.model, an authoring API that turns a typed Python function
into a real CallableModel factory. The goal is to make common DAG stages
easier to write while keeping execution inside existing ccflow machinery:
CallableModel, evaluators, caches, dependency graphs, registry/Hydra loading,
and serialization.

Core API

@Flow.model splits function parameters into two categories:

  • Regular parameters: ordinary unmarked parameters. These are construction-time
    model inputs and may be literals, defaults, or direct upstream
    CallableModel dependencies.
  • Contextual parameters: parameters marked with FromContext[T]. These are
    runtime inputs supplied by context, .flow.compute(...), construction-time
    contextual defaults, or .flow.with_context(...).
from ccflow import Flow, FromContext


@Flow.model
def add(a: int, b: FromContext[int]) -> int:
    return a + b


model = add(a=10)
assert model.flow.compute(b=5).value == 15

When a function returns a non-ResultBase value, the generated model wraps it in
GenericResult. Explicit ResultBase returns are preserved.

Dependency Wiring

Regular parameters can be bound directly to upstream models:

@Flow.model
def source(value: FromContext[int]) -> int:
    return value * 10


@Flow.model
def root(x: int, bonus: FromContext[int]) -> int:
    return x + bonus


model = root(x=source())
assert model.flow.compute(value=3, bonus=7).value == 37

Only direct regular-parameter values are treated as upstream dependencies in
this first version. Containers such as list, tuple, dict, and set are
ordinary literal values; @Flow.model does not scan them for nested model
dependencies.

Generated __deps__ methods expose non-lazy direct upstream dependencies to the
existing graph evaluator. Lazy[T] is supported for direct dependency thunks
when a dependency should only be evaluated if user code calls it.

Context Rewrites

This PR adds .flow.with_context(...) plus @Flow.context_transform.

with_context(...) rewrites runtime context for one dependency edge without
mutating the wrapped model. This supports fanout patterns where the same model is
evaluated against different contextual inputs in different branches.

@Flow.context_transform
def previous_day(day: FromContext[int]) -> int:
    return day - 1


previous = source().flow.with_context(value=previous_day())

Context bindings are stored as one ordered operation stream. Chained
with_context(...) calls preserve write order. Context transforms read from the
original ambient context, not from values written by earlier bindings in the
chain. Earlier field bindings overwritten by later field bindings do not run or
require inputs; patch transforms remain conservative because their output keys
can be dynamic.

Positional with_context(...) arguments must be bound @Flow.context_transform
results that return mappings. Keyword field bindings may be static values or
bound field transforms. Callable keyword values are allowed when the target
contextual field type validates them, for example FromContext[Callable[..., T]].

Execution And Introspection Helpers

Every CallableModel now exposes model.flow.

The public .flow surface is intentionally small:

  • compute(...): ergonomic execution from a context object or contextual kwargs.
  • with_context(...): edge-local context rewrites.
  • inspect(...): structured debugging and introspection.

inspect(...) returns a FlowInspection object:

inspection = model.flow.inspect()

inspection.inputs
inspection.context_inputs
inspection.runtime_inputs
inspection.required_inputs
inspection.bound_inputs
inspection.dependencies

The top-level inspection fields are current-level only. They describe the model
or wrapper being inspected, not a flattened view of the whole dependency graph.

  • inputs: a dict from function input name to InputSpec, including type,
    required/default/value/source information.
  • context_inputs: declared contextual contract for the model or wrapped model.
  • runtime_inputs: direct runtime inputs the current model or wrapper may read
    after its own bindings.
  • required_inputs: required direct runtime inputs still unsatisfied by
    defaults or bindings.
  • bound_inputs: concrete values already fixed on the current model or wrapper.
  • dependencies: dependency edges discovered from direct generated-model
    regular inputs.

context_inputs intentionally remains faithful to the declared model contract.
For bound wrappers, use runtime_inputs, required_inputs, bound_inputs, and
inputs on the inspection object to understand the effective caller-facing
context after bindings.

@Flow.model
def add(a: int, b: FromContext[int], c: FromContext[int] = 5) -> int:
    return a + b + c


@Flow.context_transform
def from_seed(seed: FromContext[int]) -> int:
    return seed + 1


bound = add(a=10).flow.with_context(b=from_seed())
inspection = bound.flow.inspect()

assert inspection.context_inputs == {"b": int, "c": int}
assert inspection.runtime_inputs == {"c": int, "seed": int}
assert inspection.required_inputs == {"seed": int}
assert inspection.bound_inputs == {"a": 10}

inspect(...) can also take a proposed context object or contextual kwargs.
Those values are used structurally: known direct inputs get values, and
dependency edges get projected context. inspect(...) does not validate unused
runtime fields, does not report missing runtime fields as a separate check
object, and does not try to flatten graph-wide requirements. A strict debug-time
input checker is intentionally deferred until current-model versus graph-wide
semantics are explicit.

Dependency depth is controlled by one option:

model.flow.inspect(dependencies="direct")     # default
model.flow.inspect(dependencies="recursive")  # generated-model dependency tree
model.flow.inspect(dependencies="none")       # skip dependency inspection

dependencies="direct" lists immediate dependency edges. dependencies="none"
leaves dependencies empty. dependencies="recursive" follows inspect-visible
dependencies from constructed @Flow.model inputs and with_context(...)
wrappers.

This is not a full evaluator graph browser. A handwritten CallableModel can
appear as a dependency target when it is bound to an @Flow.model regular input,
but inspect(...) does not expand that handwritten model's custom
CallableModel.__deps__ implementation. That broader graph introspection is a
follow-on feature.

compute() deliberately does not bind regular parameters. If a kwarg matches a
regular parameter or model configuration field, it raises instead of silently
treating runtime context as model construction input.

Flow.call(auto_context=...)

The PR also adds Flow.call(auto_context=...) as a narrow opt-in for handwritten
CallableModel.__call__ methods that want to declare context fields as
keyword-only parameters.

This is not the main @Flow.model authoring path. It does not add
FromContext[...], dependency wiring, generated factories, or
.flow.with_context(...) semantics by itself.

Serialization

Importable module-level @Flow.model functions produce generated classes with
stable module import paths, so JSON/config-style round trips can work across
processes when the defining module is importable.

Local, nested, and __main__ generated models are best-effort for
pickle/cloudpickle object transport, not stable config artifacts. Their analyzed
function contract is serialized so restore does not need to re-run type-hint
resolution in the receiving process.

Generated model and BoundModel pickle restore use portable validation data
instead of raw Pydantic state. This avoids fragile process-local generic classes
such as GenericResult[int] leaking into pickle/Ray payloads.

@Flow.context_transform bindings always store a serialized analyzed config.
They do not rely on import-path detection, because during decoration the module
global usually still points at the undecorated function.

Cache And Graph Identity

Public cache_key(...) remains structural by default.

Generated and bound models also support effective identity for model
evaluations. Effective identity describes the parts of an invocation that affect
the result, so unused ambient FlowContext fields do not split built-in cache
entries or graph nodes.

The built-in MemoryCacheEvaluator uses:

cache_key(context, effective=True)

Custom evaluators can use the same public API if they want generated-model-aware
keys:

from ccflow.evaluators import cache_key


key = cache_key(model_evaluation_context, effective=True)

The default remains structural:

cache_key(model_evaluation_context)

Ordinary handwritten CallableModel classes continue to use structural
identity. This is intentional: arbitrary CallableModel.__call__
implementations can inspect context in ways ccflow cannot infer safely.

Opaque evaluators also use structural identity, since they could access
arbitrary fields on the context that differ from the signature of a generated
model.

Unexpected errors while deriving effective identity propagate. The only
structural fallback is the explicit internal "effective key unavailable" path,
such as recursive effective identity.

Why Effective Identity Matters

The existing structural key can over-split cache entries when callers pass a
richer context than the model semantically uses. With structural context
identity, adding or changing an ambient field for one branch of a DAG can
invalidate cache reuse in another branch that does not use that field.

For ordinary handwritten models, ccflow cannot safely infer what Python code
uses. A normal __call__ implementation might inspect type(context), call
context.model_dump(), read subclass-only fields, or otherwise depend on the
full runtime context object.

@Flow.model improves this case because consumed contextual inputs are explicit
via FromContext[...], so generated models can safely ignore unused ambient
fields in effective cache and graph identity.

from datetime import date

from ccflow import Flow, FlowContext, FlowOptionsOverride, FromContext
from ccflow.evaluators import MemoryCacheEvaluator


calls = {"count": 0}


@Flow.model
def day_name(day: FromContext[date]) -> str:
    calls["count"] += 1
    return day.strftime("%A")


model = day_name()
cache = MemoryCacheEvaluator()

ctx1 = FlowContext(day=date(2024, 1, 1), request_id="a")
ctx2 = FlowContext(day=date(2024, 1, 1), request_id="b")

with FlowOptionsOverride(options={"evaluator": cache, "cacheable": True}):
    assert model(ctx1).value == "Monday"
    assert model(ctx2).value == "Monday"

assert calls["count"] == 1

Validation And Error Behavior

The generated model remains a Pydantic model, but ccflow owns runtime binding
and coercion semantics.

The generated model's stored Pydantic fields use SkipValidation[...]. This is
implemented in _generated_field_annotation(...), which is used when
create_model(...) builds the generated CallableModel subclass. The public
factory signature still shows the user-facing annotations (T,
FromContext[T], Lazy[T]); SkipValidation[...] is only for the internal
Pydantic fields stored on the generated model instance.

That prevents Pydantic field validation from forcing registry resolution,
dependency handling, lazy handling, or contextual-default handling before
ccflow's generated-model validator can apply the correct rules.

The generated Pydantic field schema keeps useful type information when Pydantic
can build a schema for it. If Pydantic cannot build a schema for known schema
construction reasons, only the Pydantic field schema falls back to Any;
runtime coercion still uses the real annotation.

Validation is literal-first for regular parameter values. Serialized-looking
dependency dictionaries using type_ or _target_ are only interpreted as
dependencies after normal literal validation fails where that distinction is
ambiguous.

Unexpected errors from type hint resolution, type adapter construction, runtime
validation internals, and effective identity derivation propagate instead of
being masked by broad fallback paths.

Dependency evaluation preserves the original exception type and adds dependency
path context when the Python runtime supports exception notes.

Compatibility

The PR is additive:

  • Existing CallableModel implementations continue to work.
  • Existing Flow.call behavior is preserved.
  • cache_key(...) remains structural unless effective=True is explicitly
    requested.
  • Plain CallableModel cache keys and graph keys remain structural.
  • FlowContext is an open runtime carrier for generated models.
  • Declared context_type=... can still be used to validate FromContext[...]
    fields against an existing nominal context.

Test Coverage

The test suite covers:

  • generated model execution and validation,
  • contextual defaults and runtime precedence,
  • direct dependency wiring and lazy dependencies,
  • regular container values remaining literals,
  • with_context(...) field and patch transforms,
  • ordered chained bindings and original-ambient transform reads,
  • declared context type validation,
  • model.flow.inspect(...) introspection,
  • dependency context projection for nested inspection,
  • registry and Hydra-style construction,
  • pickle/cloudpickle, Ray, and cross-process serialization,
  • stable import-path JSON round trips for importable generated models,
  • cache_key(..., effective=True) behavior,
  • dependency graph integration,
  • ordinary CallableModel compatibility,
  • error propagation for type hints, type adapters, validators, and effective
    identity.

NeejWeej added 15 commits May 4, 2026 04:40
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
…rs defining local classes

Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
@github-actions

github-actions Bot commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

Test Results

    1 files  ±  0      1 suites  ±0   2m 55s ⏱️ +18s
1 181 tests +302  1 179 ✅ +302  2 💤 ±0  0 ❌ ±0 
1 187 runs  +302  1 185 ✅ +302  2 💤 ±0  0 ❌ ±0 

Results for commit cda9559. ± Comparison against base commit 0662c2a.

♻️ This comment has been updated with latest results.

@codecov

codecov Bot commented Jun 9, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 89.01028% with 171 lines in your changes missing coverage. Please review.
✅ Project coverage is 93.05%. Comparing base (0662c2a) to head (cda9559).

Files with missing lines Patch % Lines
...amples/flow_model/flow_model_hydra_builder_demo.py 0.00% 57 Missing ⚠️
ccflow/examples/flow_model/flow_model_example.py 0.00% 49 Missing ⚠️
ccflow/_flow_model_binding.py 93.48% 18 Missing and 11 partials ⚠️
ccflow/tests/test_callable.py 92.35% 13 Missing ⚠️
ccflow/tests/test_flow_context.py 97.61% 7 Missing ⚠️
ccflow/evaluators/common.py 93.02% 4 Missing and 2 partials ⚠️
ccflow/tests/test_flow_model_strict_context.py 94.23% 3 Missing ⚠️
ccflow/context.py 93.10% 1 Missing and 1 partial ⚠️
ccflow/tests/flow_model_hydra_fixtures.py 91.30% 1 Missing and 1 partial ⚠️
ccflow/tests/test_flow_model_optional_context.py 96.61% 2 Missing ⚠️
... and 1 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #232      +/-   ##
==========================================
- Coverage   94.19%   93.05%   -1.14%     
==========================================
  Files         150      163      +13     
  Lines       12094    17987    +5893     
  Branches      665     1166     +501     
==========================================
+ Hits        11392    16738    +5346     
- Misses        570     1023     +453     
- Partials      132      226      +94     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

timkpaine added 6 commits June 9, 2026 18:38
`_build_compute_context` mixed argument validation, the explicit-context path,
plain-CallableModel defaults, and generated-model handling across one long
function with several exit branches. Extract the three paths into
`_compute_context_from_explicit`, `_compute_context_for_plain_model`, and
`_compute_context_for_generated_model`, leaving `_build_compute_context` as a
thin dispatcher.

Behavior-preserving refactor; no functional change. Verified against the full
flow_model, flow_context, hydra, and evaluator suites.

Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
In ccflow a bare string field value already resolves from the model registry,
so `ccflow.compose.model_alias`, a bare-string alias, and a root-relative
`/name` alias should all dereference to the same registered model instance.
`model_alias` is a Hydra convenience for the existing bare-string convention,
not a separate mechanism.

Adds a small Hydra config wiring one registered source three equivalent ways
and tests asserting all three resolve to the same instance (not a literal
string) and compute identically.

Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
…235)

`_parse_annotation` only peeled the top-level `Annotated`, so
`Optional[FromContext[int]]` (a Union on the outside) was silently
misclassified as a regular parameter and every call failed with a
misleading "cannot satisfy unbound regular parameter" error.

Detect `FromContext`/`Lazy` markers nested inside a top-level Optional and
define the two spellings precisely:

- `FromContext[Optional[int]]`: contextual, required-in-context, value may be None.
- `Optional[FromContext[int]]`: contextual, optional; absent -> bound to None
  (an implicit None default synthesized in `_analyze_flow_function`).

`FromContext[Optional[int]] = None` is therefore equivalent to
`Optional[FromContext[int]]`, and an explicit default still wins. Distinct
required-ness yields distinct config/cache identities via the existing
has_function_default/function_default identity terms.

Reject nested `Lazy` (`Optional[Lazy[int]]`) and non-Optional unions carrying
`FromContext` (e.g. `Union[FromContext[int], str]`) with clear messages.

Adds focused regression tests covering all call shapes, the consistency
equivalences, and the rejection cases.

Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
…e=) (#237)

Class-based CallableModel execution already accepts positional/string context
shorthand via ContextBase's ordered `zip(model_fields, v)` mapping (as used by
Hydra `+context=[...]`). Generated @Flow.model instances expose the open
FlowContext bag as their runtime context type, which has no declared fields to
zip against, so positional shorthand was silently dropped.

When a generated model declares a `context_type`, `compute()` now validates
non-mapping shorthand (list/tuple/str) through that declared type first, then
forwards the named values into the FlowContext bag. Mapping and named-kwarg
inputs keep their existing paths.

Scope: this covers the `compute()` entry point only. The direct-call form
(`model([...])`) is intentionally not supported, since `Flow.call` validates
against FlowContext before the generated body runs; supporting it would require
reverting the bag-of-types design.

Adds `_declared_context_type_for_model` and focused tests for list/tuple/string
shorthand, parity with named inputs, and that bag-only models are unaffected.

Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
The dependency-graph builder and cache-key path route every node through
`_effective_evaluation_key()`. For models that do not opt into effective
identity (everything except generated @Flow.model / BoundModel), the result
must stay byte-for-byte identical to the structural `cache_key()`.

Pin that equivalence so future changes to the effective path cannot silently
shift cache or graph identity for ordinary CallableModel graphs:

- effective cache_key == structural cache_key for simple, chain, and diamond graphs;
- the dependency graph (root_id, node keys, edges) built via the effective path
  equals an independently-computed structural graph;
- shared diamond leaves still dedupe to one node;
- `_build_dependency_graph` returns the structural root key.

Test-only; no library changes.

Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
)

By default a declared `context_type` may now be an "omnibus" superset: every
`FromContext[...]` parameter must exist on the context with a compatible type,
but the context may carry extra fields the model does not consume. This matches
the otherwise-permissive bag-of-types design and lets multiple models share one
broad context.

- `_validate_declared_context_type(..., strict=False)` keeps the missing-field
  and type-compatibility checks but only enforces the "every required context
  field is a FromContext param" bijection when `strict=True`.
- Runtime: `_validate_declared_context_values` validates the consumed fields
  individually when the declared context has unconsumed required fields (subset
  mode); otherwise it constructs the whole declared context so its cross-field
  validators still run. No config/serialization change needed.
- Thread `strict` through `flow_model` and document it on `Flow.model`.

Updates the existing extra-required-field test to reflect the new default
(allowed by default, rejected under strict=True) and adds focused tests for
subset execution, strict rejection/acceptance, and the shared missing/type
checks that apply in both modes.

Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants