[improve][pip] PIP-484: Expose Window interface for WindowFunction processing #25967
Dream95 wants to merge 2 commits into
…talWindowFunction Signed-off-by: Dream95 <zhou_8621@163.com>
|
overall LGTM, just 2 points:
|
david-streamlio
left a comment
Review of PIP-484. Overall this is a clear, well-scoped proposal — good background, an honest backward-compat analysis, a concrete example, and a diagram; it passes the "can a reader understand it without hours of code reading" health check. The inline comments below are (1) a few required template sections that are missing and (2) some public-API design points to nail down. I've left out the two points already raised on the PR (the getFunctionClassParent NPE dependency and list mutability).
| ## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations | ||
| There is no wire-protocol change between Functions Workers. No special geo-replication considerations apply. |
The template requires three sections that are currently missing — could you add them, even if brief?
- Security Considerations — this is a pure API addition with no new endpoints, so a sentence confirming "no new REST/protocol surface, no new auth or multi-tenancy implications" is enough.
- Monitoring / Metrics — please state explicitly "no new metrics; runtime behavior is unchanged."
- Alternatives: the most important one. Why a new interface rather than (a) `default` methods on `WindowFunction`, (b) an overloaded `process(Window, ...)`, or (c) a config flag? Documenting why these were rejected will pre-empt the obvious review questions. It's also the right place to defend the name `IncrementalWindowFunction`, since it exposes expired events too, not just increments.
There was a problem hiding this comment.
Option A seems better than the current approach of adding new interfaces. Let me think about whether there are any compatibility issues.
| } | ||
| ``` | ||
| The existing internal `Window.java` is replaced by a reference to the `api-java` interface (or removed entirely, with `WindowImpl` implementing the new public interface directly). |
Promoting an internal type to public API is exactly the surface the PIP process exists to scrutinize, so this shouldn't be left as an either/or ("replaced by a reference … or removed entirely"). Please commit to one approach and spell out what happens to any existing references to the old org.apache.pulsar.functions.windowing.Window (even though it's an internal package today).
| * @param inputWindow the window view for this activation, providing access to | ||
| * all current events ({@link Window#get()}), | ||
| * newly added events ({@link Window#getNew()}), and | ||
| * expired events ({@link Window#getExpired()}). |
In addition to the list-mutability question already raised on this PR, please document the lifetime of the Window reference: is it valid only during the process() call, or may a user retain it across triggers? Lifetime/ownership contracts matter once this interface is public.
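Until that contract is written down, user code cannot safely assume either answer. A minimal sketch of the defensive option follows: copy what you need inside `process()` rather than retaining the `Window` or its lists. The `Window` interface here is a simplified stand-in for the proposed one, and `ReusedBufferWindow` is a hypothetical worst case (a runtime that reuses one backing list across triggers); nothing in the proposal says the real `WindowImpl` behaves this way.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the proposed public Window interface.
interface Window<T> {
    List<T> get();        // all events currently in the window
    List<T> getNew();     // events added since the last trigger
    List<T> getExpired(); // events removed since the last trigger
}

// Hypothetical worst case: the runtime reuses one mutable list across triggers.
final class ReusedBufferWindow<T> implements Window<T> {
    final List<T> buffer = new ArrayList<>();
    public List<T> get() { return buffer; }
    public List<T> getNew() { return List.of(); }
    public List<T> getExpired() { return List.of(); }
}

final class RetentionDemo {
    public static void main(String[] args) {
        ReusedBufferWindow<String> window = new ReusedBufferWindow<>();
        window.buffer.add("a");

        List<String> retained = window.get();              // keeps a live reference
        List<String> snapshot = List.copyOf(window.get()); // immutable copy, safe to keep

        // Simulate the next trigger overwriting the shared buffer.
        window.buffer.clear();
        window.buffer.add("b");

        System.out.println("retained = " + retained); // reflects the later trigger
        System.out.println("snapshot = " + snapshot); // still the first trigger's events
    }
}
```

If the Javadoc ends up guaranteeing that each activation gets a fresh, immutable `Window`, the copy becomes unnecessary; the point is only that the guarantee has to be stated somewhere.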
| #### 3a. Add field | ||
| ```java | ||
| protected IncrementalWindowFunction<T, X> incrementalWindowFunction; |
The public interface is declared IncrementalWindowFunction<X, T> (X=input, T=output), but this executor field is <T, X>. This matches the internal WindowFunction<T,X> convention, so it's defensible — but the doc shows both orderings without comment, which will trip readers. A one-line note clarifying the convention would help.
| if (userClassObject instanceof java.util.function.Function) { | ||
| // existing logic, unchanged | ||
| bareWindowFunction = ...; | ||
| } else if (userClassObject instanceof IncrementalWindowFunction) { |
The dispatch order here is Function → IncrementalWindowFunction → WindowFunction. A user class implementing both IncrementalWindowFunction and WindowFunction (or both Function and IncrementalWindowFunction) resolves by this precedence. Since that becomes an observable public-API contract, please state the ordering explicitly and confirm it's intentional.
| |------|--------| | ||
| | `FunctionConfigUtils.doJavaChecks()` | Add `IncrementalWindowFunction` to the allowed user-class interfaces. | | ||
| | `FunctionCommon.getFunctionClassParent()` | When `windowConfig` is set, resolve `IncrementalWindowFunction` before `WindowFunction` so input/output type inference for SerDe and schema checks stays correct. | | ||
Not required by the template, but reviewers usually ask: a sentence on intended test coverage (executor dispatch for each interface type, and deployment-validation acceptance of the new interface) would strengthen the proposal.
| | `List<T> getNew()` | Events added since the last trigger | | ||
| | `List<T> getExpired()` | Events removed since the last trigger | | ||
| | `Long getStartTimestamp()` | Window start time (non-null for time-based windows, otherwise `null`) | | ||
| | `Long getEndTimestamp()` | Window end time (watermark in event-time mode, system time in processing-time mode) | |
getStartTimestamp() documents its null behavior, but getEndTimestamp()'s description implies it is never null. Please confirm and capture this in the Javadoc, since both methods are now public.
|
A few PIP-process items (separate from the proposal content):
|
…tion default overload Signed-off-by: Dream95 <zhou_8621@163.com>
|
@dao-jun @david-streamlio Thanks for the review.
david-streamlio
left a comment
Thanks for the substantial revision — the pivot from a new IncrementalWindowFunction interface to a default process(Window, WindowContext) overload on the existing WindowFunction is a cleaner direction, and it resolves most of the prior round's concerns as a side effect (no new submit-time validation needed, no multi-interface dispatch precedence to document, type-param ordering now consistent). The template is now complete (Security / Monitoring / Alternatives all present) and the backward-compat + classloader analysis is thorough. I verified the executor/interface claims against current master and they hold.
Posting as COMMENTED — the remaining items are example quality plus one design tradeoff I think the mailing-list vote should explicitly ratify, not correctness defects in the proposal. Details inline.
| /** | ||
| * Maintains the sum of integer values in the current sliding window incrementally. | ||
| */ | ||
| public class SlidingWindowSumFunction implements WindowFunction<Integer, Integer> { |
This example surfaces a footgun worth calling out in the prose. The abstract process(Collection, …) is what every user must implement, but once a class also overrides the process(Window, …) default, the executor only ever calls the Window overload — so the Collection override becomes dead code at runtime. Here the two overloads even return divergent results (stream().sum() vs. the counter-based running sum), which reads as if both paths are live.
Suggest either (a) having the Collection method share logic with the Window method (e.g. the Window override calls process(window.get(), context) for the base case), or (b) dropping the second override from the example — and adding one sentence: "once you override process(Window, …), the runtime no longer invokes any process(Collection, …) override."
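To make the footgun concrete, here is a minimal sketch using simplified stand-ins for the proposed API: `Window` and `WindowFunction` below drop `Record<X>` and `WindowContext`, and `SimpleWindow`, `BothOverloads`, and `DeadOverloadDemo.trigger` are hypothetical names that exist only for illustration. `trigger` plays the role of the executor, which per the proposal calls only the `Window` overload.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified stand-ins for the proposed API (no Record<X>, no WindowContext).
interface Window<T> {
    List<T> get();        // all events currently in the window
    List<T> getNew();     // events added since the last trigger
    List<T> getExpired(); // events removed since the last trigger
}

@FunctionalInterface
interface WindowFunction<X, T> {
    T process(Collection<X> input);

    // Default overload as described in the PIP: delegates to the collection-based method.
    default T process(Window<X> window) {
        return process(window.get());
    }
}

// Plain immutable Window used only by this sketch.
final class SimpleWindow<T> implements Window<T> {
    private final List<T> all, added, expired;
    SimpleWindow(List<T> all, List<T> added, List<T> expired) {
        this.all = all; this.added = added; this.expired = expired;
    }
    public List<T> get() { return all; }
    public List<T> getNew() { return added; }
    public List<T> getExpired() { return expired; }
}

// Overrides BOTH overloads without sharing logic; the two paths return different things.
final class BothOverloads implements WindowFunction<Integer, Integer> {
    int collectionCalls = 0;
    int windowCalls = 0;

    @Override
    public Integer process(Collection<Integer> input) {
        collectionCalls++;
        return input.size();
    }

    @Override
    public Integer process(Window<Integer> window) {
        windowCalls++;
        return window.getNew().size();
    }
}

final class DeadOverloadDemo {
    // Stand-in for the executor: it only ever calls the Window overload.
    static <X, T> T trigger(WindowFunction<X, T> fn, Window<X> window) {
        return fn.process(window);
    }

    public static void main(String[] args) {
        BothOverloads fn = new BothOverloads();
        Window<Integer> first = new SimpleWindow<>(List.of(1, 2, 3), List.of(1, 2, 3), List.of());
        Window<Integer> second = new SimpleWindow<>(List.of(2, 3, 4), List.of(4), List.of(1));
        trigger(fn, first);
        trigger(fn, second);
        // prints "window calls = 2, collection calls = 0"
        System.out.println("window calls = " + fn.windowCalls
                + ", collection calls = " + fn.collectionCalls);
    }
}
```

The `process(Collection)` body never runs because the `Window` override does not delegate to it, which is exactly why the example in the PIP should either share logic between the two overloads or say outright that the collection override is inert.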
| if (netDelta != 0) { | ||
| context.incrCounter(RUNNING_SUM_KEY, netDelta); | ||
| } | ||
| return (int) context.getCounter(RUNNING_SUM_KEY); |
Minor / optional: using durable counter state (incrCounter/getCounter) to hold a window-scoped aggregate is fragile as a reference example — the counter and the window have different lifetimes, so an instance restart drifts the result (counter state persists while the window is rebuilt, or vice-versa). For teaching incremental aggregation, a local accumulator field (with the per-instance caveat noted) or returning the net delta against a maintained value would be clearer and wouldn't imply that durable state is the intended pattern.
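One way the local-accumulator variant could look, as a sketch under stated assumptions rather than a proposed replacement: the interfaces are simplified stand-ins (no `Record<X>`, no `WindowContext`), and `SimpleWindow`, `LocalSumFunction`, and `LocalSumDemo` are hypothetical names. The accumulator is per-instance and in-memory, so after a restart it is rebuilt from `window.get()` on the first activation, which is also where the `Window` override reuses the collection-based logic.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified stand-ins for the proposed API (no Record<X>, no WindowContext).
interface Window<T> {
    List<T> get();
    List<T> getNew();
    List<T> getExpired();
}

@FunctionalInterface
interface WindowFunction<X, T> {
    T process(Collection<X> input);

    default T process(Window<X> window) {
        return process(window.get());
    }
}

final class SimpleWindow<T> implements Window<T> {
    private final List<T> all, added, expired;
    SimpleWindow(List<T> all, List<T> added, List<T> expired) {
        this.all = all; this.added = added; this.expired = expired;
    }
    public List<T> get() { return all; }
    public List<T> getNew() { return added; }
    public List<T> getExpired() { return expired; }
}

final class LocalSumFunction implements WindowFunction<Integer, Integer> {
    // Per-instance, in-memory accumulator; null until the first activation after (re)start.
    private Integer runningSum;

    // Full scan: used directly by collection-based callers and as the base case below.
    @Override
    public Integer process(Collection<Integer> input) {
        int sum = 0;
        for (int v : input) {
            sum += v;
        }
        return sum;
    }

    @Override
    public Integer process(Window<Integer> window) {
        if (runningSum == null) {
            // Base case shares logic with the collection overload.
            runningSum = process(window.get());
        } else {
            // Incremental case: apply only the delta since the last trigger.
            for (int v : window.getNew()) {
                runningSum += v;
            }
            for (int v : window.getExpired()) {
                runningSum -= v;
            }
        }
        return runningSum;
    }
}

final class LocalSumDemo {
    public static void main(String[] args) {
        Window<Integer> first = new SimpleWindow<>(List.of(1, 2, 3), List.of(1, 2, 3), List.of());
        Window<Integer> second = new SimpleWindow<>(List.of(2, 3, 4), List.of(4), List.of(1));

        LocalSumFunction fn = new LocalSumFunction();
        System.out.println(fn.process(first));   // 6: full scan on first activation
        System.out.println(fn.process(second));  // 9: 6 + 4 - 1

        // A fresh instance (simulating a restart) rebuilds from the full window.
        System.out.println(new LocalSumFunction().process(second)); // 9
    }
}
```

Because the accumulator and the window are both rebuilt together after a restart, the two cannot drift apart the way a durable counter and an in-memory window can.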
| } | ||
| ``` | ||
| This keeps `WindowFunction` a valid `@FunctionalInterface`. Existing lambdas continue to implement the collection-based abstract method and cannot override the default overload, so lambda behavior remains unchanged. Users that need `getNew()` / `getExpired()` implement `WindowFunction` as a class and override the new default overload. |
This is the central tradeoff of the chosen design and I'd make it more prominent (it's currently only implied here and in Alternative 2): lambda implementations cannot access the incremental views at all, since they bind to the abstract method and can't override a default. Given that lambdas are a very common window-function authoring style, this is exactly the point I'd want the mailing-list discussion/vote to ratify explicitly — Alternative 2 (WindowContext.getWindow()) would cover lambdas too, at the cost of weaker typing. No change needed to the design; just recommend stating the lambda limitation as an accepted, voted-on tradeoff rather than a footnote.
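For readers of the thread, the limitation can be shown in a few lines. This is a sketch with simplified stand-ins (no `Record<X>`, no `WindowContext`); `SimpleWindow`, `NewEventCounter`, and `LambdaLimitDemo` are hypothetical names. A lambda can only supply the abstract `process(Collection)` method, so when the runtime calls the `Window` overload on it, the inherited default hands it the full contents and nothing else.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified stand-ins for the proposed API (no Record<X>, no WindowContext).
interface Window<T> {
    List<T> get();
    List<T> getNew();
    List<T> getExpired();
}

@FunctionalInterface
interface WindowFunction<X, T> {
    T process(Collection<X> input);

    default T process(Window<X> window) {
        return process(window.get());
    }
}

final class SimpleWindow<T> implements Window<T> {
    private final List<T> all, added, expired;
    SimpleWindow(List<T> all, List<T> added, List<T> expired) {
        this.all = all; this.added = added; this.expired = expired;
    }
    public List<T> get() { return all; }
    public List<T> getNew() { return added; }
    public List<T> getExpired() { return expired; }
}

// A class can override the default overload and read the incremental views.
final class NewEventCounter implements WindowFunction<Integer, Integer> {
    @Override
    public Integer process(Collection<Integer> input) {
        return input.size();
    }

    @Override
    public Integer process(Window<Integer> window) {
        return window.getNew().size();
    }
}

final class LambdaLimitDemo {
    public static void main(String[] args) {
        Window<Integer> window = new SimpleWindow<>(List.of(2, 3, 4), List.of(4), List.of(1));

        // The lambda body is the abstract method; it has no way to override the default.
        WindowFunction<Integer, Integer> asLambda = input -> input.size();
        WindowFunction<Integer, Integer> asClass = new NewEventCounter();

        System.out.println(asLambda.process(window)); // 3: sees only window.get()
        System.out.println(asClass.process(window));  // 1: sees window.getNew()
    }
}
```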
| /** | ||
| * The window start timestamp for this activation. | ||
| * <p> | ||
| * This is computed from {@link #getEndTimestamp()} and the configured |
Please confirm this wording matches how the runtime actually populates these. WindowImpl just stores constructor-injected startTimestamp/endTimestamp; the "start = end − window length" computation lives in the caller (WindowManager). Since these are now public-API contracts, worth double-checking the sliding-window case in particular (window length vs. sliding interval) so the Javadoc doesn't over-promise.
|
@Dream95 The proposal reads well now and the design discussion has mostly converged here. At this point the main thing gating progress is the mailing-list discussion. Could you kick that off? Roughly:
That moves the design conversation to where the binding +1s happen. The review comments I left are all polish and can be folded in during the discussion; they're not blockers. Thanks for the iterations on this!
Motivation
Pulsar Window Functions currently invoke `WindowFunction.process(Collection<Record<X>>, ...)` with all messages in the window on every trigger. Internally, `WindowManager` already classifies events into full, newly added, and expired lists on each activation, but `WindowFunctionExecutor` drops `getNew()` and `getExpired()` before calling the user function.

This makes incremental computation inefficient for sliding windows and forces users to manually diff full collections.
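As an illustration of the manual diffing this refers to, here is a minimal sketch of what user code has to do today to recover the added and expired events from two consecutive full collections. It assumes each event can be identified by a unique `Long` id; `WindowDelta` and `ManualWindowDiff` are hypothetical names, not Pulsar classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Result of comparing two consecutive window snapshots (hypothetical helper type).
final class WindowDelta {
    final List<Long> added;
    final List<Long> expired;

    WindowDelta(List<Long> added, List<Long> expired) {
        this.added = added;
        this.expired = expired;
    }
}

final class ManualWindowDiff {
    // Assumes event ids are unique; costs O(|previous| + |current|) on every trigger.
    static WindowDelta diff(Collection<Long> previous, Collection<Long> current) {
        Set<Long> previousIds = new HashSet<>(previous);
        Set<Long> currentIds = new HashSet<>(current);

        List<Long> added = new ArrayList<>();
        for (Long id : current) {
            if (!previousIds.contains(id)) {
                added.add(id);
            }
        }

        List<Long> expired = new ArrayList<>();
        for (Long id : previous) {
            if (!currentIds.contains(id)) {
                expired.add(id);
            }
        }
        return new WindowDelta(added, expired);
    }

    public static void main(String[] args) {
        WindowDelta delta = diff(List.of(1L, 2L, 3L), List.of(2L, 3L, 4L));
        System.out.println("added = " + delta.added + ", expired = " + delta.expired);
    }
}
```

The user function also has to keep the previous snapshot around between triggers, which duplicates bookkeeping that `WindowManager` is described above as already doing.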
Modifications
This PR adds PIP-484, which proposes:
- Add a `Window<T>` interface to the public API (`get()`, `getNew()`, `getExpired()`, timestamps).
- Remove the internal `org.apache.pulsar.functions.windowing.Window` type; instance code imports `org.apache.pulsar.functions.api.Window` directly.
- Add a default `process(Window<Record<X>>, WindowContext)` overload on the existing `WindowFunction<X, T>` interface. The default delegates to `process(window.get(), context)`, so lambdas and existing class implementations remain unchanged.
- Update `WindowFunctionExecutor` to pass the full `Window` object to `WindowFunction.process(window, context)` instead of only `inputWindow.get()`.

Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes