Skip to content

NIFI-15932: Implemented ability to migrate a Versioned flow's Assets …#11240

Draft
markap14 wants to merge 1 commit into
apache:mainfrom
markap14:nifi-versioned-flow-connector-migration
Draft

NIFI-15932: Implemented ability to migrate a Versioned flow's Assets …#11240
markap14 wants to merge 1 commit into
apache:mainfrom
markap14:nifi-versioned-flow-connector-migration

Conversation

@markap14

Copy link
Copy Markdown
Contributor

…and config to a new Connector

Summary

NIFI-00000

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

@markap14 markap14 force-pushed the nifi-versioned-flow-connector-migration branch from 935524f to fec0aba Compare May 28, 2026 16:37
@markap14 markap14 force-pushed the nifi-versioned-flow-connector-migration branch 2 times, most recently from 857d395 to d698fbf Compare June 8, 2026 19:35
@markap14 markap14 force-pushed the nifi-versioned-flow-connector-migration branch from d698fbf to 38258da Compare June 18, 2026 18:27
…and config to a new Connector

Squashed work:
- Implemented ability to migrate a Versioned flow's Assets and config to a new Connector
- Refactored the Connector Migration logic into a separate MigratableConnector interface
- Added MIGRATE allowable action and aligned with nifi-api
- Reworked to use migrateConfiguration/migrateState approach (NIFI-15988); bug fixes; include Process Groups acceptable to the Connector but not in desirable state with reasons
@markap14 markap14 force-pushed the nifi-versioned-flow-connector-migration branch from 38258da to b7f1e4a Compare June 18, 2026 18:38

final Date clientLastUpdated = clientRequest.getLastUpdated();
final Date nodeLastUpdated = nodeRequest.getLastUpdated();
if (nodeLastUpdated != null && (clientLastUpdated == null || nodeLastUpdated.before(clientLastUpdated))) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition uses nodeLastUpdated.before(clientLastUpdated), which means the merged lastUpdated will reflect the earliest timestamp across nodes. The neighboring merges (setComplete = AND, setPercentCompleted = min) are "worst-of" for completion progress, but for an activity timestamp a polling client would typically expect the most recent update across the cluster — otherwise the merged value can appear to go backward as more nodes report in.

Is the intent here to surface the oldest timestamp (and if so, what does that represent for the client), or should this be nodeLastUpdated.after(clientLastUpdated)? MigrationRequestEndpointMergerTest doesn't currently assert anything about the merged lastUpdated, so either direction would pass today — happy to suggest a test once the intent is confirmed.

final long elapsedSeconds = Duration.ofNanos(System.nanoTime() - startNanos).toSeconds();
logger.warn("Failed to retrieve cluster state for {} while waiting for update completion; elapsed time = {} secs", connector, elapsedSeconds, e);
Thread.sleep(Duration.ofSeconds(1));
continue;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new catch (IOException) block logs and continues, but the only place iterations is incremented (and therefore the only termination opportunity short of a non-IOException state outcome) is the allowableStates branch below — which isn't reached if requestReplicator.getState(...) keeps throwing.

If the request replicator is persistently unhealthy, would this loop ever exit, or would the caller block indefinitely? Was an upper bound intended here — e.g. using startNanos to fail after a max duration, or bailing out after N consecutive IOExceptions? Happy to be wrong if there's an outer cancellation/timeout I'm not seeing.

}

