diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index b7ab854eb98a8..99e1b3c036737 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -1819,6 +1819,12 @@ }, "write-records-complete" : { "type" : "boolean" + }, + "write-records-per-target" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } } } }, @@ -5102,6 +5108,12 @@ }, "write-records-complete" : { "type" : "boolean" + }, + "write-records-per-target" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } } } }, @@ -5743,6 +5755,12 @@ }, "write-records-complete" : { "type" : "boolean" + }, + "write-records-per-target" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } } } }, @@ -5873,6 +5891,12 @@ }, "write-records-complete" : { "type" : "boolean" + }, + "write-records-per-target" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } } } }, @@ -6251,6 +6275,12 @@ }, "write-records-complete" : { "type" : "boolean" + }, + "write-records-per-target" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } } } }, diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index ada573ef5df9c..58570eda5360a 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -2469,6 +2469,11 @@ components: format: int64 write-records-complete: type: boolean + write-records-per-target: + type: object + additionalProperties: + type: integer + format: int64 Id: type: string enum: diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index f282db00ea979..8b08f9b37b5fc 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -1434,6 +1434,12 @@ }, "accumulated-busy-time" : { "type" : "number" + }, + "write-records-per-target" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } } } } @@ -3942,6 +3948,12 @@ }, "accumulated-busy-time" : { "type" : "number" + }, + "write-records-per-target" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } } } }, @@ -4402,6 +4414,12 @@ }, "accumulated-busy-time" : { "type" : "number" + }, + "write-records-per-target" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } } } }, @@ -4511,6 +4529,12 @@ }, "accumulated-busy-time" : { "type" : "number" + }, + "write-records-per-target" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } } } }, @@ -4766,6 +4790,12 @@ }, "accumulated-busy-time" : { "type" : "number" + }, + "write-records-per-target" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } } } }, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index a1301f604dc4e..06e9c87aa249b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -1774,6 +1774,8 @@ private void updateAccumulatorsAndMetrics( if (metrics != null) { // Drop IOMetrics#resultPartitionBytes because it will not be used anymore. It can // result in very high memory usage when there are many executions and sub-partitions. + // Preserve the per-downstream-target numRecordsOut breakdown so that REST consumers + // (e.g. the Kubernetes autoscaler) see it on terminal executions. 
this.ioMetrics = new IOMetrics( metrics.getNumBytesIn(), @@ -1782,7 +1784,9 @@ private void updateAccumulatorsAndMetrics( metrics.getNumRecordsOut(), metrics.getAccumulateIdleTime(), metrics.getAccumulateBusyTime(), - metrics.getAccumulateBackPressuredTime()); + metrics.getAccumulateBackPressuredTime(), + null, + metrics.getNumRecordsOutPerTarget()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java index 5dd24dab56346..3eee7d1e7223f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java @@ -51,6 +51,13 @@ public class IOMetrics implements Serializable { @Nullable protected Map resultPartitionBytes; + /** + * Per-downstream-target breakdown of {@link #numRecordsOut}, keyed by target {@code + * JobVertexID} (hex string). May be empty when the task has no outputs for which a target was + * registered (e.g. sinks, or the broadcast fan-out path). Never {@code null}. + */ + protected Map numRecordsOutPerTarget; + public IOMetrics( Meter recordsIn, Meter recordsOut, @@ -60,7 +67,8 @@ public IOMetrics( Gauge accumulatedIdleTime, Gauge accumulatedBusyTime, Map - resultPartitionBytesCounters) { + resultPartitionBytesCounters, + Map numRecordsOutPerTarget) { this.numRecordsIn = recordsIn.getCount(); this.numRecordsOut = recordsOut.getCount(); this.numBytesIn = bytesIn.getCount(); @@ -74,6 +82,10 @@ public IOMetrics( Collectors.toMap( Map.Entry::getKey, entry -> entry.getValue().createSnapshot())); + this.numRecordsOutPerTarget = + numRecordsOutPerTarget == null + ? 
Collections.emptyMap() + : Collections.unmodifiableMap(numRecordsOutPerTarget); } public IOMetrics( @@ -106,6 +118,29 @@ public IOMetrics( long accumulateBackPressuredTime, @Nullable Map resultPartitionBytes) { + this( + numBytesIn, + numBytesOut, + numRecordsIn, + numRecordsOut, + accumulateIdleTime, + accumulateBusyTime, + accumulateBackPressuredTime, + resultPartitionBytes, + null); + } + + @VisibleForTesting + public IOMetrics( + long numBytesIn, + long numBytesOut, + long numRecordsIn, + long numRecordsOut, + long accumulateIdleTime, + double accumulateBusyTime, + long accumulateBackPressuredTime, + @Nullable Map resultPartitionBytes, + @Nullable Map numRecordsOutPerTarget) { this.numBytesIn = numBytesIn; this.numBytesOut = numBytesOut; this.numRecordsIn = numRecordsIn; @@ -114,6 +149,10 @@ public IOMetrics( this.accumulateBusyTime = accumulateBusyTime; this.accumulateBackPressuredTime = accumulateBackPressuredTime; this.resultPartitionBytes = resultPartitionBytes; + this.numRecordsOutPerTarget = + numRecordsOutPerTarget == null + ? Collections.emptyMap() + : Collections.unmodifiableMap(numRecordsOutPerTarget); } public long getNumRecordsIn() { @@ -147,4 +186,15 @@ public long getAccumulateIdleTime() { public Map getResultPartitionBytes() { return Collections.unmodifiableMap(checkNotNull(resultPartitionBytes)); } + + /** + * Returns the per-downstream-target {@code numRecordsOut} breakdown, keyed by target {@code + * JobVertexID} (hex string). Never {@code null}; returns an empty map when no per-target + * counters were registered. + */ + public Map getNumRecordsOutPerTarget() { + return numRecordsOutPerTarget == null + ? 
Collections.emptyMap() + : Collections.unmodifiableMap(numRecordsOutPerTarget); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java index 4301808e58759..89570b433e8f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -29,6 +29,19 @@ private MetricNames() {} public static final String IO_NUM_RECORDS_IN_RATE = IO_NUM_RECORDS_IN + SUFFIX_RATE; public static final String IO_NUM_RECORDS_OUT_RATE = IO_NUM_RECORDS_OUT + SUFFIX_RATE; + /** + * Prefix used to expose the per-downstream-target breakdown of {@link #IO_NUM_RECORDS_OUT}. The + * full metric name is constructed via {@link #ioNumRecordsOutPerTargetName(String)}. This is + * intended to be consumed by tools (e.g. the Kubernetes autoscaler) that need to compute + * accurate edge data rates for vertices with multiple downstream outputs. + */ + public static final String IO_NUM_RECORDS_OUT_PER_TARGET_PREFIX = IO_NUM_RECORDS_OUT + "."; + + /** Returns the per-target metric name for the given downstream {@code JobVertexID}. 
*/ + public static String ioNumRecordsOutPerTargetName(String targetJobVertexId) { + return IO_NUM_RECORDS_OUT_PER_TARGET_PREFIX + targetJobVertexId; + } + public static final String IO_NUM_BYTES_IN = "numBytesIn"; public static final String IO_NUM_BYTES_OUT = "numBytesOut"; public static final String IO_NUM_BYTES_IN_RATE = IO_NUM_BYTES_IN + SUFFIX_RATE; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index 5a034f6b4e40f..bf841a8de2449 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -186,7 +186,8 @@ public IOMetrics createSnapshot() { accumulatedBackPressuredTime, accumulatedIdleTime, accumulatedBusyTime, - resultPartitionBytes); + resultPartitionBytes, + numRecordsOut.getCounters()); } // ============================================================================================ @@ -209,6 +210,17 @@ public Counter getNumRecordsOutCounter() { return numRecordsOut; } + /** + * Returns a snapshot of the per-downstream-target {@code numRecordsOut} counts, keyed by target + * {@code JobVertexID} (hex string). Only contains entries for outputs registered via {@link + * #reuseRecordsOutputCounter(Counter, String)}. The sum of the returned values may be less than + * {@link #getNumRecordsOutCounter()} when some outputs are registered without a target key + * (e.g. broadcast fan-out collectors). 
+ */ + public Map getNumRecordsOutPerTarget() { + return numRecordsOut.getCounters(); + } + public Counter getNumBuffersOutCounter() { return numBuffersOut; } @@ -342,6 +354,35 @@ public void reuseRecordsOutputCounter(Counter numRecordsOutCounter) { this.numRecordsOut.addCounter(numRecordsOutCounter); } + /** + * Registers an output record counter whose emits are directed to the given downstream target + * vertex. The counter contributes both to the aggregate {@link MetricNames#IO_NUM_RECORDS_OUT} + * and to a per-target metric {@code numRecordsOut.}, enabling consumers + * (e.g. the Kubernetes autoscaler) to compute accurate per-edge data rates when a task has + * multiple downstream outputs (including side outputs). + */ + public void reuseRecordsOutputCounter(Counter numRecordsOutCounter, String jobVertexId) { + this.numRecordsOut.addCounter(numRecordsOutCounter, jobVertexId); + // Also expose it as an individual, discoverable metric so reporters (Prometheus, JMX, etc.) + // and the REST metric store can see the per-target breakdown. + counter(MetricNames.ioNumRecordsOutPerTargetName(jobVertexId), numRecordsOutCounter); + } + + /** + * Registers a per-downstream-target output record counter without adding it to the + * aggregate {@link MetricNames#IO_NUM_RECORDS_OUT} total. Used for the multi-output / broadcast + * fan-out path, where the aggregate is already incremented once per logical emit by the + * broadcast collector via {@link #reuseRecordsOutputCounter(Counter)}, and summing every + * per-target counter on top of that would double-count. + * + *

The counter is still exposed as the individual, discoverable metric {@code + * numRecordsOut.} and appears in {@link #getNumRecordsOutPerTarget()}. + */ + public void registerNumRecordsOutPerTarget(Counter numRecordsOutCounter, String jobVertexId) { + this.numRecordsOut.addTargetOnly(numRecordsOutCounter, jobVertexId); + counter(MetricNames.ioNumRecordsOutPerTargetName(jobVertexId), numRecordsOutCounter); + } + public void registerResultPartitionBytesCounter( IntermediateResultPartitionID resultPartitionId, ResultPartitionBytesCounter resultPartitionBytesCounter) { @@ -358,6 +399,7 @@ public void registerMailboxSizeSupplier(SizeSupplier supplier) { */ private static class SumCounter extends SimpleCounter { private final List internalCounters = new ArrayList<>(); + private final Map jobVertexIdToCounter = new HashMap<>(); SumCounter() {} @@ -365,6 +407,21 @@ public void addCounter(Counter toAdd) { internalCounters.add(toAdd); } + public void addCounter(Counter toAdd, String jobVertexId) { + internalCounters.add(toAdd); + jobVertexIdToCounter.put(jobVertexId, toAdd); + } + + /** + * Stores the counter for per-target lookup without contributing to the aggregate sum. + * Intended for the broadcast fan-out path where the aggregate is already incremented by a + * separate task-level counter and summing the per-target counters on top would + * double-count. 
+ */ + public void addTargetOnly(Counter toAdd, String jobVertexId) { + jobVertexIdToCounter.put(jobVertexId, toAdd); + } + @Override public long getCount() { long sum = super.getCount(); @@ -373,6 +430,14 @@ public long getCount() { } return sum; } + + public Map getCounters() { + return jobVertexIdToCounter.entrySet().stream() + .collect( + HashMap::new, + (m, e) -> m.put(e.getKey(), e.getValue().getCount()), + HashMap::putAll); + } } private static class SizeGauge implements Gauge { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java index 7d55c262398c7..7e7285c11b97f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java @@ -246,7 +246,8 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo( counts.isNumRecordsOutComplete(), counts.getAccumulateBackPressuredTime(), counts.getAccumulateIdleTime(), - counts.getAccumulateBusyTime()); + counts.getAccumulateBusyTime(), + counts.getNumRecordsOutPerTargetMutable()); return new JobDetailsInfo.JobVertexDetailsInfo( ejv.getJobVertexId(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java index 4b0ec1bf9fb69..eaa70bbbe7cbc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java @@ -225,7 +225,8 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo( current.isNumRecordsOutComplete(), current.getAccumulateBackPressuredTime(), 
current.getAccumulateIdleTime(), - current.getAccumulateBusyTime())); + current.getAccumulateBusyTime(), + current.getNumRecordsOutPerTargetMutable())); } long duration; @@ -259,7 +260,8 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo( counts.isNumRecordsOutComplete(), counts.getAccumulateBackPressuredTime(), counts.getAccumulateIdleTime(), - counts.getAccumulateBusyTime()); + counts.getAccumulateBusyTime(), + counts.getNumRecordsOutPerTargetMutable()); Map statusCounts = CollectionUtil.newHashMapWithExpectedSize(ExecutionState.values().length); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java index 5305b98c7ba84..aaace2fa70052 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java @@ -28,6 +28,10 @@ import javax.annotation.Nullable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + /** * This class is a mutable version of the {@link IOMetrics} class that allows adding up IO-related * metrics. @@ -48,6 +52,7 @@ public class MutableIOMetrics extends IOMetrics { public MutableIOMetrics() { super(0, 0, 0, 0, 0, 0, 0); + this.numRecordsOutPerTarget = new HashMap<>(); } public boolean isNumBytesInComplete() { @@ -66,6 +71,10 @@ public boolean isNumRecordsOutComplete() { return numRecordsOutComplete; } + public Map getNumRecordsOutPerTargetMutable() { + return numRecordsOutPerTarget == null ? Collections.emptyMap() : numRecordsOutPerTarget; + } + /** * Adds the IO metrics for the given attempt to this object. If the {@link AccessExecution} is * in a terminal state the contained {@link IOMetrics} object is added. 
Otherwise the given @@ -96,6 +105,11 @@ public void addIOMetrics( } else { this.accumulateBusyTime += ioMetrics.getAccumulateBusyTime(); } + // Aggregate the per-downstream-target breakdown across subtasks. + for (Map.Entry entry : + ioMetrics.getNumRecordsOutPerTarget().entrySet()) { + this.numRecordsOutPerTarget.merge(entry.getKey(), entry.getValue(), Long::sum); + } } } else { // execAttempt is still running, use MetricQueryService instead if (jobMetrics != null) { @@ -139,6 +153,26 @@ public void addIOMetrics( Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT)); } + // Aggregate the per-downstream-target breakdown. The metric store carries one + // entry per target, named "numRecordsOut.". + for (Map.Entry metricEntry : metrics.metrics.entrySet()) { + String name = metricEntry.getKey(); + if (name.startsWith(MetricNames.IO_NUM_RECORDS_OUT_PER_TARGET_PREFIX)) { + String targetId = + name.substring( + MetricNames.IO_NUM_RECORDS_OUT_PER_TARGET_PREFIX + .length()); + try { + this.numRecordsOutPerTarget.merge( + targetId, + Long.parseLong(metricEntry.getValue()), + Long::sum); + } catch (NumberFormatException ignored) { + // Malformed value — skip this subtask for this target. 
+ } + } + } + if (metrics.getMetric(MetricNames.ACC_TASK_BACK_PRESSURED_TIME) != null) { this.accumulateBackPressuredTime += Long.parseLong( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java index d891e8d84434f..c04caf4c9ef3c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java @@ -229,7 +229,8 @@ public static SubtaskExecutionAttemptDetailsInfo create( ioMetrics.isNumRecordsOutComplete(), ioMetrics.getAccumulateBackPressuredTime(), ioMetrics.getAccumulateIdleTime(), - ioMetrics.getAccumulateBusyTime()); + ioMetrics.getAccumulateBusyTime(), + ioMetrics.getNumRecordsOutPerTargetMutable()); return new SubtaskExecutionAttemptDetailsInfo( execution.getParallelSubtaskIndex(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java index 35bad57eea4a3..73178b9326bc2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java @@ -19,8 +19,13 @@ package org.apache.flink.runtime.rest.messages.job.metrics; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; import java.util.Objects; /** IO metrics information. 
*/ @@ -48,6 +53,14 @@ public final class IOMetricsInfo { public static final String FIELD_NAME_ACC_BUSY = "accumulated-busy-time"; + /** + * Per-downstream-target breakdown of {@link #FIELD_NAME_RECORDS_WRITTEN}, keyed by target + * {@code JobVertexID} (hex string). Added in the scope of enabling the Kubernetes autoscaler to + * compute accurate per-edge data rates for vertices with multiple downstream outputs. Missing + * or empty for older Flink versions. + */ + public static final String FIELD_NAME_RECORDS_WRITTEN_PER_TARGET = "write-records-per-target"; + @JsonProperty(FIELD_NAME_BYTES_READ) private final long bytesRead; @@ -81,6 +94,40 @@ public final class IOMetricsInfo { @JsonProperty(FIELD_NAME_ACC_BUSY) private final double accumulatedBusy; + @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_PER_TARGET) + private final Map recordsWrittenPerTarget; + + /** + * Back-compatible constructor that defaults {@link #recordsWrittenPerTarget} to an empty map. + * Kept to avoid churn at the dozen existing call sites. 
+ */ + public IOMetricsInfo( + long bytesRead, + boolean bytesReadComplete, + long bytesWritten, + boolean bytesWrittenComplete, + long recordsRead, + boolean recordsReadComplete, + long recordsWritten, + boolean recordsWrittenComplete, + long accumulatedBackpressured, + long accumulatedIdle, + double accumulatedBusy) { + this( + bytesRead, + bytesReadComplete, + bytesWritten, + bytesWrittenComplete, + recordsRead, + recordsReadComplete, + recordsWritten, + recordsWrittenComplete, + accumulatedBackpressured, + accumulatedIdle, + accumulatedBusy, + null); + } + @JsonCreator public IOMetricsInfo( @JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead, @@ -93,7 +140,9 @@ public IOMetricsInfo( @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete, @JsonProperty(FIELD_NAME_ACC_BACK_PRESSURE) long accumulatedBackpressured, @JsonProperty(FIELD_NAME_ACC_IDLE) long accumulatedIdle, - @JsonProperty(FIELD_NAME_ACC_BUSY) double accumulatedBusy) { + @JsonProperty(FIELD_NAME_ACC_BUSY) double accumulatedBusy, + @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_PER_TARGET) @Nullable + Map recordsWrittenPerTarget) { this.bytesRead = bytesRead; this.bytesReadComplete = bytesReadComplete; this.bytesWritten = bytesWritten; @@ -105,6 +154,10 @@ public IOMetricsInfo( this.accumulatedBackpressured = accumulatedBackpressured; this.accumulatedIdle = accumulatedIdle; this.accumulatedBusy = accumulatedBusy; + this.recordsWrittenPerTarget = + recordsWrittenPerTarget == null + ? 
Collections.emptyMap() + : Collections.unmodifiableMap(recordsWrittenPerTarget); } public long getBytesRead() { @@ -151,6 +204,11 @@ public long getAccumulatedIdle() { return accumulatedIdle; } + @JsonIgnore + public Map getRecordsWrittenPerTarget() { + return recordsWrittenPerTarget; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -170,7 +228,8 @@ public boolean equals(Object o) { && recordsWrittenComplete == that.recordsWrittenComplete && accumulatedBackpressured == that.accumulatedBackpressured && accumulatedBusy == that.accumulatedBusy - && accumulatedIdle == that.accumulatedIdle; + && accumulatedIdle == that.accumulatedIdle + && Objects.equals(recordsWrittenPerTarget, that.recordsWrittenPerTarget); } @Override @@ -186,6 +245,7 @@ public int hashCode() { recordsWrittenComplete, accumulatedBackpressured, accumulatedBusy, - accumulatedIdle); + accumulatedIdle, + recordsWrittenPerTarget); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java index b6dc6a95e461a..5b83803a69616 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java @@ -133,6 +133,24 @@ public class AdaptiveGraphManager private final Map streamNodeIdsToJobVertexMap; + /** + * Caches the {@link StreamConfig} of each already-created upstream vertex so that, when a + * downstream vertex is finally created in a later iteration, we can patch the now-known + * downstream {@link JobVertexID} back into the upstream's {@link NonChainedOutput}s and + * re-serialize them. Keyed by the upstream vertex's start-node id. 
+ */ + private final Map startNodeIdToVertexConfigMap = new HashMap<>(); + + /** + * Caches the operator-level {@link StreamConfig} for every stream node whose config has been + * built so far, keyed by the upstream stream-node id that produces a non-chained output. This + * is the config whose {@link StreamConfig#OP_NONCHAINED_OUTPUTS OP_NONCHAINED_OUTPUTS} list is + * read at runtime by {@code OperatorChain}; it must be re-serialized together with the + * vertex-level list when a {@link NonChainedOutput}'s target id is patched in a later adaptive + * iteration. + */ + private final Map streamNodeIdToOperatorConfigMap = new HashMap<>(); + // Records the ID of the job vertex that has completed execution. private final Set finishedJobVertices; @@ -330,6 +348,18 @@ private void generateConfigForJobVertices(JobVertexBuildContext jobVertexBuildCo setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs, jobVertexBuildContext); + // Cache the operator-level StreamConfig of every stream node built in this iteration so + // that a later iteration can re-serialize OP_NONCHAINED_OUTPUTS after patching in the + // now-known downstream JobVertexID. 
+ for (OperatorChainInfo chainInfo : jobVertexBuildContext.getChainInfosInOrder().values()) { + chainInfo + .getOperatorInfos() + .forEach( + (streamNodeId, operatorInfo) -> + streamNodeIdToOperatorConfigMap.put( + streamNodeId, operatorInfo.getVertexConfig())); + } + setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs, jobVertexBuildContext); connectToFinishedUpStreamVertex(jobVertexBuildContext); @@ -420,6 +450,11 @@ private void setVertexNonChainedOutputsConfig( intermediateDataSetIdToProducerMap.put(output.getDataSetId(), edge.getSourceId()); } config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs)); + // Remember this upstream's StreamConfig so that later, when a downstream vertex + // referenced by one of the above outputs is finally created, we can patch the + // now-known downstream JobVertexID back into the (still cached) NonChainedOutput + // and re-serialize this config's VERTEX_NONCHAINED_OUTPUTS key. + startNodeIdToVertexConfigMap.put(startNodeId, config); } /** @@ -429,7 +464,18 @@ private void setVertexNonChainedOutputsConfig( */ private void connectToFinishedUpStreamVertex(JobVertexBuildContext jobVertexBuildContext) { Map chainInfos = jobVertexBuildContext.getChainInfosInOrder(); + // Track upstream start-node ids whose cached NonChainedOutputs were mutated so that we + // re-serialize the upstream's VERTEX_NONCHAINED_OUTPUTS only once per upstream vertex. + final Set upstreamsToReserialize = new HashSet<>(); + // Track upstream operator stream-node ids (producers of patched outputs) so we can + // re-serialize their OP_NONCHAINED_OUTPUTS once. Runtime OperatorChain reads from this + // operator-level list, so refreshing only the vertex-level list is not sufficient. 
+ final Set upstreamOperatorsToReserialize = new HashSet<>(); for (OperatorChainInfo chainInfo : chainInfos.values()) { + JobVertex downstreamVertex = + jobVertexBuildContext.getJobVertex(chainInfo.getStartNodeId()); + JobVertexID downstreamVertexId = + downstreamVertex == null ? null : downstreamVertex.getID(); List transitiveInEdges = chainInfo.getTransitiveInEdges(); for (StreamEdge transitiveInEdge : transitiveInEdges) { NonChainedOutput output = @@ -437,6 +483,15 @@ private void connectToFinishedUpStreamVertex(JobVertexBuildContext jobVertexBuil .get(transitiveInEdge.getSourceId()) .get(transitiveInEdge); Integer sourceStartNodeId = getStartNodeId(transitiveInEdge.getSourceId()); + // Patch the now-known downstream JobVertexID into the cached NonChainedOutput + // that was created in an earlier iteration when the downstream vertex did not + // yet exist (its target id was left null). Record the upstream vertex and the + // producing operator so we can re-serialize both persisted copies once below. + if (downstreamVertexId != null && output.getTargetNodeId() == null) { + output.setTargetVertexId(downstreamVertexId); + upstreamsToReserialize.add(sourceStartNodeId); + upstreamOperatorsToReserialize.add(transitiveInEdge.getSourceId()); + } connect( sourceStartNodeId, transitiveInEdge, @@ -445,6 +500,23 @@ private void connectToFinishedUpStreamVertex(JobVertexBuildContext jobVertexBuil jobVertexBuildContext); } } + // Refresh the persisted bytes of the upstream vertex configs whose cached outputs were + // mutated above, so that VERTEX_NONCHAINED_OUTPUTS reflects the patched target ids. 
+ for (Integer upstreamStartNodeId : upstreamsToReserialize) { + StreamConfig upstreamConfig = startNodeIdToVertexConfigMap.get(upstreamStartNodeId); + if (upstreamConfig != null) { + upstreamConfig.reserializeVertexNonChainedOutputs(); + } + } + // Same for OP_NONCHAINED_OUTPUTS on the producing operator's config, which is what + // OperatorChain actually reads at runtime when wiring per-target numRecordsOut counters. + for (Integer upstreamOperatorStreamNodeId : upstreamOperatorsToReserialize) { + StreamConfig upstreamOperatorConfig = + streamNodeIdToOperatorConfigMap.get(upstreamOperatorStreamNodeId); + if (upstreamOperatorConfig != null) { + upstreamOperatorConfig.reserializeOperatorNonChainedOutputs(); + } + } } private void recordCreatedJobVerticesInfo(JobVertexBuildContext jobVertexBuildContext) { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java index f1d08d4b7df08..99e4204432362 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java @@ -20,9 +20,12 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.util.OutputTag; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.Objects; @@ -40,6 +43,19 @@ public class NonChainedOutput implements Serializable { /** ID of the producer {@link StreamNode}. */ private final int sourceNodeId; + /** + * ID of the consumer {@link org.apache.flink.runtime.jobgraph.JobVertex}. + * + *

In eager job-graph construction this is always set at construction time. In adaptive + * job-graph construction modes (e.g. {@code AdaptiveGraphManager}) the downstream {@link + * org.apache.flink.runtime.jobgraph.JobVertex} may not yet exist when this output is first + * created; in that case the field is transiently {@code null} and is patched in via {@link + * #setTargetVertexId(JobVertexID)} as soon as the downstream vertex is created, before the + * upstream vertex is scheduled. Runtime code (e.g. {@code OperatorChain}) must therefore always + * observe a non-null value. + */ + @Nullable private JobVertexID targetVertexId; + /** Parallelism of the consumer vertex. */ private final int consumerParallelism; @@ -67,6 +83,7 @@ public class NonChainedOutput implements Serializable { public NonChainedOutput( boolean supportsUnalignedCheckpoints, int sourceNodeId, + JobVertexID targetVertexId, int consumerParallelism, int consumerMaxParallelism, long bufferTimeout, @@ -77,6 +94,7 @@ public NonChainedOutput( ResultPartitionType partitionType) { this.supportsUnalignedCheckpoints = supportsUnalignedCheckpoints; this.sourceNodeId = sourceNodeId; + this.targetVertexId = targetVertexId; this.consumerParallelism = consumerParallelism; this.consumerMaxParallelism = consumerMaxParallelism; this.bufferTimeout = bufferTimeout; @@ -95,6 +113,21 @@ public int getSourceNodeId() { return sourceNodeId; } + public JobVertexID getTargetNodeId() { + return targetVertexId; + } + + /** + * Patches in the downstream {@link JobVertexID} after this output has already been created. + * Used exclusively by adaptive job-graph construction to resolve the transient null state of + * {@link #targetVertexId} once the downstream {@link + * org.apache.flink.runtime.jobgraph.JobVertex} becomes known. Must be invoked before the owning + * upstream vertex is scheduled. 
+ */ + public void setTargetVertexId(@Nullable JobVertexID targetVertexId) { + this.targetVertexId = targetVertexId; + } + public int getConsumerParallelism() { return consumerParallelism; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 96295c38486ab..7291b057ace5c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -536,6 +536,50 @@ public void setVertexNonChainedOutputs(List nonChainedOutputs) toBeSerializedConfigObjects.put(VERTEX_NONCHAINED_OUTPUTS, nonChainedOutputs); } + /** + * Re-serialize the {@link #VERTEX_NONCHAINED_OUTPUTS} key from the in-memory Java list + * previously handed to {@link #setVertexNonChainedOutputs(List)}. Used by adaptive job-graph + * construction to refresh the persisted bytes after mutating in-place fields on the cached + * {@link NonChainedOutput} instances (e.g. stamping a downstream {@link + * org.apache.flink.runtime.jobgraph.JobVertexID} that was not known at the time of the first + * serialization). + * + *

No-op if this config has no vertex non-chained outputs tracked. + */ + public void reserializeVertexNonChainedOutputs() { + Object value = toBeSerializedConfigObjects.get(VERTEX_NONCHAINED_OUTPUTS); + if (value == null) { + return; + } + try { + InstantiationUtil.writeObjectToConfig(value, this.config, VERTEX_NONCHAINED_OUTPUTS); + } catch (IOException e) { + throw new StreamTaskException("Could not re-serialize vertex non-chained outputs.", e); + } + } + + /** + * Re-serialize the {@link #OP_NONCHAINED_OUTPUTS} key from the in-memory Java list previously + * handed to {@link #setOperatorNonChainedOutputs(List)}. Symmetric to {@link + * #reserializeVertexNonChainedOutputs()}; required because runtime {@code OperatorChain} reads + * the operator-level (not vertex-level) list, so patching a {@link NonChainedOutput} field in + * adaptive mode must refresh both persisted copies. + * + *

No-op if this config has no operator non-chained outputs tracked. + */ + public void reserializeOperatorNonChainedOutputs() { + Object value = toBeSerializedConfigObjects.get(OP_NONCHAINED_OUTPUTS); + if (value == null) { + return; + } + try { + InstantiationUtil.writeObjectToConfig(value, this.config, OP_NONCHAINED_OUTPUTS); + } catch (IOException e) { + throw new StreamTaskException( + "Could not re-serialize operator non-chained outputs.", e); + } + } + public List getVertexNonChainedOutputs(ClassLoader cl) { try { List nonChainedOutputs = diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 66776c03fb5fd..8054b33bb50d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -1472,7 +1472,7 @@ private static List mayReuseNonChainedOutputs( isPersistentDataSet, dataSetId, partitionType, - jobVertexBuildContext.getStreamGraph()); + jobVertexBuildContext); } return outputs; } @@ -1484,7 +1484,8 @@ private static void createOrReuseOutput( boolean isPersistentDataSet, IntermediateDataSetID dataSetId, ResultPartitionType partitionType, - StreamGraph streamGraph) { + JobVertexBuildContext jobVertexBuildContext) { + var streamGraph = jobVertexBuildContext.getStreamGraph(); int consumerParallelism = streamGraph.getStreamNode(consumerEdge.getTargetId()).getParallelism(); int consumerMaxParallelism = @@ -1516,10 +1517,22 @@ private static void createOrReuseOutput( } } if (reusableOutput == null) { + // In adaptive / incremental job-graph construction modes (e.g. AdaptiveGraphManager) + // the downstream JobVertex may not yet be registered in the build context when this + // output is created; it will be added in a later iteration. 
We tolerate the missing + // lookup here and leave targetVertexId transiently null: AdaptiveGraphManager will + // patch it via NonChainedOutput#setTargetVertexId (and re-serialize the upstream + // StreamConfig) once the downstream vertex is created, before the upstream vertex is + // scheduled. In eager construction the lookup is always non-null. + JobVertex consumerJobVertex = + jobVertexBuildContext.getJobVertex(consumerEdge.getTargetId()); + JobVertexID targetVertexId = + consumerJobVertex == null ? null : consumerJobVertex.getID(); NonChainedOutput output = new NonChainedOutput( consumerEdge.supportsUnalignedCheckpoints(), consumerEdge.getSourceId(), + targetVertexId, consumerParallelism, consumerMaxParallelism, consumerEdge.getBufferTimeout(), diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 815fa13bc3963..2bc7bd5c6e554 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -99,16 +99,12 @@ public RecordWriterOutput( @Override public void collect(StreamRecord record) { - if (collectAndCheckIfChained(record)) { - numRecordsOut.inc(); - } + collectAndCheckIfChained(record); } @Override public void collect(OutputTag outputTag, StreamRecord record) { - if (collectAndCheckIfChained(outputTag, record)) { - numRecordsOut.inc(); - } + collectAndCheckIfChained(outputTag, record); } @Override @@ -119,6 +115,7 @@ public boolean collectAndCheckIfChained(StreamRecord record) { } pushToRecordWriter(record); + numRecordsOut.inc(); return true; } @@ -131,6 +128,7 @@ public boolean collectAndCheckIfChained(OutputTag outputTag, StreamRecord } pushToRecordWriter(record); + numRecordsOut.inc(); return true; } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 78d202c62c7ae..c39241b2fdb41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup; @@ -84,6 +85,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -714,6 +716,8 @@ private WatermarkGaugeExposingOutput> createOutputCollector( MailboxExecutorFactory mailboxExecutorFactory, boolean shouldAddMetric) { List>> allOutputs = new ArrayList<>(4); + Map>, JobVertexID> outputToTargetNodeId = + new IdentityHashMap<>(); // create collectors for the network outputs for (NonChainedOutput streamOutput : @@ -723,6 +727,7 @@ private WatermarkGaugeExposingOutput> createOutputCollector( (RecordWriterOutput) recordWriterOutputs.get(streamOutput.getDataSetId()); allOutputs.add(recordWriterOutput); + outputToTargetNodeId.put(recordWriterOutput, streamOutput.getTargetNodeId()); } // Create collectors for the chained outputs @@ -744,6 +749,9 @@ private WatermarkGaugeExposingOutput> createOutputCollector( shouldAddMetric); checkState(output instanceof OutputWithChainingCheck); allOutputs.add((OutputWithChainingCheck) output); + outputToTargetNodeId.put( + 
(OutputWithChainingCheck) output, + containingTask.getEnvironment().getJobVertexId()); // If the operator has multiple downstream chained operators, only one of them should // increment the recordsOutCounter for this operator. Set shouldAddMetric to false // so that we would skip adding the counter to other downstream operators. @@ -756,7 +764,22 @@ private WatermarkGaugeExposingOutput> createOutputCollector( result = allOutputs.get(0); // only if this is a single RecordWriterOutput, reuse its numRecordOut for task. if (result instanceof RecordWriterOutput) { - Counter numRecordsOutCounter = createNumRecordsOutCounter(containingTask); + JobVertexID targetNodeId = outputToTargetNodeId.get(result); + // Fall back to the non-target counter variant if the downstream target id is + // not available for any reason. Missing per-target metadata must never fail + // the task wiring — the aggregate numRecordsOut still works, but the per-target + // breakdown (numRecordsOut.) will not be exposed for this output. 
+ final Counter numRecordsOutCounter; + if (targetNodeId == null) { + LOG.warn( + "Missing downstream JobVertexID for the single network output of " + + "task {}; registering an aggregate-only numRecordsOut " + + "counter (no per-target breakdown for this output).", + containingTask.getName()); + numRecordsOutCounter = createNumRecordsOutCounter(containingTask); + } else { + numRecordsOutCounter = createNumRecordsOutCounter(containingTask, targetNodeId); + } ((RecordWriterOutput) result).setNumRecordsOut(numRecordsOutCounter); } } else { @@ -766,7 +789,30 @@ private WatermarkGaugeExposingOutput> createOutputCollector( OutputWithChainingCheck>[] allOutputsArray = new OutputWithChainingCheck[allOutputs.size()]; for (int i = 0; i < allOutputs.size(); i++) { - allOutputsArray[i] = allOutputs.get(i); + OutputWithChainingCheck> output = allOutputs.get(i); + // For each network output that has a concrete downstream JobVertexID, install a + // per-target counter so that broadcast fan-out emits are tracked per edge. The + // aggregate task-level numRecordsOut is still driven once per logical emit by the + // BroadcastingOutputCollector below, so per-target counters are registered in a + // "target-only" mode that does not contribute to the aggregate sum (which would + // otherwise double-count). + if (output instanceof RecordWriterOutput) { + JobVertexID targetNodeId = outputToTargetNodeId.get(output); + if (targetNodeId != null) { + Counter perTargetCounter = + createPerTargetRecordsOutCounter(containingTask, targetNodeId); + ((RecordWriterOutput) output).setNumRecordsOut(perTargetCounter); + } else { + LOG.warn( + "Missing downstream JobVertexID for one of the network outputs " + + "of task {} in the multi-output / broadcast fan-out " + + "path; skipping per-target numRecordsOut registration " + + "for this output. 
The aggregate task-level " + + "numRecordsOut is unaffected.", + containingTask.getName()); + } + } + allOutputsArray[i] = output; } // This is the inverse of creating the normal ChainingOutput. @@ -799,6 +845,13 @@ private WatermarkGaugeExposingOutput> createOutputCollector( } private static Counter createNumRecordsOutCounter(StreamTask containingTask) { + // Registers the task-level numRecordsOut counter without a downstream-target key, so it + // contributes only to the aggregate numRecordsOut total and is not exposed as a per-target + // metric. Used in two cases: + // 1. The multi-output / broadcast fan-out path, where a single task-level counter + // aggregates emits from the BroadcastingOutputCollector. + // 2. As the fallback for any single- or multi-output branch when the downstream + // target JobVertexID is unavailable (the caller is expected to log the reason). TaskIOMetricGroup taskIOMetricGroup = containingTask.getEnvironment().getMetricGroup().getIOMetricGroup(); Counter counter = new SimpleCounter(); @@ -806,6 +859,32 @@ private static Counter createNumRecordsOutCounter(StreamTask containingTas return counter; } + private static Counter createNumRecordsOutCounter( + StreamTask containingTask, JobVertexID targetVertexId) { + TaskIOMetricGroup taskIOMetricGroup = + containingTask.getEnvironment().getMetricGroup().getIOMetricGroup(); + Counter counter = new SimpleCounter(); + taskIOMetricGroup.reuseRecordsOutputCounter(counter, targetVertexId.toHexString()); + return counter; + } + + /** + * Creates a per-downstream-target {@code numRecordsOut} counter for the multi-output / + * broadcast fan-out path. The counter is exposed as the individual metric {@code + * numRecordsOut.} and appears in {@link + * TaskIOMetricGroup#getNumRecordsOutPerTarget()}, but is not added to the aggregate + * {@link MetricNames#IO_NUM_RECORDS_OUT} sum, because in this path the aggregate is already + * incremented once per logical emit by the broadcast collector. 
+ */ + private static Counter createPerTargetRecordsOutCounter( + StreamTask containingTask, JobVertexID targetVertexId) { + TaskIOMetricGroup taskIOMetricGroup = + containingTask.getEnvironment().getMetricGroup().getIOMetricGroup(); + Counter counter = new SimpleCounter(); + taskIOMetricGroup.registerNumRecordsOutPerTarget(counter, targetVertexId.toHexString()); + return counter; + } + /** * Recursively create chain of operators that starts from the given {@param operatorConfig}. * Operators are created tail to head and wrapped into an {@link WatermarkGaugeExposingOutput}. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java index 2c68de130a97c..06b6d212328f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java @@ -196,4 +196,103 @@ void testResultPartitionBytesMetrics() { assertThat(resultPartitionBytes.get(resultPartitionID2).getSubpartitionBytes()) .containsExactly(128L, 128L); } + + /** + * Verifies that per-downstream-target record counts registered via the two-arg overload of + * {@link TaskIOMetricGroup#reuseRecordsOutputCounter(Counter, String)} are: + * + *

+     * <ul>
+     *   <li>available through {@link TaskIOMetricGroup#getNumRecordsOutPerTarget()},
+     *   <li>summed into the aggregate {@link TaskIOMetricGroup#getNumRecordsOutCounter()},
+     *   <li>and carried through the {@link IOMetrics} snapshot.
+     * </ul>
+ */ + @Test + void testNumRecordsOutPerTarget() { + TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); + TaskIOMetricGroup taskIO = task.getIOMetricGroup(); + + Counter toVertexA = new SimpleCounter(); + toVertexA.inc(10L); + Counter toVertexB = new SimpleCounter(); + toVertexB.inc(25L); + + // Broadcast-collector contribution with no associated target vertex. + Counter broadcast = new SimpleCounter(); + broadcast.inc(7L); + + taskIO.reuseRecordsOutputCounter(toVertexA, "vertex-a"); + taskIO.reuseRecordsOutputCounter(toVertexB, "vertex-b"); + taskIO.reuseRecordsOutputCounter(broadcast); // single-arg overload: no per-target key + + // Aggregate includes everything, including the untargeted broadcast counter. + assertThat(taskIO.getNumRecordsOutCounter().getCount()).isEqualTo(42L); + + // Per-target map contains only the keyed entries. + Map perTarget = taskIO.getNumRecordsOutPerTarget(); + assertThat(perTarget) + .containsExactlyInAnyOrderEntriesOf(Map.of("vertex-a", 10L, "vertex-b", 25L)); + + // Snapshot carries the same per-target map. + IOMetrics snapshot = taskIO.createSnapshot(); + assertThat(snapshot.getNumRecordsOutPerTarget()) + .containsExactlyInAnyOrderEntriesOf(Map.of("vertex-a", 10L, "vertex-b", 25L)); + + // Mutations to the registered counters after snapshot are NOT reflected in it — but the + // TaskIOMetricGroup's live view IS updated. + toVertexA.inc(5L); + assertThat(taskIO.getNumRecordsOutPerTarget()).containsEntry("vertex-a", 15L); + } + + /** + * Verifies that the per-target counter is also registered as an individually named metric (e.g. + * {@code numRecordsOut.}), so that reporters and the REST metric store can + * discover it alongside the aggregate {@code numRecordsOut}. 
+ */ + @Test + void testNumRecordsOutPerTargetIsRegisteredAsIndividualMetric() { + TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); + TaskIOMetricGroup taskIO = task.getIOMetricGroup(); + + Counter target = new SimpleCounter(); + target.inc(3L); + + // Should not throw even though the name contains a '.' separator — this is also the + // implicit contract test for the metric-name builder. + taskIO.reuseRecordsOutputCounter(target, "abc123"); + + assertThat(taskIO.getNumRecordsOutPerTarget()).containsEntry("abc123", 3L); + } + + /** + * Verifies the per-target-only registration used by the multi-output / broadcast fan-out path: + * per-target counters are exposed through {@link TaskIOMetricGroup#getNumRecordsOutPerTarget()} + * and as individual metrics, but do not contribute to the aggregate {@link + * TaskIOMetricGroup#getNumRecordsOutCounter()}. This prevents double-counting when the + * aggregate is already incremented once per logical emit by the broadcast collector. + */ + @Test + void testRegisterNumRecordsOutPerTargetDoesNotContributeToAggregate() { + TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); + TaskIOMetricGroup taskIO = task.getIOMetricGroup(); + + // Simulates the broadcast collector's task-level aggregate counter: one logical emit. + Counter broadcastAggregate = new SimpleCounter(); + broadcastAggregate.inc(1L); + taskIO.reuseRecordsOutputCounter(broadcastAggregate); + + // Per-target counters driven by each underlying RecordWriterOutput (target-only mode). + Counter toVertexA = new SimpleCounter(); + toVertexA.inc(1L); + Counter toVertexB = new SimpleCounter(); + toVertexB.inc(1L); + taskIO.registerNumRecordsOutPerTarget(toVertexA, "vertex-a"); + taskIO.registerNumRecordsOutPerTarget(toVertexB, "vertex-b"); + + // Aggregate equals the broadcast counter only — per-target counters are NOT summed in. 
+ assertThat(taskIO.getNumRecordsOutCounter().getCount()).isEqualTo(1L); + // But per-target visibility is preserved for downstream consumers. + assertThat(taskIO.getNumRecordsOutPerTarget()) + .containsExactlyInAnyOrderEntriesOf(Map.of("vertex-a", 1L, "vertex-b", 1L)); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java index 5ca09de4ba9c9..e8360020108fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java @@ -22,8 +22,18 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecution; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionHistory; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.HandlerRequestException; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; @@ -38,6 +48,7 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo; 
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.webmonitor.RestfulGateway; @@ -54,6 +65,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.assertj.core.api.Assertions.assertThat; /** Test for the {@link JobDetailsHandler}. */ @@ -120,4 +132,136 @@ void testGetJobDetailsWithStreamGraphJson() throws RestHandlerException { assertThat(jobDetailsInfo.getStreamGraphJson()) .isEqualTo(new JobPlanInfo.RawJson(expectedStreamGraphJson).toString()); } + + /** + * Verifies that {@link JobDetailsHandler} aggregates the per-downstream-target {@code + * numRecordsOut} map across subtasks of the same job vertex. Uses the terminal (archived) path, + * which consumes {@link IOMetrics#getNumRecordsOutPerTarget()} directly without the {@link + * MetricFetcher}. 
+ */ + @Test + void testJobDetailsAggregatesPerTargetWriteRecords() throws Exception { + final String targetA = "abcdef0123456789abcdef0123456789"; + final String targetB = "0123456789abcdef0123456789abcdef"; + + final Map subtask0PerTarget = new HashMap<>(); + subtask0PerTarget.put(targetA, 3L); + subtask0PerTarget.put(targetB, 1L); + final IOMetrics ioMetrics0 = + new IOMetrics(0L, 0L, 0L, 4L, 0L, 0.0, 0L, null, subtask0PerTarget); + + final Map subtask1PerTarget = new HashMap<>(); + subtask1PerTarget.put(targetA, 2L); + subtask1PerTarget.put(targetB, 4L); + final IOMetrics ioMetrics1 = + new IOMetrics(0L, 0L, 0L, 6L, 0L, 0.0, 0L, null, subtask1PerTarget); + + final JobVertexID jobVertexId = new JobVertexID(); + final ArchivedExecutionJobVertex archivedJobVertex = + buildJobVertexWithSubtaskIOMetrics(jobVertexId, ioMetrics0, ioMetrics1); + + final AccessExecutionGraph graphWithVertex = + buildGraphWithSingleVertex(archivedJobVertex, jobVertexId); + + final HandlerRequest request = createRequest(graphWithVertex.getJobID()); + final JobDetailsInfo jobDetailsInfo = + jobDetailsHandler.handleRequest( + request, new ExecutionGraphInfo((ArchivedExecutionGraph) graphWithVertex)); + + final JobDetailsInfo.JobVertexDetailsInfo vertexInfo = + jobDetailsInfo.getJobVertexInfos().iterator().next(); + final IOMetricsInfo metrics = vertexInfo.getJobVertexMetrics(); + + assertThat(metrics.getRecordsWritten()).isEqualTo(10L); + assertThat(metrics.getRecordsWrittenPerTarget()) + .containsEntry(targetA, 5L) + .containsEntry(targetB, 5L) + .hasSize(2); + } + + /** + * Verifies that a job vertex whose subtasks report no per-target breakdown (legacy {@link + * IOMetrics} constructors) surfaces an empty {@code write-records-per-target} map rather than + * {@code null}. 
+ */ + @Test + void testJobDetailsBackCompatNoPerTarget() throws Exception { + final IOMetrics legacy0 = new IOMetrics(0L, 0L, 0L, 5L, 0L, 0.0, 0L); + final IOMetrics legacy1 = new IOMetrics(0L, 0L, 0L, 7L, 0L, 0.0, 0L); + + final JobVertexID jobVertexId = new JobVertexID(); + final ArchivedExecutionJobVertex archivedJobVertex = + buildJobVertexWithSubtaskIOMetrics(jobVertexId, legacy0, legacy1); + + final AccessExecutionGraph graphWithVertex = + buildGraphWithSingleVertex(archivedJobVertex, jobVertexId); + + final HandlerRequest request = createRequest(graphWithVertex.getJobID()); + final JobDetailsInfo jobDetailsInfo = + jobDetailsHandler.handleRequest( + request, new ExecutionGraphInfo((ArchivedExecutionGraph) graphWithVertex)); + + final IOMetricsInfo metrics = + jobDetailsInfo.getJobVertexInfos().iterator().next().getJobVertexMetrics(); + + assertThat(metrics.getRecordsWritten()).isEqualTo(12L); + assertThat(metrics.getRecordsWrittenPerTarget()).isNotNull().isEmpty(); + } + + private static ArchivedExecutionJobVertex buildJobVertexWithSubtaskIOMetrics( + JobVertexID jobVertexId, IOMetrics ioMetrics0, IOMetrics ioMetrics1) { + final StringifiedAccumulatorResult[] emptyAccumulators = + new StringifiedAccumulatorResult[0]; + final ArchivedExecutionVertex subtask0 = + new ArchivedExecutionVertex( + 0, + "subtask-0", + new ArchivedExecution( + emptyAccumulators, + ioMetrics0, + createExecutionAttemptId(jobVertexId, 0, 0), + ExecutionState.FINISHED, + null, + null, + null, + new long[ExecutionState.values().length], + new long[ExecutionState.values().length]), + new ExecutionHistory(0)); + final ArchivedExecutionVertex subtask1 = + new ArchivedExecutionVertex( + 1, + "subtask-1", + new ArchivedExecution( + emptyAccumulators, + ioMetrics1, + createExecutionAttemptId(jobVertexId, 1, 0), + ExecutionState.FINISHED, + null, + null, + null, + new long[ExecutionState.values().length], + new long[ExecutionState.values().length]), + new ExecutionHistory(0)); + return new 
ArchivedExecutionJobVertex( + new ArchivedExecutionVertex[] {subtask0, subtask1}, + jobVertexId, + "test-vertex", + 2, + 2, + new SlotSharingGroup(), + ResourceProfile.UNKNOWN, + emptyAccumulators); + } + + private AccessExecutionGraph buildGraphWithSingleVertex( + ArchivedExecutionJobVertex archivedJobVertex, JobVertexID jobVertexId) { + final Map tasks = new HashMap<>(); + tasks.put(jobVertexId, archivedJobVertex); + final ArchivedExecutionConfig archivedExecutionConfig = + new ArchivedExecutionConfigBuilder().build(); + return new ArchivedExecutionGraphBuilder() + .setArchivedExecutionConfig(archivedExecutionConfig) + .setTasks(tasks) + .build(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java index 36a92b54cb545..0fa5205ea16db 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java @@ -21,14 +21,20 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; +import org.apache.flink.runtime.rest.util.RestMapperUtils; import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import java.util.HashMap; import java.util.Map; import java.util.Random; +import static org.assertj.core.api.Assertions.assertThat; + /** Tests (un)marshalling of the {@link SubtaskExecutionAttemptDetailsInfo}. 
*/ @ExtendWith(NoOpTestExtension.class) class SubtaskExecutionAttemptDetailsInfoTest @@ -43,6 +49,12 @@ protected Class getTestResponseClass() { protected SubtaskExecutionAttemptDetailsInfo getTestResponseInstance() throws Exception { final Random random = new Random(); + final Map recordsWrittenPerTarget = new HashMap<>(); + recordsWrittenPerTarget.put( + "abcdef0123456789abcdef0123456789", Math.abs(random.nextLong())); + recordsWrittenPerTarget.put( + "0123456789abcdef0123456789abcdef", Math.abs(random.nextLong())); + final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo( Math.abs(random.nextLong()), @@ -55,7 +67,8 @@ protected SubtaskExecutionAttemptDetailsInfo getTestResponseInstance() throws Ex random.nextBoolean(), Math.abs(random.nextLong()), Math.abs(random.nextLong()), - Math.abs(random.nextDouble())); + Math.abs(random.nextDouble()), + recordsWrittenPerTarget); final Map statusDuration = new HashMap<>(); statusDuration.put(ExecutionState.CREATED, 10L); @@ -77,4 +90,48 @@ protected SubtaskExecutionAttemptDetailsInfo getTestResponseInstance() throws Ex statusDuration, null); } + + /** + * Verifies that an {@link IOMetricsInfo} constructed via the legacy (back-compat) constructor + * round-trips through Jackson with an empty {@code write-records-per-target} map rather than + * {@code null}. Guards against NPEs on the consumer side (e.g. autoscaler) when talking to a + * Flink version that pre-dates the per-target metric. 
+ */ + @Test + void testIOMetricsInfoMarshallingWithEmptyPerTargetMap() throws Exception { + final IOMetricsInfo original = + new IOMetricsInfo(1L, true, 2L, true, 3L, true, 4L, true, 5L, 6L, 7.0); + + final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper(); + final String json = mapper.writeValueAsString(original); + final IOMetricsInfo deserialized = mapper.readValue(json, IOMetricsInfo.class); + + assertThat(deserialized).isEqualTo(original); + assertThat(deserialized.getRecordsWrittenPerTarget()).isNotNull().isEmpty(); + assertThat(json).contains(IOMetricsInfo.FIELD_NAME_RECORDS_WRITTEN_PER_TARGET); + } + + /** + * Verifies that a populated {@code write-records-per-target} map survives a Jackson round-trip + * intact, including the JSON field name used by external consumers. + */ + @Test + void testIOMetricsInfoMarshallingWithPerTargetEntries() throws Exception { + final Map perTarget = new HashMap<>(); + perTarget.put("abcdef0123456789abcdef0123456789", 42L); + perTarget.put("0123456789abcdef0123456789abcdef", 7L); + + final IOMetricsInfo original = + new IOMetricsInfo(1L, true, 2L, true, 3L, true, 4L, true, 5L, 6L, 7.0, perTarget); + + final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper(); + final String json = mapper.writeValueAsString(original); + final IOMetricsInfo deserialized = mapper.readValue(json, IOMetricsInfo.class); + + assertThat(deserialized).isEqualTo(original); + assertThat(deserialized.getRecordsWrittenPerTarget()) + .containsExactlyInAnyOrderEntriesOf(perTarget); + assertThat(json) + .contains("\"" + IOMetricsInfo.FIELD_NAME_RECORDS_WRITTEN_PER_TARGET + "\""); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/graph/NonChainedOutputTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/graph/NonChainedOutputTest.java new file mode 100644 index 0000000000000..99e8ad7336b14 --- /dev/null +++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/graph/NonChainedOutputTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link NonChainedOutput}, focused on the adaptive-execution patching contract of + * {@link NonChainedOutput#setTargetVertexId(JobVertexID)}. 
+ */ +class NonChainedOutputTest { + + @Test + void testTargetVertexIdCanBePatchedAfterCreation() { + NonChainedOutput output = newOutput(/* targetVertexId */ null); + + assertThat(output.getTargetNodeId()) + .as("new NonChainedOutput in adaptive mode starts with a null target id") + .isNull(); + + JobVertexID downstreamId = new JobVertexID(); + output.setTargetVertexId(downstreamId); + + assertThat(output.getTargetNodeId()) + .as("setTargetVertexId patches in the now-known downstream JobVertexID") + .isEqualTo(downstreamId); + } + + @Test + void testEqualsAndHashCodeAreInvariantUnderTargetVertexIdPatch() { + IntermediateDataSetID dataSetId = new IntermediateDataSetID(); + NonChainedOutput a = newOutput(dataSetId, null); + NonChainedOutput b = newOutput(dataSetId, null); + + assertThat(a).isEqualTo(b); + assertThat(a).hasSameHashCodeAs(b); + + a.setTargetVertexId(new JobVertexID()); + + assertThat(a) + .as( + "equality is keyed by dataSetId only, so patching the target id must not" + + " change equals/hashCode semantics") + .isEqualTo(b); + assertThat(a).hasSameHashCodeAs(b); + } + + private static NonChainedOutput newOutput(JobVertexID targetVertexId) { + return newOutput(new IntermediateDataSetID(), targetVertexId); + } + + private static NonChainedOutput newOutput( + IntermediateDataSetID dataSetId, JobVertexID targetVertexId) { + return new NonChainedOutput( + /* supportsUnalignedCheckpoints */ true, + /* sourceNodeId */ 0, + targetVertexId, + /* consumerParallelism */ 1, + /* consumerMaxParallelism */ 1, + /* bufferTimeout */ 0L, + /* isPersistentDataSet */ false, + dataSetId, + /* outputTag */ null, + new ForwardPartitioner<>(), + ResultPartitionType.PIPELINED_BOUNDED); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java index 7411a3e2fd9e4..de170cccb1e18 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java @@ -24,6 +24,7 @@ import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.graph.NonChainedOutput; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -229,6 +230,7 @@ public OWNER finish() { new NonChainedOutput( true, chainIndex, + new JobVertexID(), 1, 1, 100, @@ -286,6 +288,7 @@ public OWNER finishForSingletonOperatorChain( new NonChainedOutput( true, sourceVertexDummy.getId(), + new JobVertexID(), 1, 1, 100, @@ -439,6 +442,7 @@ private List createNonChainedOutputs( new NonChainedOutput( true, streamEdge.getTargetId(), + new JobVertexID(), 1, 1, 100, diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java index ec35ce681e202..4aa0e66ede319 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.graph.NonChainedOutput; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -71,6 +72,7 @@ public MockStreamConfig( new NonChainedOutput( true, sourceVertex.getId(), + new JobVertexID(), 
1, 1, 100, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManagerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManagerTest.java index 8801fb66f20a5..442e8adc9714c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManagerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManagerTest.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; @@ -196,6 +197,82 @@ public Tuple2 map( assertThat(isJobGraphEquivalent(jobGraph1, jobGraph2)).isEqualTo(true); } + @Test + void testNonChainedOutputTargetVertexIdPatchedAfterDownstreamCreated() { + // Build a 2-stage pipeline that, under dynamic/adaptive job-graph construction, produces + // two separate JobVertices (upstream source+map, downstream keyed-map+sink) connected by + // a hash exchange. This is exactly the scenario where the upstream vertex is created + // first and its NonChainedOutput is left with a transiently-null targetVertexId until + // the downstream vertex is built in a later iteration. 
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStream> input = + env.fromData("a", "b", "c") + .map( + new MapFunction>() { + @Override + public Tuple2 map(String value) { + return new Tuple2<>(value, value); + } + }); + input.keyBy(x -> x.f0) + .map( + new MapFunction, Tuple2>() { + @Override + public Tuple2 map(Tuple2 value) { + return value; + } + }) + .sinkTo(new DiscardingSink<>()); + + StreamGraph streamGraph = env.getStreamGraph(); + streamGraph.setDynamic(true); + + AdaptiveGraphManager adaptiveGraphManager = + new AdaptiveGraphManager( + Thread.currentThread().getContextClassLoader(), streamGraph, Runnable::run); + + // Only the upstream vertex exists after the initial getJobGraph() call. + JobGraph jobGraph = adaptiveGraphManager.getJobGraph(); + List initialVertices = jobGraph.getVerticesSortedTopologicallyFromSources(); + assertThat(initialVertices).hasSize(1); + JobVertex upstreamVertex = initialVertices.get(0); + + // The upstream's NonChainedOutput was created while the downstream JobVertex did not yet + // exist; its target id must be transiently null at this point. + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + List outputsBefore = + new StreamConfig(upstreamVertex.getConfiguration()).getVertexNonChainedOutputs(cl); + assertThat(outputsBefore) + .as("upstream should have exactly one non-chained output to the downstream") + .hasSize(1); + assertThat(outputsBefore.get(0).getTargetNodeId()) + .as( + "target id is transiently null before the downstream JobVertex has been" + + " created by the adaptive graph manager") + .isNull(); + + // Finishing the upstream triggers creation of the downstream vertex and the patching + + // re-serialization of the upstream's VERTEX_NONCHAINED_OUTPUTS. 
+ List newlyCreated = + adaptiveGraphManager.onJobVertexFinished(upstreamVertex.getID()); + assertThat(newlyCreated).hasSize(1); + JobVertexID downstreamId = newlyCreated.get(0).getID(); + + // Re-read the persisted bytes of the upstream config: the target id must now reflect the + // downstream JobVertexID, proving both setTargetVertexId() and + // reserializeVertexNonChainedOutputs() were applied. + List outputsAfter = + new StreamConfig(upstreamVertex.getConfiguration()).getVertexNonChainedOutputs(cl); + assertThat(outputsAfter).hasSize(1); + assertThat(outputsAfter.get(0).getTargetNodeId()) + .as( + "after the downstream vertex is created, the upstream's persisted" + + " NonChainedOutput carries the downstream JobVertexID") + .isEqualTo(downstreamId); + } + @Test void testSourceChain() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContextTest.java index d3ef738e96151..03347e3401e8f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContextTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -106,6 +107,7 @@ void testModifyStreamEdge() { new NonChainedOutput( targetEdge.supportsUnalignedCheckpoints(), 
targetEdge.getSourceId(), + new JobVertexID(), targetNode.getParallelism(), targetNode.getMaxParallelism(), targetEdge.getBufferTimeout(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 2b55110d2621f..6878084c2be89 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -20,9 +20,20 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + /** Tests for {@link StreamingJobGraphGenerator}. */ @SuppressWarnings("serial") class StreamingJobGraphGeneratorTest extends JobGraphGeneratorTestBase { @@ -54,4 +65,77 @@ void verifyManagedMemoryFractionForUnknownResourceSpec( final StreamConfig map3Config = new StreamConfig(vertex3.getConfiguration()); verifyFractions(map3Config, 1.0, 0.0, 0.0, taskManagerConfig); } + + /** + * Verifies that each {@link NonChainedOutput} emitted for a source with multiple downstream + * consumers carries the {@link JobVertexID} of the actual downstream {@link JobVertex} it + * feeds. 
This target id is what the runtime uses to key the per-downstream-target {@code + * numRecordsOut} metric registered by {@code TaskIOMetricGroup#reuseRecordsOutputCounter}. + */ + @Test + void testNonChainedOutputCarriesDownstreamJobVertexId() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + final DataStreamSource source = env.fromData(1, 2, 3); + source.rebalance().map(x -> x).name("mapA").sinkTo(new DiscardingSink<>()).name("sinkA"); + source.rebalance().map(x -> x).name("mapB").sinkTo(new DiscardingSink<>()).name("sinkB"); + + final JobGraph jobGraph = createJobGraph(env.getStreamGraph()); + final List vertices = jobGraph.getVerticesSortedTopologicallyFromSources(); + + final JobVertex sourceVertex = vertices.get(0); + final JobVertex mapAVertex = findJobVertex(vertices, "mapA"); + final JobVertex mapBVertex = findJobVertex(vertices, "mapB"); + + final StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration()); + final List nonChainedOutputs = + sourceConfig.getVertexNonChainedOutputs(getClass().getClassLoader()); + assertThat(nonChainedOutputs).hasSize(2); + assertThat(nonChainedOutputs) + .extracting(NonChainedOutput::getTargetNodeId) + .containsExactlyInAnyOrder(mapAVertex.getID(), mapBVertex.getID()); + } + + /** + * Broadcast fan-out variant of {@link #testNonChainedOutputCarriesDownstreamJobVertexId()}: + * distinct downstream vertices must each receive their own {@link NonChainedOutput} carrying + * the matching {@link JobVertexID}. Ensures the planner does not collapse broadcast outputs + * into a single target id, which would break the per-target metric. 
+ */ + @Test + void testBroadcastFanOutTargetJobVertexIds() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + final DataStreamSource source = env.fromData(1, 2, 3); + final DataStream broadcast = source.broadcast(); + broadcast.map(x -> x).name("mapA").disableChaining().sinkTo(new DiscardingSink<>()); + broadcast.map(x -> x).name("mapB").disableChaining().sinkTo(new DiscardingSink<>()); + + final JobGraph jobGraph = createJobGraph(env.getStreamGraph()); + final List vertices = jobGraph.getVerticesSortedTopologicallyFromSources(); + + final JobVertex sourceVertex = vertices.get(0); + final JobVertex mapAVertex = findJobVertex(vertices, "mapA"); + final JobVertex mapBVertex = findJobVertex(vertices, "mapB"); + + final StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration()); + final List nonChainedOutputs = + sourceConfig.getVertexNonChainedOutputs(getClass().getClassLoader()); + assertThat(nonChainedOutputs).hasSize(2); + assertThat(nonChainedOutputs) + .extracting(NonChainedOutput::getTargetNodeId) + .containsExactlyInAnyOrder(mapAVertex.getID(), mapBVertex.getID()) + .doesNotContainNull(); + } + + private static JobVertex findJobVertex(List vertices, String name) { + for (JobVertex jobVertex : vertices) { + if (jobVertex.getName().contains(name)) { + return jobVertex; + } + } + throw new AssertionError("No job vertex found with name containing: " + name); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java index 287c0f4ca75dd..b02dfdb5eec9d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; @@ -376,6 +377,7 @@ private SourceInputConfig initializeSourceInput( new NonChainedOutput( true, sourceToMainEdge.getSourceId(), + new JobVertexID(), 1, 1, 100, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 670e8622911b7..cf695ad79d43a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -245,6 +245,7 @@ public void setupOutputForSingletonOperatorChain() { new NonChainedOutput( true, sourceVertexDummy.getId(), + new JobVertexID(), 1, 1, 100, diff --git a/flink-tests/src/test/java/org/apache/flink/test/metrics/PerTargetNumRecordsOutITCase.java b/flink-tests/src/test/java/org/apache/flink/test/metrics/PerTargetNumRecordsOutITCase.java new file mode 100644 index 0000000000000..803c06921ff72 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/metrics/PerTargetNumRecordsOutITCase.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.metrics; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; +import org.apache.flink.test.junit5.InjectClusterClient; +import 
org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.io.Serial; +import java.time.Duration; +import java.util.Map; + +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * End-to-end verification of the per-downstream-target {@code numRecordsOut} metric exposed by + * {@code TaskIOMetricGroup} and surfaced through {@link IOMetricsInfo#getRecordsWrittenPerTarget()} + * in the REST {@code /jobs/:jobid} response. + * + *

Scenarios are organized into nested classes so each one can spin up a {@link + * MiniClusterExtension} with its own scheduler configuration: + * + *

    + *
  • {@link UnderDefaultScheduler} covers the default scheduler for streaming jobs: side output + * and broadcast fan-out. + *
  • {@link UnderAdaptiveBatchScheduler} covers {@code AdaptiveBatch} (which drives {@code + * AdaptiveGraphManager}); the test documents a known per-target limitation under that mode. + *
  • {@link UnderAdaptiveStreamingScheduler} covers the streaming {@code Adaptive} scheduler, + which builds the full {@code JobGraph} upfront so per-target counters work end-to-end just + like the default scheduler. + 
+ * + *

Assertions go through the REST API to validate the complete pipeline: runtime metric + * registration → {@code IOMetrics} snapshot → {@code MutableIOMetrics} per-subtask aggregation → + * {@code IOMetricsInfo} JSON round-trip. + * + *

Each nested class uses {@link TestInstance.Lifecycle#PER_CLASS} so that an instance-level + * {@code @RegisterExtension} field (required by {@link Nested} non-static inner classes under Java + * 11 source level) still receives {@code beforeAll} callbacks and the underlying mini cluster is + * started once per nested class. + */ +class PerTargetNumRecordsOutITCase { + + private static final int NUM_RECORDS = 20; + private static final int SIDE_OUTPUT_PREDICATE = 3; // keep every (i % 3 == 0) for side output + + @Nested + @TestInstance(TestInstance.Lifecycle.PER_CLASS) + class UnderDefaultScheduler { + + @RegisterExtension + private final MiniClusterExtension miniClusterExtension = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(8) + .build()); + + @Test + void testPerTargetCountsWithSideOutput( + @InjectClusterClient RestClusterClient restClusterClient) throws Exception { + final OutputTag sideTag = new OutputTag("side") {}; + + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.disableOperatorChaining(); + + final DataStream source = + env.fromSource( + new NumberSequenceSource(0, NUM_RECORDS - 1), + WatermarkStrategy.noWatermarks(), + "source-vertex") + .returns(BasicTypeInfo.LONG_TYPE_INFO); + + final SingleOutputStreamOperator splitter = + source.process( + new ProcessFunction() { + @Serial private static final long serialVersionUID = 1L; + + @Override + public void processElement( + Long value, Context ctx, Collector out) { + if (value % SIDE_OUTPUT_PREDICATE == 0) { + ctx.output(sideTag, value); + } else { + out.collect(value); + } + } + }) + .name("splitter-vertex"); + + splitter.sinkTo(new DiscardingSink<>()).name("main-sink-vertex"); + splitter.getSideOutput(sideTag).sinkTo(new DiscardingSink<>()).name("side-sink-vertex"); + + final long expectedSide = countIf(value -> value % 
SIDE_OUTPUT_PREDICATE == 0); + final long expectedMain = NUM_RECORDS - expectedSide; + + final JobClient jobClient = env.executeAsync("per-target-side-output"); + final JobID jobId = jobClient.getJobID(); + jobClient.getJobExecutionResult().get(); + + final JobDetailsInfo jobDetails = fetchJobDetails(restClusterClient, jobId); + + final JobDetailsInfo.JobVertexDetailsInfo splitterVertex = + findVertex(jobDetails, "splitter-vertex"); + final JobVertexID mainSinkId = + findVertex(jobDetails, "main-sink-vertex").getJobVertexID(); + final JobVertexID sideSinkId = + findVertex(jobDetails, "side-sink-vertex").getJobVertexID(); + + final IOMetricsInfo metrics = splitterVertex.getJobVertexMetrics(); + assertThat(metrics.getRecordsWritten()) + .as("aggregate write-records equals total emits") + .isEqualTo(NUM_RECORDS); + + final Map perTarget = metrics.getRecordsWrittenPerTarget(); + assertThat(perTarget) + .as("per-target map surfaces both downstreams") + .containsEntry(mainSinkId.toHexString(), expectedMain) + .containsEntry(sideSinkId.toHexString(), expectedSide); + assertThat(perTarget.values().stream().mapToLong(Long::longValue).sum()) + .as("per-target sum equals aggregate when emits are routed, not broadcast") + .isEqualTo(NUM_RECORDS); + } + + @Test + void testPerTargetCountsWithBroadcast( + @InjectClusterClient RestClusterClient restClusterClient) throws Exception { + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.disableOperatorChaining(); + + final DataStream source = + env.fromSource( + new NumberSequenceSource(0, NUM_RECORDS - 1), + WatermarkStrategy.noWatermarks(), + "broadcast-source-vertex") + .returns(BasicTypeInfo.LONG_TYPE_INFO); + + final DataStream broadcast = source.broadcast(); + broadcast.map(x -> x).name("fanout-a-vertex").sinkTo(new DiscardingSink<>()); + broadcast.map(x -> x).name("fanout-b-vertex").sinkTo(new DiscardingSink<>()); + broadcast.map(x -> 
x).name("fanout-c-vertex").sinkTo(new DiscardingSink<>()); + + final JobClient jobClient = env.executeAsync("per-target-broadcast"); + final JobID jobId = jobClient.getJobID(); + jobClient.getJobExecutionResult().get(); + + final JobDetailsInfo jobDetails = fetchJobDetails(restClusterClient, jobId); + + final JobDetailsInfo.JobVertexDetailsInfo sourceVertex = + findVertex(jobDetails, "broadcast-source-vertex"); + final JobVertexID fanoutA = findVertex(jobDetails, "fanout-a-vertex").getJobVertexID(); + final JobVertexID fanoutB = findVertex(jobDetails, "fanout-b-vertex").getJobVertexID(); + final JobVertexID fanoutC = findVertex(jobDetails, "fanout-c-vertex").getJobVertexID(); + + final IOMetricsInfo metrics = sourceVertex.getJobVertexMetrics(); + assertThat(metrics.getRecordsWritten()) + .as( + "aggregate write-records must count each logical emit once, " + + "even when broadcast fans out to N targets") + .isEqualTo(NUM_RECORDS); + + final Map perTarget = metrics.getRecordsWrittenPerTarget(); + assertThat(perTarget) + .as("per-target map has one entry per broadcast fan-out target") + .containsEntry(fanoutA.toHexString(), (long) NUM_RECORDS) + .containsEntry(fanoutB.toHexString(), (long) NUM_RECORDS) + .containsEntry(fanoutC.toHexString(), (long) NUM_RECORDS) + .hasSize(3); + } + } + + /** + * Same routed side-output topology as {@link UnderDefaultScheduler} but explicitly running + * under the {@code AdaptiveBatch} scheduler (FLIP-187), which drives job-graph construction + * through {@code AdaptiveGraphManager} in an incremental, per-stage fashion. + * + *

The expectation documents a known limitation: the upstream operator's {@code + * OperatorChain} binds its per-target counters once, at task startup, by reading + * {@code OP_NONCHAINED_OUTPUTS} from its {@code StreamConfig}. In adaptive batch the downstream + * {@link JobVertexID} is not known until a later iteration of {@code + * AdaptiveGraphManager} (triggered by the upstream finishing). By the time the planner-side + * patching (see {@code AdaptiveGraphManager#connectToFinishedUpStreamVertex}) stamps the + * downstream id into the cached {@link org.apache.flink.streaming.api.graph.NonChainedOutput}, + * the upstream task has already finished and its metrics are archived. Consequently the + * per-target map is empty for upstream vertices whose downstream is not yet known at task + * start; only the aggregate {@code numRecordsOut} is populated (and stays fully correct). + * + *

Consumers that need the per-target breakdown must treat an empty map as "breakdown + * unavailable for this vertex in adaptive batch" and fall back to the aggregate. + */ + @Nested + @TestInstance(TestInstance.Lifecycle.PER_CLASS) + class UnderAdaptiveBatchScheduler { + + @RegisterExtension + private final MiniClusterExtension miniClusterExtension = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(adaptiveBatchConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(8) + .build()); + + private Configuration adaptiveBatchConfiguration() { + final Configuration configuration = new Configuration(); + configuration.set( + JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch); + return configuration; + } + + @Test + void testPerTargetCountsWithSideOutput( + @InjectClusterClient RestClusterClient restClusterClient) throws Exception { + final OutputTag sideTag = new OutputTag("side") {}; + + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + env.setParallelism(1); + env.disableOperatorChaining(); + + final DataStream source = + env.fromSource( + new NumberSequenceSource(0, NUM_RECORDS - 1), + WatermarkStrategy.noWatermarks(), + "adaptive-batch-source-vertex") + .returns(BasicTypeInfo.LONG_TYPE_INFO); + + final SingleOutputStreamOperator splitter = + source.process( + new ProcessFunction() { + @Serial private static final long serialVersionUID = 1L; + + @Override + public void processElement( + Long value, Context ctx, Collector out) { + if (value % SIDE_OUTPUT_PREDICATE == 0) { + ctx.output(sideTag, value); + } else { + out.collect(value); + } + } + }) + .name("adaptive-batch-splitter-vertex"); + + splitter.sinkTo(new DiscardingSink<>()).name("adaptive-batch-main-sink-vertex"); + splitter.getSideOutput(sideTag) + .sinkTo(new DiscardingSink<>()) + .name("adaptive-batch-side-sink-vertex"); + 
+ final JobClient jobClient = env.executeAsync("per-target-adaptive-batch"); + final JobID jobId = jobClient.getJobID(); + jobClient.getJobExecutionResult().get(); + + final JobDetailsInfo jobDetails = fetchJobDetails(restClusterClient, jobId); + final JobDetailsInfo.JobVertexDetailsInfo splitterVertex = + findVertex(jobDetails, "adaptive-batch-splitter-vertex"); + final IOMetricsInfo metrics = splitterVertex.getJobVertexMetrics(); + + assertThat(metrics.getRecordsWritten()) + .as( + "aggregate write-records is fully populated even under adaptive" + + " batch: the aggregate counter is independent of any" + + " per-target key") + .isEqualTo(NUM_RECORDS); + + assertThat(metrics.getRecordsWrittenPerTarget()) + .as( + "per-target map is empty under adaptive batch for this edge: the" + + " downstream JobVertexID is not yet known at upstream-task" + + " start, so the runtime OperatorChain cannot register a" + + " per-target counter. This is a documented limitation of" + + " the feature; planner-side patching still happens for any" + + " external consumer that reads the serialized StreamConfig" + + " after job completion.") + .isNotNull() + .isEmpty(); + } + } + + /** + * Same assertions as {@link UnderDefaultScheduler} (streaming cases), but under the streaming + * {@code Adaptive} scheduler. The adaptive streaming scheduler builds the full {@code JobGraph} + * upfront (unlike {@code AdaptiveGraphManager} for batch), so all downstream {@link + * JobVertexID}s are known at upstream-task start time and per-target counters bind correctly + * end-to-end; consequently the expectations match the default-scheduler tests. 
+ */ + @Nested + @TestInstance(TestInstance.Lifecycle.PER_CLASS) + class UnderAdaptiveStreamingScheduler { + + @RegisterExtension + private final MiniClusterExtension miniClusterExtension = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(adaptiveStreamingConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(8) + .build()); + + private Configuration adaptiveStreamingConfiguration() { + final Configuration configuration = new Configuration(); + configuration.set( + JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive); + // Keep the submission stabilization short so the test does not spend time waiting for + // additional slots beyond what the mini-cluster already advertises. + configuration.set( + JobManagerOptions.SCHEDULER_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT, + Duration.ofMillis(100L)); + return configuration; + } + + @Test + void testPerTargetCountsWithSideOutput( + @InjectClusterClient RestClusterClient restClusterClient) throws Exception { + final OutputTag sideTag = new OutputTag("side") {}; + + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.disableOperatorChaining(); + + final DataStream source = + env.fromSource( + new NumberSequenceSource(0, NUM_RECORDS - 1), + WatermarkStrategy.noWatermarks(), + "adaptive-stream-source") + .returns(BasicTypeInfo.LONG_TYPE_INFO); + + final SingleOutputStreamOperator splitter = + source.process( + new ProcessFunction() { + @Serial private static final long serialVersionUID = 1L; + + @Override + public void processElement( + Long value, Context ctx, Collector out) { + if (value % SIDE_OUTPUT_PREDICATE == 0) { + ctx.output(sideTag, value); + } else { + out.collect(value); + } + } + }) + .name("adaptive-stream-splitter"); + + splitter.sinkTo(new DiscardingSink<>()).name("adaptive-stream-main-sink"); + splitter.getSideOutput(sideTag) + .sinkTo(new 
DiscardingSink<>()) + .name("adaptive-stream-side-sink"); + + final long expectedSide = countIf(value -> value % SIDE_OUTPUT_PREDICATE == 0); + final long expectedMain = NUM_RECORDS - expectedSide; + + final JobClient jobClient = env.executeAsync("per-target-adaptive-streaming"); + final JobID jobId = jobClient.getJobID(); + jobClient.getJobExecutionResult().get(); + + final JobDetailsInfo jobDetails = fetchJobDetails(restClusterClient, jobId); + + final JobDetailsInfo.JobVertexDetailsInfo splitterVertex = + findVertex(jobDetails, "adaptive-stream-splitter"); + final JobVertexID mainSinkId = + findVertex(jobDetails, "adaptive-stream-main-sink").getJobVertexID(); + final JobVertexID sideSinkId = + findVertex(jobDetails, "adaptive-stream-side-sink").getJobVertexID(); + + final IOMetricsInfo metrics = splitterVertex.getJobVertexMetrics(); + assertThat(metrics.getRecordsWritten()) + .as("aggregate write-records equals total emits under adaptive streaming") + .isEqualTo(NUM_RECORDS); + + final Map perTarget = metrics.getRecordsWrittenPerTarget(); + assertThat(perTarget) + .as( + "per-target map is fully populated under the streaming Adaptive" + + " scheduler: the full JobGraph is built upfront, so all" + + " downstream JobVertexIDs are known when the upstream task" + + " binds its per-target counters") + .containsEntry(mainSinkId.toHexString(), expectedMain) + .containsEntry(sideSinkId.toHexString(), expectedSide); + assertThat(perTarget.values().stream().mapToLong(Long::longValue).sum()) + .as("per-target sum equals aggregate when emits are routed, not broadcast") + .isEqualTo(NUM_RECORDS); + } + + @Test + void testPerTargetCountsWithBroadcast( + @InjectClusterClient RestClusterClient restClusterClient) throws Exception { + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.disableOperatorChaining(); + + final DataStream source = + env.fromSource( + new NumberSequenceSource(0, NUM_RECORDS - 1), 
+ WatermarkStrategy.noWatermarks(), + "adaptive-stream-broadcast-source") + .returns(BasicTypeInfo.LONG_TYPE_INFO); + + final DataStream broadcast = source.broadcast(); + broadcast.map(x -> x).name("adaptive-stream-fanout-a").sinkTo(new DiscardingSink<>()); + broadcast.map(x -> x).name("adaptive-stream-fanout-b").sinkTo(new DiscardingSink<>()); + broadcast.map(x -> x).name("adaptive-stream-fanout-c").sinkTo(new DiscardingSink<>()); + + final JobClient jobClient = env.executeAsync("per-target-adaptive-streaming-broadcast"); + final JobID jobId = jobClient.getJobID(); + jobClient.getJobExecutionResult().get(); + + final JobDetailsInfo jobDetails = fetchJobDetails(restClusterClient, jobId); + + final JobDetailsInfo.JobVertexDetailsInfo sourceVertex = + findVertex(jobDetails, "adaptive-stream-broadcast-source"); + final JobVertexID fanoutA = + findVertex(jobDetails, "adaptive-stream-fanout-a").getJobVertexID(); + final JobVertexID fanoutB = + findVertex(jobDetails, "adaptive-stream-fanout-b").getJobVertexID(); + final JobVertexID fanoutC = + findVertex(jobDetails, "adaptive-stream-fanout-c").getJobVertexID(); + + final IOMetricsInfo metrics = sourceVertex.getJobVertexMetrics(); + assertThat(metrics.getRecordsWritten()) + .as( + "aggregate write-records counts each logical emit once, even when" + + " broadcast fans out to N targets under adaptive streaming") + .isEqualTo(NUM_RECORDS); + + assertThat(metrics.getRecordsWrittenPerTarget()) + .as("per-target map has one entry per broadcast fan-out target") + .containsEntry(fanoutA.toHexString(), (long) NUM_RECORDS) + .containsEntry(fanoutB.toHexString(), (long) NUM_RECORDS) + .containsEntry(fanoutC.toHexString(), (long) NUM_RECORDS) + .hasSize(3); + } + } + + private static JobDetailsInfo fetchJobDetails( + RestClusterClient restClusterClient, JobID jobId) throws Exception { + // The aggregated archived IOMetrics are populated when the ExecutionGraph transitions + // to a terminal state. 
getJobExecutionResult() already awaits that, but the REST handler + // reads from the ExecutionGraphCache which may briefly be empty for the just-finished + // job; poll until the response exposes the finished job's vertices. + final JobDetailsInfo[] result = new JobDetailsInfo[1]; + waitUntilCondition( + () -> { + result[0] = restClusterClient.getJobDetails(jobId).get(); + return result[0] != null && !result[0].getJobVertexInfos().isEmpty(); + }, + 10_000); + return result[0]; + } + + private static JobDetailsInfo.JobVertexDetailsInfo findVertex( + JobDetailsInfo details, String nameContains) { + return details.getJobVertexInfos().stream() + .filter(v -> v.getName().contains(nameContains)) + .findFirst() + .orElseThrow( + () -> + new AssertionError( + "No job vertex with name containing " + nameContains)); + } + + private static long countIf(java.util.function.LongPredicate pred) { + long count = 0; + for (int i = 0; i < NUM_RECORDS; i++) { + if (pred.test(i)) { + count++; + } + } + return count; + } +}