Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
437 changes: 437 additions & 0 deletions src/main/java/org/apache/nifi/components/Backlog.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.components;

/**
* <p>
* Thrown by a Connector or Processor that supports backlog reporting but cannot
* determine its current backlog due to a transient or permanent failure. Typical causes
* include I/O errors reaching the source system, authorization failures (the component is
* allowed to consume data but is not authorized to call the offset or listing API), and
* state errors that prevent computation.
* </p>
*
* <p>
* This exception is distinct from a Connector or Processor not supporting backlog reporting
* at all. Components that do not support backlog reporting simply do not implement
* {@link org.apache.nifi.processor.BacklogReportingProcessor} or
* {@link org.apache.nifi.components.connector.BacklogReportingConnector}. Components that do
* support backlog reporting may return {@link java.util.Optional#empty()} when they cannot
* determine a value right now, or throw this exception when an attempt to determine the
* backlog fails. Callers should surface the cause rather than silently treat the failure as
* "not supported".
* </p>
*/
public class BacklogReportingException extends Exception {

    /**
     * Creates an exception that wraps the underlying failure without adding a separate message.
     *
     * @param cause the failure that prevented the backlog from being determined
     */
    public BacklogReportingException(final Throwable cause) {
        super(cause);
    }

    /**
     * Creates an exception that carries only a description of the failure.
     *
     * @param message a description of why the backlog could not be determined
     */
    public BacklogReportingException(final String message) {
        super(message);
    }

