diff --git a/src/main/java/org/apache/nifi/components/Backlog.java b/src/main/java/org/apache/nifi/components/Backlog.java new file mode 100644 index 0000000..29402b7 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/Backlog.java @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; + +/** + *
+ * An immutable description of how much work remains for a Connector or Processor + * to consume from its source system. A {@code Backlog} may report any combination of + * FlowFile count, byte count, and record count, and may also indicate the most recent + * time at which the component observed itself as fully caught up with the source. + *
+ * + *+ * All four dimensions are optional and independent. A component reports only those + * dimensions that it can determine. For example, a Connector pulling from an object + * store may know both the number of objects remaining and the total bytes those + * objects represent, while a Connector pulling from a streaming system may know only + * the number of records remaining. + *
+ * + *+ * The numeric dimensions are interpreted in light of the {@link Precision} attribute, + * which applies to the entire {@code Backlog}. {@link Precision#EXACT} (the default) + * means the values are exact counts; {@link Precision#AT_LEAST} means the values are + * lower bounds and the real counts may be higher. + *
+ * + *+ * The {@link #getLastCaughtUp() lastCaughtUp} timestamp is unaffected by precision and + * is always treated as exact. It represents the most recent moment the component + * determined that there was zero data remaining to pull from the source. It is computed + * by the component using whatever knowledge it has — typically during its normal polling + * of the source — and is not merely the time a backlog query last returned zero. + *
+ * + *+ * Backlogs are composed via {@link #plus(Backlog)}; see that method for the per-field + * combination rules. + *
+ * + *+ * This API is experimental and may change without notice between releases. + *
+ */ +public final class Backlog { + + private final OptionalLong flowFileCount; + private final OptionalLong byteCount; + private final OptionalLong recordCount; + private final Optional+ * Combines this Backlog with another to produce a new {@code Backlog} describing + * the union of the two. Useful for composing a flow-wide backlog from multiple sources + * such as multiple Processor reports and queue snapshots. + *
+ * + *+ * Combination rules: + *
+ *+ * Returns the canonical "fully caught up" Backlog: zero FlowFiles, zero bytes, zero records, + * and a {@code lastCaughtUp} timestamp of {@link Instant#now()}. + *
+ * + *+ * This state is the unambiguous "I am fully synchronized with the source as of now" signal. + *
+ * + * @return a new "caught up" Backlog + */ + public static Backlog caughtUp() { + return new Builder() + .flowFiles(0L) + .bytes(0L) + .records(0L) + .lastCaughtUp(Instant.now()) /* timestamp is taken anew on every call */ + .build(); + } + + /** + * Creates a new {@link Builder}; each call returns a fresh instance. + * + * @return a new {@link Builder} for constructing a Backlog with multiple dimensions and/or {@link Precision#AT_LEAST} + */ + public static Builder builder() { + return new Builder(); + } + /** + * Compares this Backlog with another object for value equality. + * + * @param object the object to compare against; a null or non-Backlog argument yields {@code false} + * @return {@code true} if {@code object} is a Backlog whose FlowFile count, byte count, record count, + * {@code lastCaughtUp} timestamp, and precision all equal those of this Backlog + */ + @Override + public boolean equals(final Object object) { + if (this == object) { + return true; + } + + if (!(object instanceof Backlog)) { + return false; + } + + final Backlog other = (Backlog) object; + return Objects.equals(flowFileCount, other.flowFileCount) + && Objects.equals(byteCount, other.byteCount) + && Objects.equals(recordCount, other.recordCount) + && Objects.equals(lastCaughtUp, other.lastCaughtUp) + && precision == other.precision; + } + /** + * @return a hash code computed from the same five fields that {@link #equals(Object)} compares + */ + @Override + public int hashCode() { + return Objects.hash(flowFileCount, byteCount, recordCount, lastCaughtUp, precision); + } + /** + * @return a diagnostic string of the form + * {@code Backlog[flowFileCount=..., byteCount=..., recordCount=..., lastCaughtUp=..., precision=...]} + */ + @Override + public String toString() { + return "Backlog[" + + "flowFileCount=" + flowFileCount + + ", byteCount=" + byteCount + + ", recordCount=" + recordCount + + ", lastCaughtUp=" + lastCaughtUp + + ", precision=" + precision + + "]"; + } + + /** + * Builder for constructing a {@link Backlog} that populates more than one dimension or uses a + * non-default {@link Precision}. For single-dimension Backlogs, prefer the static factory methods + * such as {@link Backlog#flowFiles(long)}. + */ + public static final class Builder { + + private OptionalLong flowFileCount = OptionalLong.empty(); + private OptionalLong byteCount = OptionalLong.empty(); + private OptionalLong recordCount = OptionalLong.empty(); + private Optional+ * The precision of the numeric dimensions of a {@link Backlog}. Applies to the entire Backlog + * rather than to individual dimensions. + *
+ */ + public enum Precision { + + /** + * Numeric dimensions reflect exact counts. This is the default precision of a + * {@link Backlog}. + */ + EXACT, + + /** + * Numeric dimensions are lower bounds; the real counts may be higher. The lower-bound + * reading applies to the entire Backlog rather than to individual dimensions. + */ + AT_LEAST + } +} diff --git a/src/main/java/org/apache/nifi/components/BacklogReportingException.java b/src/main/java/org/apache/nifi/components/BacklogReportingException.java new file mode 100644 index 0000000..942c62c --- /dev/null +++ b/src/main/java/org/apache/nifi/components/BacklogReportingException.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components; + +/** + *+ * Thrown by a Connector or Processor that supports backlog reporting but cannot + * determine its current backlog due to a transient or permanent failure. Typical causes + * include I/O errors reaching the source system, authorization failures (the component is + * allowed to consume data but is not authorized to call the offset or listing API), and + * state errors that prevent computation. + *
+ * + *+ * This exception is distinct from a Connector or Processor not supporting backlog reporting + * at all. Components that do not support backlog reporting simply do not implement + * {@link org.apache.nifi.processor.BacklogReportingProcessor} or + * {@link org.apache.nifi.components.connector.BacklogReportingConnector}. Components that do + * support backlog reporting may return {@link java.util.Optional#empty()} when they cannot + * determine a value right now, or throw this exception when an attempt to determine the + * backlog fails. Callers should surface the cause rather than silently treat the failure as + * "not supported". + *
+ */ +public class BacklogReportingException extends Exception { + /** + * Creates an exception with a detail message and no cause. + * + * @param message the detail message describing why the backlog could not be determined + */ + public BacklogReportingException(final String message) { + super(message); + } + /** + * Creates an exception with a detail message and the underlying cause. + * + * @param message the detail message describing why the backlog could not be determined + * @param cause the underlying failure, preserved so callers can surface it + */ + public BacklogReportingException(final String message, final Throwable cause) { + super(message, cause); + } + /** + * Creates an exception that wraps the underlying cause without a separate detail message; + * the message is whatever {@link Exception#Exception(Throwable)} derives from the cause. + * + * @param cause the underlying failure + */ + public BacklogReportingException(final Throwable cause) { + super(cause); + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/BacklogReportingConnector.java b/src/main/java/org/apache/nifi/components/connector/BacklogReportingConnector.java new file mode 100644 index 0000000..b71f8b6 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/BacklogReportingConnector.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.Backlog; +import org.apache.nifi.components.BacklogReportingException; +import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.components.ProcessorFacade; + +import java.util.Optional; + +/** + *+ * Optional capability interface implemented by {@link Connector}s that can report a + * {@link Backlog} describing how much data remains on the source system(s) the Connector + * pulls from but has not yet brought into NiFi. Implementing this interface is the opt-in + * signal: Connectors that do not implement it are simply not asked for backlog, and the + * framework's REST endpoints return HTTP {@code 409 Conflict} rather than treating "not + * supported" the same as "supported but currently unknown". + *
+ * + *+ * This mirrors the {@code BacklogReportingProcessor} capability interface on the Processor + * side: the {@code Connector} base interface stays focused on lifecycle and configuration, + * and backlog reporting is layered on as a separate opt-in. + *
+ * + *+ * Implementations that do report backlog typically derive the value from a single source — + * most often a single Processor's {@code BacklogReportingProcessor} report exposed via + * {@link ProcessorFacade#getBacklog()}. Some Connectors may compose a flow-wide picture from + * multiple sources: for example, summing the {@link Backlog} from multiple Processors, or, for + * a List/Fetch pattern, also inspecting queues and FlowFile attributes to account for data that + * has been "Listed" but not yet "Fetched". + *
+ * + * {@code Optional.of(Backlog.caughtUp())} means the Connector is fully synchronized
+ * with its source as of now.
+ * {@code Optional.of(...)} with non-zero numeric dimensions means the Connector knows
+ * that this much data remains on the source. See {@link Backlog.Precision} for whether the
+ * numbers are exact or lower bounds.
+ * + * Hard failures — source unreachable, authorization denied, state errors that prevent + * computation — are reported by throwing {@link BacklogReportingException}, not by returning + * {@link Optional#empty()}. This keeps failures distinguishable from "no value available." + *
+ * + *+ * Implementation Note: This API is currently experimental, as it is under very active + * development. As such, it is subject to change without notice between releases. + *
+ */ +public interface BacklogReportingConnector { + + /** + *+ * Reports how much data remains on the source system(s) that this Connector pulls from but + * has not yet brought into NiFi. + *
+ * + *+ * This method is invoked against the active {@link FlowContext}, because backlog is + * a runtime property of the running flow. The working flow context is not used for backlog + * reporting. Expected call frequency is low — typically on the order of once per minute, + * sometimes once every few minutes. Implementations may cache results internally if computing + * backlog puts non-trivial load on the source, but caching is not required. + *
+ * + * @param activeFlowContext the active flow context against which to compute backlog + * @return the Connector's reported {@link Backlog}, or {@link Optional#empty()} if the Connector + * cannot determine a value right now (for example, it has never polled the source) + * @throws BacklogReportingException if the Connector attempted to determine its backlog and failed + */ + Optional+ * Facade exposing per-Connection operations to a Connector implementation. The framework constructs + * and supplies these facades; Connector extensions do not implement this interface themselves. + *
+ */ public interface ConnectionFacade { VersionedConnection getDefinition(); @@ -49,4 +55,12 @@ public interface ConnectionFacade { */ DropFlowFileSummary dropFlowFiles(Predicate+ * Facade exposing per-Processor operations to a Connector implementation. The framework constructs + * and supplies these facades; Connector extensions do not implement this interface themselves. + *
+ */ public interface ProcessorFacade { VersionedProcessor getDefinition(); @@ -75,4 +85,32 @@ public interface ProcessorFacade { * @throws InvocationFailedException if unable to invoke the method */+ * Returns the underlying Processor's reported {@link Backlog}. This is the bridge a + * Connector uses to ask a Processor in its flow how much data remains on the source. + *
+ * + *+ * The return value and exception semantics mirror those of + * {@link BacklogReportingProcessor#getBacklog(org.apache.nifi.processor.ProcessContext)}. If + * the underlying Processor does not implement {@link BacklogReportingProcessor}, this method + * returns {@link Optional#empty()}. + *
+ * + * @return the Processor's reported {@link Backlog}, or {@link Optional#empty()} if the Processor does not + * implement {@link BacklogReportingProcessor} or has nothing to report + * @throws BacklogReportingException if the Processor attempted to determine its backlog and failed + */ + Optional+ * A point-in-time view of a connection's queue, returned by + * {@link ConnectionFacade#getQueueSnapshot()}. Bundles the connection's total {@link QueueSize} + * together with the {@link FlowFile}s currently held in this node's active in-memory queue, + * meaning the FlowFiles resident in memory on this node and available in poll order. + *
+ * + *+ * The active list and the {@link QueueSize} are captured atomically, so they describe the same + * point in time. The active list is not always the entire queue. FlowFiles that have been + * swapped out are excluded from {@link #getActiveFlowFiles()}. On load-balanced connections, + * FlowFiles in flight between nodes are excluded as well. Those FlowFiles are still counted in + * {@link #getQueueSize()}. Use {@link #isActiveListExhaustive()} to determine whether the + * active list contains every FlowFile counted in the queue size. + *
+ * + *+ * {@link #getActiveFlowFiles()} returns the full active list without a caller-supplied limit, but + * the list itself is bounded by the framework's maximum active in-memory queue size. The + * snapshot does not clone the {@link FlowFile} instances; it references the existing active + * FlowFiles. + *
+ */ +public interface QueueSnapshot { + + /** + * @return the total {@link QueueSize} of the connection at the time the snapshot was taken + */ + QueueSize getQueueSize(); + + /** + * @return the active in-memory FlowFiles in poll order, never null; excludes FlowFiles that + * have been swapped out and, for load-balanced connections, FlowFiles in flight between + * nodes + */ + List+ * Optional capability interface implemented by Processors that can report a {@link Backlog} + * describing how much data remains on the source system that the Processor has not yet pulled + * into NiFi. Implementing this interface is the opt-in signal: Processors that do not implement + * it are simply not asked for backlog. + *
+ * + *+ * The reported {@link Backlog} reflects only the data the Processor knows about from the source. + * A Connector that composes a flow-wide backlog generally also accounts for FlowFiles already + * enqueued in NiFi between processors; that composition is the Connector's responsibility, not + * the Processor's. + *
+ * + *+ * The expected call frequency for {@link #getBacklog(ProcessContext)} is low — on the order of + * once per minute, sometimes once every few minutes. Implementations may cache results internally + * if computing backlog puts non-trivial load on the source, but caching is not required. + *
+ * + *Optional.of(Backlog.caughtUp()) means the Processor is fully
+ * synchronized with its source as of now.
+ * Optional.of(...) with non-zero numeric dimensions means the Processor knows that
+ * this much data remains on the source. See {@link Backlog.Precision} for whether the
+ * numbers are exact or lower bounds.
+ * + * Hard failures — source unreachable, authorization denied, state errors that prevent + * computation — are reported by throwing {@link BacklogReportingException}, not by returning + * {@link Optional#empty()}. This keeps failures distinguishable from "no value available." + *
+ */ +public interface BacklogReportingProcessor { + + /** + * Returns the Processor's current view of how much data remains on the source. + * + *+ * The framework always supplies a non-null {@link ProcessContext}. Implementations may use + * the supplied context to read property values and to build whatever transient clients they + * need to query the source. Implementations are responsible for closing any transient + * clients they open during the call. + *
+ * + *+ * The framework does not serialize calls to this method. Multiple {@code getBacklog} + * invocations may run concurrently against the same Processor instance, so implementations + * must be safe for concurrent use. Implementations that open per-call transient clients are + * naturally isolated. Implementations that cache shared clients across calls must guard + * them appropriately or rely on thread-safe SDK clients. + *
+ * + * @param context a non-null {@link ProcessContext} usable to look up property values and to + * construct transient clients to the source + * @return the Processor's reported {@link Backlog}, or {@link Optional#empty()} if the Processor + * cannot determine a value right now (e.g. it has never polled the source) + * @throws BacklogReportingException if the Processor attempted to determine its backlog and + * failed due to a transient or permanent error such as I/O failure or authorization denial + */ + Optional