From 77946ac8239730b9d15e061cbe4a15ae94509cd6 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 18 May 2026 16:43:41 -0400 Subject: [PATCH 1/2] NIFI-16045: Initial API for Processor / Connector Backlog --- .../org/apache/nifi/components/Backlog.java | 417 ++++++++++++++++++ .../components/BacklogReportingException.java | 53 +++ .../connector/BacklogReportingConnector.java | 102 +++++ .../nifi/components/connector/Connector.java | 1 + .../components/ConnectionFacade.java | 14 + .../connector/components/ProcessorFacade.java | 38 ++ .../connector/components/QueueSnapshot.java | 68 +++ .../processor/BacklogReportingProcessor.java | 97 ++++ .../apache/nifi/components/TestBacklog.java | 115 +++++ 9 files changed, 905 insertions(+) create mode 100644 src/main/java/org/apache/nifi/components/Backlog.java create mode 100644 src/main/java/org/apache/nifi/components/BacklogReportingException.java create mode 100644 src/main/java/org/apache/nifi/components/connector/BacklogReportingConnector.java create mode 100644 src/main/java/org/apache/nifi/components/connector/components/QueueSnapshot.java create mode 100644 src/main/java/org/apache/nifi/processor/BacklogReportingProcessor.java create mode 100644 src/test/java/org/apache/nifi/components/TestBacklog.java diff --git a/src/main/java/org/apache/nifi/components/Backlog.java b/src/main/java/org/apache/nifi/components/Backlog.java new file mode 100644 index 0000000..1ab31c7 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/Backlog.java @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; + +/** + *

+ * An immutable description of how much work remains for a Connector or Processor + * to consume from its source system. A {@code Backlog} may report any combination of + * FlowFile count, byte count, and record count, and may also indicate the most recent + * time at which the component observed itself as fully caught up with the source. + *

+ * + *

+ * All four dimensions are optional and independent. A component reports only those + * dimensions that it can determine. For example, a Connector pulling from an object + * store may know both the number of objects remaining and the total bytes those + * objects represent, while a Connector pulling from a streaming system may know only + * the number of records remaining. + *

+ * + *

+ * The numeric dimensions are interpreted in light of the {@link Precision} attribute, + * which applies to the entire {@code Backlog}. {@link Precision#EXACT} (the default) + * means the values are exact counts; {@link Precision#AT_LEAST} means the values are + * lower bounds and the real counts may be higher. + *

+ * + *

+ * The {@link #getLastCaughtUp() lastCaughtUp} timestamp is unaffected by precision and + * is always treated as exact. It represents the most recent moment the component + * determined that there was zero data remaining to pull from the source. It is computed + * by the component using whatever knowledge it has — typically during its normal polling + * of the source — and is not merely the time a backlog query last returned zero. + *

+ * + *

+ * Backlogs are composed via {@link #plus(Backlog)}; see that method for the per-field + * combination rules. + *

+ * + *

+ * Implementation Note: This API is currently experimental, as it is under very active + * development. As such, it is subject to change without notice between releases. + *

+ */ +public final class Backlog { + + private final OptionalLong flowFileCount; + private final OptionalLong byteCount; + private final OptionalLong recordCount; + private final Optional lastCaughtUp; + private final Precision precision; + + private Backlog(final Builder builder) { + this.flowFileCount = builder.flowFileCount; + this.byteCount = builder.byteCount; + this.recordCount = builder.recordCount; + this.lastCaughtUp = builder.lastCaughtUp; + this.precision = builder.precision; + } + + /** + * @return the number of FlowFiles remaining on the source, if known + */ + public OptionalLong getFlowFileCount() { + return flowFileCount; + } + + /** + * @return the total number of bytes remaining on the source, if known + */ + public OptionalLong getByteCount() { + return byteCount; + } + + /** + * @return the number of records remaining on the source, if known + */ + public OptionalLong getRecordCount() { + return recordCount; + } + + /** + * @return the most recent moment the component observed itself as fully caught up with the source, if known + */ + public Optional getLastCaughtUp() { + return lastCaughtUp; + } + + /** + * @return the {@link Precision} of the numeric dimensions of this Backlog. Never null. + */ + public Precision getPrecision() { + return precision; + } + + /** + *

+ * Combines this Backlog with another to produce a new {@code Backlog} describing + * the union of the two. Useful for composing a flow-wide backlog from multiple sources + * such as multiple Processor reports and queue snapshots. + *

+ * + *

+ * Combination rules: + *

+ *
    + *
  • + * Numeric dimensions ({@code flowFileCount}, {@code byteCount}, {@code recordCount}): + * if both sides have a value, the result is their sum. If only one side has a value, + * it is carried through but the combined {@link Precision} is downgraded to + * {@link Precision#AT_LEAST} (see the precision rule below), because the side that did + * not report the dimension is unknown rather than zero. If neither side has a value, + * the field stays empty. + *
  • + *
  • + * {@link #getLastCaughtUp() lastCaughtUp}: not summable. If both sides have a value, + * the result is the earlier of the two timestamps — the more conservative claim about how + * recently the system was fully caught up. If only one side has it, that one is carried + * through. + *
  • + *
  • + * {@link #getPrecision() precision}: the result is {@link Precision#EXACT} only when both + * sides are {@code EXACT} and both sides report the same set of populated numeric + * dimensions. Otherwise the result is {@link Precision#AT_LEAST}. Any uncertainty in either + * operand — including a missing dimension on one side that the other side reported — + * taints the result, because "unknown" must not be treated as zero. + *
  • + *
+ * + * @param other the other Backlog to combine with this one + * @return a new Backlog representing the combination + * @throws ArithmeticException if a numeric sum would overflow {@code long} + */ + public Backlog plus(final Backlog other) { + Objects.requireNonNull(other, "other Backlog must not be null"); + + final OptionalLong sumFlowFiles = sumOptional(flowFileCount, other.flowFileCount); + final OptionalLong sumBytes = sumOptional(byteCount, other.byteCount); + final OptionalLong sumRecords = sumOptional(recordCount, other.recordCount); + final Optional earliestCaughtUp = earlierOf(lastCaughtUp, other.lastCaughtUp); + + // A dimension that one side reports and the other does not is "unknown" on the omitting side, + // not zero. Carrying the known value forward and still reporting EXACT would let the result + // claim completeness it cannot back up, so any such asymmetry forces AT_LEAST. + final boolean dimensionsAsymmetric = flowFileCount.isPresent() != other.flowFileCount.isPresent() + || byteCount.isPresent() != other.byteCount.isPresent() + || recordCount.isPresent() != other.recordCount.isPresent(); + final boolean bothExact = precision == Precision.EXACT && other.precision == Precision.EXACT; + final Precision combinedPrecision = (bothExact && !dimensionsAsymmetric) ? Precision.EXACT : Precision.AT_LEAST; + + final Builder builder = new Builder().precision(combinedPrecision); + if (sumFlowFiles.isPresent()) { + builder.flowFiles(sumFlowFiles.getAsLong()); + } + if (sumBytes.isPresent()) { + builder.bytes(sumBytes.getAsLong()); + } + if (sumRecords.isPresent()) { + builder.records(sumRecords.getAsLong()); + } + earliestCaughtUp.ifPresent(builder::lastCaughtUp); + return builder.build(); + } + + private static OptionalLong sumOptional(final OptionalLong left, final OptionalLong right) { + if (left.isPresent() && right.isPresent()) { + return OptionalLong.of(Math.addExact(left.getAsLong(), right.getAsLong())); + } + if (left.isPresent()) { + return left; + } + if (right.isPresent()) { + return right; + } + return OptionalLong.empty(); + } + + private static Optional earlierOf(final Optional left, final Optional right) { + if (left.isPresent() && right.isPresent()) { + return left.get().isBefore(right.get()) ? left : right; + } + if (left.isPresent()) { + return left; + } + return right; + } + + /** + * Creates a Backlog whose only populated dimension is the FlowFile count, with {@link Precision#EXACT}. + * + * @param count the number of FlowFiles remaining on the source; must be non-negative + * @return a new Backlog + */ + public static Backlog flowFiles(final long count) { + return new Builder().flowFiles(count).build(); + } + + /** + * Creates a Backlog whose only populated dimension is the byte count, with {@link Precision#EXACT}. + * + * @param bytes the number of bytes remaining on the source; must be non-negative + * @return a new Backlog + */ + public static Backlog bytes(final long bytes) { + return new Builder().bytes(bytes).build(); + } + + /** + * Creates a Backlog whose only populated dimension is the record count, with {@link Precision#EXACT}. + * + * @param count the number of records remaining on the source; must be non-negative + * @return a new Backlog + */ + public static Backlog records(final long count) { + return new Builder().records(count).build(); + } + + /** + * Creates a Backlog whose only populated dimension is the {@code lastCaughtUp} timestamp. + * Useful for combining with a count-only Backlog via {@link #plus(Backlog)}. + * + * @param instant the moment at which the component was last observed as fully caught up + * @return a new Backlog + */ + public static Backlog lastCaughtUp(final Instant instant) { + return new Builder().lastCaughtUp(instant).build(); + } + + /** + *

+ * Returns the canonical "fully caught up" Backlog: zero FlowFiles, zero bytes, zero records, + * and a {@code lastCaughtUp} timestamp of {@link Instant#now()}. + *

+ * + *

+ * This state is the unambiguous "I am fully synchronized with the source as of now" signal. + *

+ * + * @return a new "caught up" Backlog + */ + public static Backlog caughtUp() { + return new Builder() + .flowFiles(0L) + .bytes(0L) + .records(0L) + .lastCaughtUp(Instant.now()) + .build(); + } + + /** + * @return a new {@link Builder} for constructing a Backlog with multiple dimensions and/or {@link Precision#AT_LEAST} + */ + public static Builder builder() { + return new Builder(); + } + + @Override + public boolean equals(final Object object) { + if (this == object) { + return true; + } + if (!(object instanceof Backlog)) { + return false; + } + final Backlog other = (Backlog) object; + return Objects.equals(flowFileCount, other.flowFileCount) + && Objects.equals(byteCount, other.byteCount) + && Objects.equals(recordCount, other.recordCount) + && Objects.equals(lastCaughtUp, other.lastCaughtUp) + && precision == other.precision; + } + + @Override + public int hashCode() { + return Objects.hash(flowFileCount, byteCount, recordCount, lastCaughtUp, precision); + } + + @Override + public String toString() { + return "Backlog[" + + "flowFileCount=" + flowFileCount + + ", byteCount=" + byteCount + + ", recordCount=" + recordCount + + ", lastCaughtUp=" + lastCaughtUp + + ", precision=" + precision + + "]"; + } + + /** + * Builder for constructing a {@link Backlog} that populates more than one dimension or uses a + * non-default {@link Precision}. For single-dimension Backlogs, prefer the static factory methods + * such as {@link Backlog#flowFiles(long)}. + */ + public static final class Builder { + + private OptionalLong flowFileCount = OptionalLong.empty(); + private OptionalLong byteCount = OptionalLong.empty(); + private OptionalLong recordCount = OptionalLong.empty(); + private Optional lastCaughtUp = Optional.empty(); + private Precision precision = Precision.EXACT; + + /** + * Sets the FlowFile count. Overwrites any previously set value. + * + * @param count the number of FlowFiles remaining on the source; must be non-negative + * @return this Builder + * @throws IllegalArgumentException if {@code count} is negative + */ + public Builder flowFiles(final long count) { + requireNonNegative("flowFiles", count); + this.flowFileCount = OptionalLong.of(count); + return this; + } + + /** + * Sets the byte count. Overwrites any previously set value. + * + * @param bytes the number of bytes remaining on the source; must be non-negative + * @return this Builder + * @throws IllegalArgumentException if {@code bytes} is negative + */ + public Builder bytes(final long bytes) { + requireNonNegative("bytes", bytes); + this.byteCount = OptionalLong.of(bytes); + return this; + } + + /** + * Sets the record count. Overwrites any previously set value. + * + * @param count the number of records remaining on the source; must be non-negative + * @return this Builder + * @throws IllegalArgumentException if {@code count} is negative + */ + public Builder records(final long count) { + requireNonNegative("records", count); + this.recordCount = OptionalLong.of(count); + return this; + } + + /** + * Sets the timestamp at which the component was last observed as fully caught up. Overwrites + * any previously set value. + * + * @param instant the moment at which the component was last fully caught up; may be null to clear + * @return this Builder + */ + public Builder lastCaughtUp(final Instant instant) { + this.lastCaughtUp = Optional.ofNullable(instant); + return this; + } + + /** + * Sets the {@link Precision} for this Backlog. Defaults to {@link Precision#EXACT} if not set. + * + * @param precision the precision; must not be null + * @return this Builder + */ + public Builder precision(final Precision precision) { + this.precision = Objects.requireNonNull(precision, "precision must not be null"); + return this; + } + + /** + * @return a new immutable {@link Backlog} reflecting the values set on this Builder + */ + public Backlog build() { + return new Backlog(this); + } + + private static void requireNonNegative(final String fieldName, final long value) { + if (value < 0L) { + throw new IllegalArgumentException(fieldName + " must be non-negative but was " + value); + } + } + } + + /** + *

+ * The precision of the numeric dimensions of a {@link Backlog}. Applies to the entire Backlog + * rather than to individual dimensions. + *

+ */ + public enum Precision { + + /** + * Numeric dimensions reflect exact counts. + */ + EXACT, + + /** + * Numeric dimensions are lower bounds; the real counts may be higher. + */ + AT_LEAST + } +} diff --git a/src/main/java/org/apache/nifi/components/BacklogReportingException.java b/src/main/java/org/apache/nifi/components/BacklogReportingException.java new file mode 100644 index 0000000..942c62c --- /dev/null +++ b/src/main/java/org/apache/nifi/components/BacklogReportingException.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components; + +/** + *

+ * Thrown by a Connector or Processor that supports backlog reporting but cannot + * determine its current backlog due to a transient or permanent failure. Typical causes + * include I/O errors reaching the source system, authorization failures (the component is + * allowed to consume data but is not authorized to call the offset or listing API), and + * state errors that prevent computation. + *

+ * + *

+ * This exception is distinct from a Connector or Processor not supporting backlog reporting + * at all. Components that do not support backlog reporting simply do not implement + * {@link org.apache.nifi.processor.BacklogReportingProcessor} or + * {@link org.apache.nifi.components.connector.BacklogReportingConnector}. Components that do + * support backlog reporting may return {@link java.util.Optional#empty()} when they cannot + * determine a value right now, or throw this exception when an attempt to determine the + * backlog fails. Callers should surface the cause rather than silently treat the failure as + * "not supported". + *

+ */ +public class BacklogReportingException extends Exception { + + public BacklogReportingException(final String message) { + super(message); + } + + public BacklogReportingException(final String message, final Throwable cause) { + super(message, cause); + } + + public BacklogReportingException(final Throwable cause) { + super(cause); + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/BacklogReportingConnector.java b/src/main/java/org/apache/nifi/components/connector/BacklogReportingConnector.java new file mode 100644 index 0000000..b71f8b6 --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/BacklogReportingConnector.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.Backlog; +import org.apache.nifi.components.BacklogReportingException; +import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.components.ProcessorFacade; + +import java.util.Optional; + +/** + *

+ * Optional capability interface implemented by {@link Connector}s that can report a + * {@link Backlog} describing how much data remains on the source system(s) the Connector + * pulls from but has not yet brought into NiFi. Implementing this interface is the opt-in + * signal: Connectors that do not implement it are simply not asked for backlog, and the + * framework's REST endpoints return HTTP {@code 409 Conflict} rather than treating "not + * supported" the same as "supported but currently unknown". + *

+ * + *

+ * This mirrors the {@code BacklogReportingProcessor} capability interface on the Processor + * side: the {@code Connector} base interface stays focused on lifecycle and configuration, + * and backlog reporting is layered on as a separate opt-in. + *

+ * + *

+ * Implementations that do report backlog typically derive the value from a single source — + * most often a single Processor's {@code BacklogReportingProcessor} report exposed via + * {@link ProcessorFacade#getBacklog()}. Some Connectors may compose a flow-wide picture from + * multiple sources: for example, summing the {@link Backlog} from multiple Processors, or, for + * a List/Fetch pattern, also inspecting queues and FlowFile attributes to account for data that + * has been "Listed" but not yet "Fetched". + *

+ * + *

Return value semantics

+ *
    + *
  • + * {@link Optional#empty()} means the Connector cannot report a {@link Backlog} right now, + * even though it implements this interface. This is distinct from a hard failure. + *
  • + *
  • + * Optional.of(Backlog.caughtUp()) means the Connector is fully synchronized + * with its source as of now. + *
  • + *
  • + * Optional.of(...) with non-zero numeric dimensions means the Connector knows + * that this much data remains on the source. See {@link Backlog.Precision} for whether the + * numbers are exact or lower bounds. + *
  • + *
+ * + *

+ * Hard failures — source unreachable, authorization denied, state errors that prevent + * computation — are reported by throwing {@link BacklogReportingException}, not by returning + * {@link Optional#empty()}. This keeps failures distinguishable from "no value available." + *

+ * + *

+ * Implementation Note: This API is currently experimental, as it is under very active + * development. As such, it is subject to change without notice between releases. + *

+ */ +public interface BacklogReportingConnector { + + /** + *

+ * Reports how much data remains on the source system(s) that this Connector pulls from but + * has not yet brought into NiFi. + *

+ * + *

+ * This method is invoked against the active {@link FlowContext}, because backlog is + * a runtime property of the running flow. The working flow context is not used for backlog + * reporting. Expected call frequency is low — typically on the order of once per minute, + * sometimes once every few minutes. Implementations may cache results internally if computing + * backlog puts non-trivial load on the source, but caching is not required. + *

+ * + * @param activeFlowContext the active flow context against which to compute backlog + * @return the Connector's reported {@link Backlog}, or {@link Optional#empty()} if the Connector + * cannot determine a value right now (for example, it has never polled the source) + * @throws BacklogReportingException if the Connector attempted to determine its backlog and failed + */ + Optional getBacklog(final FlowContext activeFlowContext) throws BacklogReportingException; +} diff --git a/src/main/java/org/apache/nifi/components/connector/Connector.java b/src/main/java/org/apache/nifi/components/connector/Connector.java index 2fcafb0..eb4a22c 100644 --- a/src/main/java/org/apache/nifi/components/connector/Connector.java +++ b/src/main/java/org/apache/nifi/components/connector/Connector.java @@ -242,4 +242,5 @@ public interface Connector { * @return a Future that will be completed when the draining is complete */ CompletableFuture drainFlowFiles(FlowContext flowContext); + } diff --git a/src/main/java/org/apache/nifi/components/connector/components/ConnectionFacade.java b/src/main/java/org/apache/nifi/components/connector/components/ConnectionFacade.java index ff17d2b..79ea9b9 100644 --- a/src/main/java/org/apache/nifi/components/connector/components/ConnectionFacade.java +++ b/src/main/java/org/apache/nifi/components/connector/components/ConnectionFacade.java @@ -25,6 +25,12 @@ import java.io.IOException; import java.util.function.Predicate; +/** + *

+ * Facade exposing per-Connection operations to a Connector implementation. The framework constructs + * and supplies these facades; Connector extensions do not implement this interface themselves. + *

+ */ public interface ConnectionFacade { VersionedConnection getDefinition(); @@ -49,4 +55,12 @@ public interface ConnectionFacade { */ DropFlowFileSummary dropFlowFiles(Predicate predicate) throws IOException; + /** + * Returns a read-only point-in-time snapshot of this connection's queue. See + * {@link QueueSnapshot} for the snapshot semantics. + * + * @return a non-null snapshot of the connection's queue + */ + QueueSnapshot getQueueSnapshot(); + } diff --git a/src/main/java/org/apache/nifi/components/connector/components/ProcessorFacade.java b/src/main/java/org/apache/nifi/components/connector/components/ProcessorFacade.java index bb57e3f..23f4f22 100644 --- a/src/main/java/org/apache/nifi/components/connector/components/ProcessorFacade.java +++ b/src/main/java/org/apache/nifi/components/connector/components/ProcessorFacade.java @@ -17,16 +17,26 @@ package org.apache.nifi.components.connector.components; +import org.apache.nifi.components.Backlog; +import org.apache.nifi.components.BacklogReportingException; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.connector.InvocationFailedException; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.processor.BacklogReportingProcessor; import java.util.List; import java.util.Map; +import java.util.Optional; +/** + *

+ * Facade exposing per-Processor operations to a Connector implementation. The framework constructs + * and supplies these facades; Connector extensions do not implement this interface themselves. + *

+ */ public interface ProcessorFacade { VersionedProcessor getDefinition(); @@ -75,4 +85,32 @@ public interface ProcessorFacade { * @throws InvocationFailedException if unable to invoke the method */ T invokeConnectorMethod(String methodName, Map arguments, Class returnType) throws InvocationFailedException; + + /** + * Indicates whether the underlying Processor implements {@link BacklogReportingProcessor}. This + * is a cheap capability check that does not call into the Processor. + * + * @return {@code true} if the underlying Processor implements {@link BacklogReportingProcessor}; + * {@code false} otherwise + */ + boolean reportsBacklog(); + + /** + *

+ * Returns the underlying Processor's reported {@link Backlog}. This is the bridge a + * Connector uses to ask a Processor in its flow how much data remains on the source. + *

+ * + *

+ * The return value and exception semantics mirror those of + * {@link BacklogReportingProcessor#getBacklog(org.apache.nifi.processor.ProcessContext)}. If + * the underlying Processor does not implement {@link BacklogReportingProcessor}, this method + * returns {@link Optional#empty()}. + *

+ * + * @return the Processor's reported {@link Backlog}, or {@link Optional#empty()} if the Processor does not + * implement {@link BacklogReportingProcessor} or has nothing to report + * @throws BacklogReportingException if the Processor attempted to determine its backlog and failed + */ + Optional getBacklog() throws BacklogReportingException; } diff --git a/src/main/java/org/apache/nifi/components/connector/components/QueueSnapshot.java b/src/main/java/org/apache/nifi/components/connector/components/QueueSnapshot.java new file mode 100644 index 0000000..da520da --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/components/QueueSnapshot.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.flowfile.FlowFile; + +import java.util.List; + +/** + *

+ * A point-in-time view of a connection's queue, returned by + * {@link ConnectionFacade#getQueueSnapshot()}. Bundles the connection's total {@link QueueSize} + * together with the {@link FlowFile}s currently held in this node's active in-memory queue, + * meaning the FlowFiles resident in memory on this node and available in poll order. + *

+ * + *

+ * The active list and the {@link QueueSize} are captured atomically, so they describe the same + * point in time. The active list is not always the entire queue. FlowFiles that have been + * swapped out are excluded from {@link #getActiveFlowFiles()}. On load-balanced connections, + * FlowFiles in flight between nodes are excluded as well. Those FlowFiles are still counted in + * {@link #getQueueSize()}. Use {@link #isActiveListExhaustive()} to determine whether the + * active list contains every FlowFile counted in the queue size. + *

+ * + *

+ * {@link #getActiveFlowFiles()} returns the full active list without a caller-supplied limit, but + * the list itself is bounded by the framework's maximum active in-memory queue size. The + * snapshot does not clone the {@link FlowFile} instances; it references the existing active + * FlowFiles. + *

+ */ +public interface QueueSnapshot { + + /** + * @return the total {@link QueueSize} of the connection at the time the snapshot was taken + */ + QueueSize getQueueSize(); + + /** + * @return the active in-memory FlowFiles in poll order, never null; excludes FlowFiles that + * have been swapped out and, for load-balanced connections, FlowFiles in flight between + * nodes + */ + List getActiveFlowFiles(); + + /** + * @return {@code true} if {@link #getActiveFlowFiles()} contains every FlowFile counted in + * {@link #getQueueSize()}; otherwise {@code false} + */ + boolean isActiveListExhaustive(); +} diff --git a/src/main/java/org/apache/nifi/processor/BacklogReportingProcessor.java b/src/main/java/org/apache/nifi/processor/BacklogReportingProcessor.java new file mode 100644 index 0000000..24b3df0 --- /dev/null +++ b/src/main/java/org/apache/nifi/processor/BacklogReportingProcessor.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processor; + +import org.apache.nifi.components.Backlog; +import org.apache.nifi.components.BacklogReportingException; + +import java.util.Optional; + +/** + *

+ * Optional capability interface implemented by Processors that can report a {@link Backlog} + * describing how much data remains on the source system that the Processor has not yet pulled + * into NiFi. Implementing this interface is the opt-in signal: Processors that do not implement + * it are simply not asked for backlog. + *

+ * + *

+ * The reported {@link Backlog} reflects only the data the Processor knows about from the source. + * A Connector that composes a flow-wide backlog generally also accounts for FlowFiles already + * enqueued in NiFi between processors; that composition is the Connector's responsibility, not + * the Processor's. + *

+ * + *

+ * The expected call frequency for {@link #getBacklog(ProcessContext)} is low — on the order of + * once per minute, sometimes once every few minutes. Implementations may cache results internally + * if computing backlog puts non-trivial load on the source, but caching is not required. + *

+ * + *

Return value semantics

+ *
    + *
  • + * {@link Optional#empty()} means the Processor cannot report a {@code Backlog} right now, + * even though it implements this interface. This is distinct from a hard failure. + *
  • + *
  • + * Optional.of(Backlog.caughtUp()) means the Processor is fully + * synchronized with its source as of now. + *
  • + *
  • + * Optional.of(...) with non-zero numeric dimensions means the Processor knows that + * this much data remains on the source. See {@link Backlog.Precision} for whether the + * numbers are exact or lower bounds. + *
  • + *
+ * + *

+ * Hard failures — source unreachable, authorization denied, state errors that prevent + * computation — are reported by throwing {@link BacklogReportingException}, not by returning + * {@link Optional#empty()}. This keeps failures distinguishable from "no value available." + *

+ */ +public interface BacklogReportingProcessor { + + /** + * Returns the Processor's current view of how much data remains on the source. + * + *

+ * The framework always supplies a non-null {@link ProcessContext}. Implementations may use + * the supplied context to read property values and to build whatever transient clients they + * need to query the source. Implementations are responsible for closing any transient + * clients they open during the call. + *

+ * + *

+ * The framework does not serialize calls to this method. Multiple {@code getBacklog} + * invocations may run concurrently against the same Processor instance, so implementations + * must be safe for concurrent use. Implementations that open per-call transient clients are + * naturally isolated. Implementations that cache shared clients across calls must guard + * them appropriately or rely on thread-safe SDK clients. + *

+ * + * @param context a non-null {@link ProcessContext} usable to look up property values and to + * construct transient clients to the source + * @return the Processor's reported {@link Backlog}, or {@link Optional#empty()} if the Processor + * cannot determine a value right now (e.g. it has never polled the source) + * @throws BacklogReportingException if the Processor attempted to determine its backlog and + * failed due to a transient or permanent error such as I/O failure or authorization denial + */ + Optional getBacklog(final ProcessContext context) throws BacklogReportingException; +} diff --git a/src/test/java/org/apache/nifi/components/TestBacklog.java b/src/test/java/org/apache/nifi/components/TestBacklog.java new file mode 100644 index 0000000..6cd51bf --- /dev/null +++ b/src/test/java/org/apache/nifi/components/TestBacklog.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.components; + +import org.apache.nifi.components.Backlog.Precision; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.OptionalLong; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestBacklog { + + @Test + public void testPlusSumsDimensionsAndKeepsExactWhenBothSidesReportTheSameShape() { + final Backlog left = Backlog.builder().flowFiles(10L).bytes(100L).build(); + final Backlog right = Backlog.builder().flowFiles(5L).bytes(50L).build(); + + final Backlog combined = left.plus(right); + + assertEquals(OptionalLong.of(15L), combined.getFlowFileCount()); + assertEquals(OptionalLong.of(150L), combined.getByteCount()); + assertFalse(combined.getRecordCount().isPresent()); + assertEquals(Precision.EXACT, combined.getPrecision()); + } + + @Test + public void testPlusDowngradesToAtLeastWhenOneSideOmitsADimensionTheOtherReports() { + // Left knows only flowFiles; right knows only bytes. The result should still carry the known + // numeric values, but the precision must be AT_LEAST because each side is "unknown" with + // respect to the dimension the other side reported. Treating "unknown" as zero would let the + // combined Backlog falsely claim EXACT completeness. + final Backlog left = Backlog.flowFiles(10L); + final Backlog right = Backlog.bytes(100L); + + final Backlog combined = left.plus(right); + + assertEquals(OptionalLong.of(10L), combined.getFlowFileCount()); + assertEquals(OptionalLong.of(100L), combined.getByteCount()); + assertFalse(combined.getRecordCount().isPresent()); + assertEquals(Precision.AT_LEAST, combined.getPrecision()); + } + + @Test + public void testPlusDowngradesToAtLeastWhenAsymmetricEvenIfBothSidesAreExact() { + // Both sides are EXACT individually but report different sets of dimensions. The combined + // view cannot be EXACT because each side is silent about a dimension the other side knows. + final Backlog left = Backlog.builder().flowFiles(1L).bytes(2L).precision(Precision.EXACT).build(); + final Backlog right = Backlog.builder().flowFiles(3L).bytes(4L).records(5L).precision(Precision.EXACT).build(); + + final Backlog combined = left.plus(right); + + assertEquals(OptionalLong.of(4L), combined.getFlowFileCount()); + assertEquals(OptionalLong.of(6L), combined.getByteCount()); + assertEquals(OptionalLong.of(5L), combined.getRecordCount()); + assertEquals(Precision.AT_LEAST, combined.getPrecision()); + } + + @Test + public void testPlusPropagatesAtLeastFromEitherOperand() { + final Backlog left = Backlog.builder().flowFiles(1L).precision(Precision.AT_LEAST).build(); + final Backlog right = Backlog.builder().flowFiles(2L).precision(Precision.EXACT).build(); + + assertEquals(Precision.AT_LEAST, left.plus(right).getPrecision()); + assertEquals(Precision.AT_LEAST, right.plus(left).getPrecision()); + } + + @Test + public void testPlusUsesEarlierLastCaughtUpAndKeepsOnlySideWhenOtherMissing() { + final Instant earlier = Instant.parse("2025-01-01T00:00:00Z"); + final Instant later = Instant.parse("2025-01-02T00:00:00Z"); + + final Backlog withEarlier = Backlog.builder().flowFiles(0L).lastCaughtUp(earlier).build(); + final Backlog withLater = Backlog.builder().flowFiles(0L).lastCaughtUp(later).build(); + assertEquals(earlier, withEarlier.plus(withLater).getLastCaughtUp().orElseThrow()); + + final Backlog withoutTimestamp = Backlog.flowFiles(0L); + assertEquals(later, withoutTimestamp.plus(withLater).getLastCaughtUp().orElseThrow()); + } + + @Test + public void testPlusOverflowOnLongSum() { + final Backlog left = Backlog.flowFiles(Long.MAX_VALUE); + final Backlog right = Backlog.flowFiles(1L); + assertThrows(ArithmeticException.class, () -> left.plus(right)); + } + + @Test + public void testCaughtUpFactoryProducesZerosAndTimestamp() { + final Backlog caughtUp = Backlog.caughtUp(); + assertEquals(OptionalLong.of(0L), caughtUp.getFlowFileCount()); + assertEquals(OptionalLong.of(0L), caughtUp.getByteCount()); + assertEquals(OptionalLong.of(0L), caughtUp.getRecordCount()); + assertTrue(caughtUp.getLastCaughtUp().isPresent()); + assertEquals(Precision.EXACT, caughtUp.getPrecision()); + } +} From b33a3a1f349f5966ccd8af534d449159bd7c8ed5 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 25 Jun 2026 10:46:27 -0400 Subject: [PATCH 2/2] NIFI-16045: Address backlog API review feedback Preserve exact numeric precision when combining a count-only Backlog with a timestamp-only Backlog, and add focused tests for timestamp-only and negative-value cases. Also clean up related API JavaDocs and formatting surfaced during review. --- .../org/apache/nifi/components/Backlog.java | 54 +++++++++++++------ .../nifi/components/connector/Connector.java | 14 ++--- .../apache/nifi/components/TestBacklog.java | 29 ++++++++++ 3 files changed, 74 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/apache/nifi/components/Backlog.java b/src/main/java/org/apache/nifi/components/Backlog.java index 1ab31c7..29402b7 100644 --- a/src/main/java/org/apache/nifi/components/Backlog.java +++ b/src/main/java/org/apache/nifi/components/Backlog.java @@ -59,8 +59,7 @@ *

* *

- * Implementation Note: This API is currently experimental, as it is under very active - * development. As such, it is subject to change without notice between releases. + * This API is experimental and may change without notice between releases. *

*/ public final class Backlog { @@ -129,9 +128,11 @@ public Precision getPrecision() { * Numeric dimensions ({@code flowFileCount}, {@code byteCount}, {@code recordCount}): * if both sides have a value, the result is their sum. If only one side has a value, * it is carried through but the combined {@link Precision} is downgraded to - * {@link Precision#AT_LEAST} (see the precision rule below), because the side that did - * not report the dimension is unknown rather than zero. If neither side has a value, - * the field stays empty. + * {@link Precision#AT_LEAST} (see the precision rule below) when the other side reports + * at least one numeric dimension, because the side that did not report the dimension is + * unknown rather than zero. If the other side reports no numeric dimensions at all, the + * known value is carried through without adding numeric uncertainty. If neither side has + * a value, the field stays empty. * *
  • * {@link #getLastCaughtUp() lastCaughtUp}: not summable. If both sides have a value, @@ -141,10 +142,11 @@ public Precision getPrecision() { *
  • *
  • * {@link #getPrecision() precision}: the result is {@link Precision#EXACT} only when both - * sides are {@code EXACT} and both sides report the same set of populated numeric - * dimensions. Otherwise the result is {@link Precision#AT_LEAST}. Any uncertainty in either - * operand — including a missing dimension on one side that the other side reported — - * taints the result, because "unknown" must not be treated as zero. + * sides are {@code EXACT} and either both sides report the same set of populated + * numeric dimensions or one side reports no numeric dimensions at all. Otherwise the + * result is {@link Precision#AT_LEAST}. Any uncertainty in either operand — including a + * missing dimension on one side that the other side reported — taints the result, + * because "unknown" must not be treated as zero. *
  • * * @@ -160,25 +162,35 @@ public Backlog plus(final Backlog other) { final OptionalLong sumRecords = sumOptional(recordCount, other.recordCount); final Optional earliestCaughtUp = earlierOf(lastCaughtUp, other.lastCaughtUp); + final boolean reportsNumericDimensions = flowFileCount.isPresent() || byteCount.isPresent() || recordCount.isPresent(); + final boolean otherReportsNumericDimensions = other.flowFileCount.isPresent() || other.byteCount.isPresent() || other.recordCount.isPresent(); + // A dimension that one side reports and the other does not is "unknown" on the omitting side, // not zero. Carrying the known value forward and still reporting EXACT would let the result - // claim completeness it cannot back up, so any such asymmetry forces AT_LEAST. - final boolean dimensionsAsymmetric = flowFileCount.isPresent() != other.flowFileCount.isPresent() + // claim completeness it cannot back up, so any such asymmetry forces AT_LEAST. However, a + // Backlog with no numeric dimensions contributes only non-numeric information, such as a + // lastCaughtUp timestamp, and therefore does not make numeric counts less exact. + final boolean dimensionsAsymmetric = reportsNumericDimensions && otherReportsNumericDimensions + && (flowFileCount.isPresent() != other.flowFileCount.isPresent() || byteCount.isPresent() != other.byteCount.isPresent() - || recordCount.isPresent() != other.recordCount.isPresent(); - final boolean bothExact = precision == Precision.EXACT && other.precision == Precision.EXACT; - final Precision combinedPrecision = (bothExact && !dimensionsAsymmetric) ? Precision.EXACT : Precision.AT_LEAST; + || recordCount.isPresent() != other.recordCount.isPresent()); + final boolean precisionExact = precision == Precision.EXACT || !reportsNumericDimensions; + final boolean otherPrecisionExact = other.precision == Precision.EXACT || !otherReportsNumericDimensions; + final Precision combinedPrecision = (precisionExact && otherPrecisionExact && !dimensionsAsymmetric) ? Precision.EXACT : Precision.AT_LEAST; final Builder builder = new Builder().precision(combinedPrecision); if (sumFlowFiles.isPresent()) { builder.flowFiles(sumFlowFiles.getAsLong()); } + if (sumBytes.isPresent()) { builder.bytes(sumBytes.getAsLong()); } + if (sumRecords.isPresent()) { builder.records(sumRecords.getAsLong()); } + earliestCaughtUp.ifPresent(builder::lastCaughtUp); return builder.build(); } @@ -187,12 +199,15 @@ private static OptionalLong sumOptional(final OptionalLong left, final OptionalL if (left.isPresent() && right.isPresent()) { return OptionalLong.of(Math.addExact(left.getAsLong(), right.getAsLong())); } + if (left.isPresent()) { return left; } + if (right.isPresent()) { return right; } + return OptionalLong.empty(); } @@ -200,9 +215,11 @@ private static Optional earlierOf(final Optional left, final O if (left.isPresent() && right.isPresent()) { return left.get().isBefore(right.get()) ? left : right; } + if (left.isPresent()) { return left; } + return right; } @@ -237,10 +254,11 @@ public static Backlog records(final long count) { } /** - * Creates a Backlog whose only populated dimension is the {@code lastCaughtUp} timestamp. - * Useful for combining with a count-only Backlog via {@link #plus(Backlog)}. + * Creates a Backlog populated with the provided {@code lastCaughtUp} timestamp. Useful for + * combining with a count-only Backlog via {@link #plus(Backlog)}. * - * @param instant the moment at which the component was last observed as fully caught up + * @param instant the moment at which the component was last observed as fully caught up; may be + * null, in which case the returned Backlog has no populated dimensions * @return a new Backlog */ public static Backlog lastCaughtUp(final Instant instant) { @@ -280,9 +298,11 @@ public boolean equals(final Object object) { if (this == object) { return true; } + if (!(object instanceof Backlog)) { return false; } + final Backlog other = (Backlog) object; return Objects.equals(flowFileCount, other.flowFileCount) && Objects.equals(byteCount, other.byteCount) diff --git a/src/main/java/org/apache/nifi/components/connector/Connector.java b/src/main/java/org/apache/nifi/components/connector/Connector.java index eb4a22c..36d2998 100644 --- a/src/main/java/org/apache/nifi/components/connector/Connector.java +++ b/src/main/java/org/apache/nifi/components/connector/Connector.java @@ -32,7 +32,7 @@ * A Connector is a component that encapsulates and manages a NiFi flow, in such a way that the flow * can be treated as a single component. The Connector is responsible for managing the lifecycle of the flow, * including starting and stopping the flow, as well as validating that the flow is correctly configured. - * The Connector exposes a single holistic configuration that is encapsulates the configuration of the + * The Connector exposes a single holistic configuration that encapsulates the configuration of the * sources, sinks, transformations, routing logic, and any other components that make up the flow. *

    * @@ -117,7 +117,7 @@ public interface Connector { /** * Validates that the Connector is valid according to its current configuration. Validity of a Connector may be - * defined simply as the all components being valid, or it may encompass more complex validation logic, such + * defined simply as all components being valid, or it may encompass more complex validation logic, such * as ensuring that a Source Processor is able to connect to a remote system, or that a Sink Processor * is able to write to a remote system. * @@ -131,7 +131,7 @@ public interface Connector { /** * Validates the configuration for a specific configuration step. This allows the Connector to indicate any - * issues with syntactic configuration issues but is not as comprehensive as the overall validation provided + * syntax issues in the configuration but is not as comprehensive as the overall validation provided * by {@link #validate(FlowContext, ConnectorValidationContext)} due to the fact that it does not have access * to the full configuration of the Connector. This provides immediate feedback to users * as they are configuring each step. @@ -146,8 +146,8 @@ public interface Connector { /** * Verifies the configuration for a specific configuration step. This allows the Connector to perform - * more comprehensive verification of the configuration for a step than does validation, such as attempting to connect to - * remote systems, sample data and ensure that it can be parsed correctly, etc. + * more comprehensive verification of the configuration for a step than does validation, such as connecting to + * remote systems or sampling data and ensuring that it can be parsed correctly. * * @param stepName the name of the configuration step being verified * @param propertyValueOverrides any overrides to the currently configured property values that should be used for verification @@ -181,6 +181,7 @@ public interface Connector { * * @param stepName the name of the step * @param workingFlowContext the working flow context that is being used for the update + * @throws FlowUpdateException if there is an error updating the flow for the configured step */ void onConfigurationStepConfigured(String stepName, FlowContext workingFlowContext) throws FlowUpdateException; @@ -190,6 +191,7 @@ public interface Connector { * * @param workingFlowContext the working flow context that has been created for the update * @param activeFlowContext the active flow context that is currently in use + * @throws FlowUpdateException if the Connector cannot prepare the flow for the update */ void prepareForUpdate(FlowContext workingFlowContext, FlowContext activeFlowContext) throws FlowUpdateException; @@ -210,6 +212,7 @@ public interface Connector { * * @param workingFlowContext the working flow context that represents the updated configuration * @param activeFlowContext the flow context that represents the active flow + * @throws FlowUpdateException if the updated configuration cannot be applied to the active flow */ void applyUpdate(FlowContext workingFlowContext, FlowContext activeFlowContext) throws FlowUpdateException; @@ -242,5 +245,4 @@ public interface Connector { * @return a Future that will be completed when the draining is complete */ CompletableFuture drainFlowFiles(FlowContext flowContext); - } diff --git a/src/test/java/org/apache/nifi/components/TestBacklog.java b/src/test/java/org/apache/nifi/components/TestBacklog.java index 6cd51bf..98b514e 100644 --- a/src/test/java/org/apache/nifi/components/TestBacklog.java +++ b/src/test/java/org/apache/nifi/components/TestBacklog.java @@ -83,6 +83,23 @@ public void testPlusPropagatesAtLeastFromEitherOperand() { assertEquals(Precision.AT_LEAST, right.plus(left).getPrecision()); } + @Test + public void testPlusKeepsExactWhenCombinedWithTimestampOnlyBacklog() { + final Instant lastCaughtUp = Instant.parse("2025-01-01T00:00:00Z"); + final Backlog countOnly = Backlog.flowFiles(10L); + final Backlog timestampOnly = Backlog.lastCaughtUp(lastCaughtUp); + + final Backlog combinedWithTimestampOnRight = countOnly.plus(timestampOnly); + assertEquals(OptionalLong.of(10L), combinedWithTimestampOnRight.getFlowFileCount()); + assertEquals(Precision.EXACT, combinedWithTimestampOnRight.getPrecision()); + assertEquals(lastCaughtUp, combinedWithTimestampOnRight.getLastCaughtUp().orElseThrow()); + + final Backlog combinedWithTimestampOnLeft = timestampOnly.plus(countOnly); + assertEquals(OptionalLong.of(10L), combinedWithTimestampOnLeft.getFlowFileCount()); + assertEquals(Precision.EXACT, combinedWithTimestampOnLeft.getPrecision()); + assertEquals(lastCaughtUp, combinedWithTimestampOnLeft.getLastCaughtUp().orElseThrow()); + } + @Test public void testPlusUsesEarlierLastCaughtUpAndKeepsOnlySideWhenOtherMissing() { final Instant earlier = Instant.parse("2025-01-01T00:00:00Z"); @@ -112,4 +129,16 @@ public void testCaughtUpFactoryProducesZerosAndTimestamp() { assertTrue(caughtUp.getLastCaughtUp().isPresent()); assertEquals(Precision.EXACT, caughtUp.getPrecision()); } + + @Test + public void testBuilderRejectsNegativeNumericDimensions() { + final IllegalArgumentException flowFilesException = assertThrows(IllegalArgumentException.class, () -> Backlog.builder().flowFiles(-1L)); + assertEquals("flowFiles must be non-negative but was -1", flowFilesException.getMessage()); + + final IllegalArgumentException bytesException = assertThrows(IllegalArgumentException.class, () -> Backlog.builder().bytes(-1L)); + assertEquals("bytes must be non-negative but was -1", bytesException.getMessage()); + + final IllegalArgumentException recordsException = assertThrows(IllegalArgumentException.class, () -> Backlog.builder().records(-1L)); + assertEquals("records must be non-negative but was -1", recordsException.getMessage()); + } }