    /**
     * Creates an exception that carries both a description of the failure and the
     * underlying cause, so that callers can surface the original error.
     *
     * @param message a description of why the backlog could not be determined
     * @param cause the failure that prevented the backlog from being determined
     */
    public BacklogReportingException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.components.connector;

import org.apache.nifi.components.Backlog;
import org.apache.nifi.components.BacklogReportingException;
import org.apache.nifi.components.connector.components.FlowContext;
import org.apache.nifi.components.connector.components.ProcessorFacade;

import java.util.Optional;

/**
* <p>
* Optional capability interface implemented by {@link Connector}s that can report a
* {@link Backlog} describing how much data remains on the source system(s) the Connector
* pulls from but has not yet brought into NiFi. Implementing this interface is the opt-in
* signal: Connectors that do not implement it are simply not asked for backlog, and the
* framework's REST endpoints return HTTP {@code 409 Conflict} rather than treating "not
* supported" the same as "supported but currently unknown".
* </p>
*
* <p>
* This mirrors the {@code BacklogReportingProcessor} capability interface on the Processor
* side: the {@code Connector} base interface stays focused on lifecycle and configuration,
* and backlog reporting is layered on as a separate opt-in.
* </p>
*
* <p>
* Implementations that do report backlog typically derive the value from a single source —
* most often a single Processor's {@code BacklogReportingProcessor} report exposed via
* {@link ProcessorFacade#getBacklog()}. Some Connectors may compose a flow-wide picture from
* multiple sources: for example, summing the {@link Backlog} from multiple Processors, or, for
* a List/Fetch pattern, also inspecting queues and FlowFile attributes to account for data that
* has been "Listed" but not yet "Fetched".
* </p>
*
* <h2>Return value semantics</h2>
* <ul>
* <li>
* {@link Optional#empty()} means the Connector cannot report a {@link Backlog} right now,
* even though it implements this interface. This is distinct from a hard failure.
* </li>
* <li>
* <code>Optional.of(Backlog.caughtUp())</code> means the Connector is fully synchronized
* with its source as of now.
* </li>
* <li>
* <code>Optional.of(...)</code> with non-zero numeric dimensions means the Connector knows
* that this much data remains on the source. See {@link Backlog.Precision} for whether the
* numbers are exact or lower bounds.
* </li>
* </ul>
*
* <p>
* Hard failures — source unreachable, authorization denied, state errors that prevent
* computation — are reported by throwing {@link BacklogReportingException}, not by returning
* {@link Optional#empty()}. This keeps failures distinguishable from "no value available."
* </p>
*
* <p>
* <b>Implementation Note:</b> This API is currently experimental, as it is under very active
* development. As such, it is subject to change without notice between releases.
* </p>
*/
public interface BacklogReportingConnector {

/**
* <p>
* Reports how much data remains on the source system(s) that this Connector pulls from but
* has not yet brought into NiFi.
* </p>
*
* <p>
* This method is invoked against the <b>active</b> {@link FlowContext}, because backlog is
* a runtime property of the running flow. The working flow context is not used for backlog
* reporting. Expected call frequency is low — typically on the order of once per minute,
* sometimes once every few minutes. Implementations may cache results internally if computing
* backlog puts non-trivial load on the source, but caching is not required.
* </p>
*
* <p>
* The three possible outcomes are deliberately distinct. A present {@link Backlog} is a
* successful report, with <code>Optional.of(Backlog.caughtUp())</code> meaning the Connector is
* fully synchronized with its source. {@link Optional#empty()} means no value is available
* right now. A {@link BacklogReportingException} signals a hard failure, such as the source
* being unreachable or authorization being denied. Implementations should throw rather than
* return {@link Optional#empty()} when an attempt to determine the backlog fails, so that
* failures stay distinguishable from "no value available."
* </p>
*
* @param activeFlowContext the active flow context against which to compute backlog
* @return the Connector's reported {@link Backlog}, or {@link Optional#empty()} if the Connector
* cannot determine a value right now (for example, it has never polled the source)
* @throws BacklogReportingException if the Connector attempted to determine its backlog and failed
*/
Optional<Backlog> getBacklog(final FlowContext activeFlowContext) throws BacklogReportingException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* A Connector is a component that encapsulates and manages a NiFi flow, in such a way that the flow
* can be treated as a single component. The Connector is responsible for managing the lifecycle of the flow,
* including starting and stopping the flow, as well as validating that the flow is correctly configured.
* The Connector exposes a single holistic configuration that is encapsulates the configuration of the
* The Connector exposes a single holistic configuration that encapsulates the configuration of the
* sources, sinks, transformations, routing logic, and any other components that make up the flow.
* </p>
*
Expand Down Expand Up @@ -117,7 +117,7 @@ public interface Connector {

/**
* Validates that the Connector is valid according to its current configuration. Validity of a Connector may be
* defined simply as the all components being valid, or it may encompass more complex validation logic, such
* defined simply as all components being valid, or it may encompass more complex validation logic, such
* as ensuring that a Source Processor is able to connect to a remote system, or that a Sink Processor
* is able to write to a remote system.
*
Expand All @@ -131,7 +131,7 @@ public interface Connector {

/**
* Validates the configuration for a specific configuration step. This allows the Connector to indicate any
* issues with syntactic configuration issues but is not as comprehensive as the overall validation provided
* syntax issues in the configuration but is not as comprehensive as the overall validation provided
* by {@link #validate(FlowContext, ConnectorValidationContext)} due to the fact that it does not have access
* to the full configuration of the Connector. This provides immediate feedback to users
* as they are configuring each step.
Expand All @@ -146,8 +146,8 @@ public interface Connector {

/**
* Verifies the configuration for a specific configuration step. This allows the Connector to perform
* more comprehensive verification of the configuration for a step than does validation, such as attempting to connect to
* remote systems, sample data and ensure that it can be parsed correctly, etc.
* more comprehensive verification of the configuration for a step than does validation, such as connecting to
* remote systems or sampling data and ensuring that it can be parsed correctly.
*
* @param stepName the name of the configuration step being verified
* @param propertyValueOverrides any overrides to the currently configured property values that should be used for verification
Expand Down Expand Up @@ -181,6 +181,7 @@ public interface Connector {
*
* @param stepName the name of the step
* @param workingFlowContext the working flow context that is being used for the update
* @throws FlowUpdateException if there is an error updating the flow for the configured step
*/
void onConfigurationStepConfigured(String stepName, FlowContext workingFlowContext) throws FlowUpdateException;

Expand All @@ -190,6 +191,7 @@ public interface Connector {
*
* @param workingFlowContext the working flow context that has been created for the update
* @param activeFlowContext the active flow context that is currently in use
* @throws FlowUpdateException if the Connector cannot prepare the flow for the update
*/
void prepareForUpdate(FlowContext workingFlowContext, FlowContext activeFlowContext) throws FlowUpdateException;

Expand All @@ -210,6 +212,7 @@ public interface Connector {
*
* @param workingFlowContext the working flow context that represents the updated configuration
* @param activeFlowContext the flow context that represents the active flow
* @throws FlowUpdateException if the updated configuration cannot be applied to the active flow
*/
void applyUpdate(FlowContext workingFlowContext, FlowContext activeFlowContext) throws FlowUpdateException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
import java.io.IOException;
import java.util.function.Predicate;

/**
* <p>
* Facade exposing per-Connection operations to a Connector implementation. The framework constructs
* and supplies these facades; Connector extensions do not implement this interface themselves.
* </p>
*/
public interface ConnectionFacade {

VersionedConnection getDefinition();
Expand All @@ -49,4 +55,12 @@ public interface ConnectionFacade {
*/
DropFlowFileSummary dropFlowFiles(Predicate<FlowFile> predicate) throws IOException;

/**
* Returns a read-only point-in-time snapshot of this connection's queue. The snapshot bundles
* the connection's total queue size with the FlowFiles currently held in this node's active
* in-memory queue. See {@link QueueSnapshot} for the full snapshot semantics, including which
* FlowFiles are excluded from the active list.
*
* @return a non-null snapshot of the connection's queue
*/
QueueSnapshot getQueueSnapshot();

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@

package org.apache.nifi.components.connector.components;

import org.apache.nifi.components.Backlog;
import org.apache.nifi.components.BacklogReportingException;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.connector.InvocationFailedException;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.processor.BacklogReportingProcessor;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* <p>
* Facade exposing per-Processor operations to a Connector implementation. The framework constructs
* and supplies these facades; Connector extensions do not implement this interface themselves.
* </p>
*/
public interface ProcessorFacade {

VersionedProcessor getDefinition();
Expand Down Expand Up @@ -75,4 +85,32 @@ public interface ProcessorFacade {
* @throws InvocationFailedException if unable to invoke the method
*/
<T> T invokeConnectorMethod(String methodName, Map<String, Object> arguments, Class<T> returnType) throws InvocationFailedException;

/**
* Indicates whether the underlying Processor implements {@link BacklogReportingProcessor}. This
* is a cheap capability check that does not call into the Processor.
*
* <p>
* When this method returns {@code false}, {@link #getBacklog()} returns
* {@link Optional#empty()} because the Processor has no backlog report to provide.
* </p>
*
* @return {@code true} if the underlying Processor implements {@link BacklogReportingProcessor};
* {@code false} otherwise
*/
boolean reportsBacklog();

/**
* <p>
* Returns the underlying Processor's reported {@link Backlog}. This is the bridge a
* Connector uses to ask a Processor in its flow how much data remains on the source.
* </p>
*
* <p>
* The return value and exception semantics mirror those of
* {@link BacklogReportingProcessor#getBacklog(org.apache.nifi.processor.ProcessContext)}. If
* the underlying Processor does not implement {@link BacklogReportingProcessor}, this method
* returns {@link Optional#empty()}.
* </p>
*
* <p>
* An empty result alone therefore does not reveal whether the Processor lacks backlog support
* or supports it but has nothing to report. Callers that need to tell the two cases apart
* should consult {@link #reportsBacklog()}.
* </p>
*
* @return the Processor's reported {@link Backlog}, or {@link Optional#empty()} if the Processor does not
* implement {@link BacklogReportingProcessor} or has nothing to report
* @throws BacklogReportingException if the Processor attempted to determine its backlog and failed
*/
Optional<Backlog> getBacklog() throws BacklogReportingException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.components.connector.components;

import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile;

import java.util.List;

/**
* <p>
* A point-in-time view of a connection's queue, returned by
* {@link ConnectionFacade#getQueueSnapshot()}. Bundles the connection's total {@link QueueSize}
* together with the {@link FlowFile}s currently held in this node's active in-memory queue,
* meaning the FlowFiles resident in memory on this node and available in poll order.
* </p>
*
* <p>
* The active list and the {@link QueueSize} are captured atomically, so they describe the same
* point in time. The active list is not always the entire queue. FlowFiles that have been
* swapped out are excluded from {@link #getActiveFlowFiles()}. On load-balanced connections,
* FlowFiles in flight between nodes are excluded as well. Those FlowFiles are still counted in
* {@link #getQueueSize()}. Use {@link #isActiveListExhaustive()} to determine whether the
* active list contains every FlowFile counted in the queue size.
* </p>
*
* <p>
* {@link #getActiveFlowFiles()} returns the full active list without a caller-supplied limit, but
* the list itself is bounded by the framework's maximum active in-memory queue size. The
* snapshot does not clone the {@link FlowFile} instances; it references the existing active
* FlowFiles.
* </p>
*/
public interface QueueSnapshot {

/**
* Returns the total size of the connection's queue as captured by this snapshot. The size
* also counts FlowFiles that do not appear in {@link #getActiveFlowFiles()}, such as those
* that have been swapped out.
*
* @return the total {@link QueueSize} of the connection at the time the snapshot was taken
*/
QueueSize getQueueSize();

/**
* Returns the FlowFiles held in this node's active in-memory queue at the time the snapshot
* was taken. The list references the existing FlowFile instances rather than copies, and it
* may be only a subset of the queue; use {@link #isActiveListExhaustive()} to check.
*
* @return the active in-memory FlowFiles in poll order, never null; excludes FlowFiles that
* have been swapped out and, for load-balanced connections, FlowFiles in flight between
* nodes
*/
List<FlowFile> getActiveFlowFiles();

/**
* Indicates whether the active list accounts for the entire queue. When this returns
* {@code false}, some FlowFiles counted in {@link #getQueueSize()} are absent from
* {@link #getActiveFlowFiles()}, for example because they have been swapped out.
*
* @return {@code true} if {@link #getActiveFlowFiles()} contains every FlowFile counted in
* {@link #getQueueSize()}; otherwise {@code false}
*/
boolean isActiveListExhaustive();
}
Loading
Loading