private void clearConnectorComponentState(final ConnectorNode connector) {
// Migration is only permitted on a Connector whose active flow is empty, so every Processor and Controller

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment here states "Migration is only permitted on a Connector whose active flow is empty", but the precondition check verifyTargetIsAtInitialFlow (~L603) only asserts matchesInitialFlow() — which is true whenever the live managed flow equals getInitialFlow(), including the case where the initial flow itself declares processors or controller services.

If a MigratableConnector implementation ever ships with a non-empty getInitialFlow() containing stateful components, would a Phase 2 failure here wipe state belonging to those originally-declared components? Is the empty-initial-flow assumption something we want to:

  1. tighten in verifyTargetIsAtInitialFlow (also assert the managed flow is structurally empty),
  2. track explicitly (snapshot pre-migration component IDs and only clear "new since snapshot"), or
  3. document as a contract requirement on MigratableConnector / the developer guide?

If today's expectation is "(3) by convention", a one-line note in the developer guide and a code comment here would make the contract clear to future implementers.

},
() -> {
if (hasLocalSource) {
serviceFacade.verifyCanMigrateConnector(connectorId, request.getLocalSource().getProcessGroupId());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two related questions about concurrency on this submission path:

  1. withWriteLock is revision-based and the verifier runs synchronously, but the actual migration is dispatched to migrationRequestManager and runs asynchronously. Is there a per-Connector serialization mechanism I'm missing that prevents two near-simultaneous submissions against the same target Connector from both passing verifyTargetIsAtInitialFlow and then racing in applyMigratedConfiguration? If not, would a per-Connector lock at the manager level — or introducing an explicit MIGRATING ConnectorState to gate transitions — be worth considering? The asymmetric-failure cluster IT exercises per-node divergence but doesn't appear to exercise two parallel requests against the same Connector.
  2. The verifyCanMigrateConnector call is intentionally guarded by hasLocalSource, so for uploaded-payload sources the structural / matchesInitialFlow checks only fire inside the async task and surface several poll cycles later. Was that intentional? Would it be reasonable to run an equivalent synchronous verify (Connector state + matchesInitialFlow) for the uploaded-payload case too, so the user sees the failure on submission rather than after polling?

.mapControllerServiceReferencesToVersionedId(true)
.mapFlowRegistryClientId(false)
.mapAssetReferences(false)
.mapAssetReferences(true)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flipping mapAssetReferences from falsetrue on the shared getCurrentFlowSnapshotByGroupId mapping options changes the result for every caller of this path, not just the migration source-flow capture — e.g. process-group export and any other consumer of buildFlowSnapshot via this method.

Is the intent to start including asset references in those snapshots universally, or is the change really only meant for migration? If the latter, would it make sense to route the migration-source capture through a dedicated method (or add an opt-in parameter), so the existing export consumers don't inadvertently start emitting NiFi-internal asset identifiers that may not resolve on a different instance?


final RegisteredFlowSnapshot flowSnapshot;
try {
flowSnapshot = OBJECT_MAPPER.readValue(payloadContents, RegisteredFlowSnapshot.class);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The payload upload reads the JSON in full via OBJECT_MAPPER.readValue(payloadContents, ...) and pins the deserialized snapshot in migrationPayloadsById for up to ~10 minutes, but there's no MaxLengthInputStream wrap or other size cap on the upload. The asset upload path in this same file caps at MAX_ASSET_SIZE_BYTES = 1 GB for comparable resource reasons.

Is the expectation that the 10-min TTL + per-Connector WRITE authorization is enough bounding, or would a MaxLengthInputStream wrap here be worth adding (probably with a tighter cap than the 1 GB asset limit, since exported flows are typically a couple orders of magnitude smaller)?

// Clean up assets after configuration has been inherited so that the Connector's flow contexts reflect the
// synchronized configuration; performing this before synchronization would treat still-referenced assets as
// unreferenced because the managed flow has not yet been populated.
cleanUpAssets(connector);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleanUpAssets(connector) is now invoked on every successful syncConnector(...) (a broader trigger than before this PR), and together with the new collectReferencedParameterContextAssetIds it's necessary for asset rollback to behave correctly after a failed migration. I want to confirm the lifecycle relationship to migration: is syncConnector(...) guaranteed never to run concurrently with an in-flight applyMigratedConfiguration(...) on the same Connector?

If a sync ever interleaved with the migration's mid-Phase-2 state — after the new working configuration is built but before it's committed — could cleanUpAssets see the newly-copied assets as unreferenced and delete them? Even if that interleaving isn't reachable today, a one-line comment locking the invariant in place (or a targeted test) would make this much harder to regress.

actionDetails.setPreviousValue(null);
actionDetails.setValue(processGroupId == null ? "Uploaded payload" : processGroupId);

final Action action = generateAuditRecord(connector, Operation.Configure, actionDetails);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migration is recorded as Operation.Configure, but semantically it's distinct from an ordinary configuration change — it can copy assets, write component state, install a managed Process Group, disable and rename the source PG, and is effectively irreversible. Operators filtering the audit log for migration events would currently have to look for Operation.Configure rows whose details name starts with "Connector migrated from…".

Would it be reasonable to introduce a dedicated Operation.Migrate (alongside the existing operations) and use it here, so migrations surface as a first-class event type in the audit trail?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants