Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -1819,6 +1819,12 @@
},
"write-records-complete" : {
"type" : "boolean"
},
"write-records-per-target" : {
"type" : "object",
"additionalProperties" : {
"type" : "integer"
}
}
}
},
Expand Down Expand Up @@ -5102,6 +5108,12 @@
},
"write-records-complete" : {
"type" : "boolean"
},
"write-records-per-target" : {
"type" : "object",
"additionalProperties" : {
"type" : "integer"
}
}
}
},
Expand Down Expand Up @@ -5743,6 +5755,12 @@
},
"write-records-complete" : {
"type" : "boolean"
},
"write-records-per-target" : {
"type" : "object",
"additionalProperties" : {
"type" : "integer"
}
}
}
},
Expand Down Expand Up @@ -5873,6 +5891,12 @@
},
"write-records-complete" : {
"type" : "boolean"
},
"write-records-per-target" : {
"type" : "object",
"additionalProperties" : {
"type" : "integer"
}
}
}
},
Expand Down Expand Up @@ -6251,6 +6275,12 @@
},
"write-records-complete" : {
"type" : "boolean"
},
"write-records-per-target" : {
"type" : "object",
"additionalProperties" : {
"type" : "integer"
}
}
}
},
Expand Down
5 changes: 5 additions & 0 deletions docs/static/generated/rest_v1_dispatcher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2469,6 +2469,11 @@ components:
format: int64
write-records-complete:
type: boolean
write-records-per-target:
type: object
additionalProperties:
type: integer
format: int64
Id:
type: string
enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1774,6 +1774,8 @@ private void updateAccumulatorsAndMetrics(
if (metrics != null) {
// Drop IOMetrics#resultPartitionBytes because it will not be used anymore. It can
// result in very high memory usage when there are many executions and sub-partitions.
// Preserve the per-downstream-target numRecordsOut breakdown so that REST consumers
// (e.g. the Kubernetes autoscaler) see it on terminal executions.
this.ioMetrics =
new IOMetrics(
metrics.getNumBytesIn(),
Expand All @@ -1782,7 +1784,9 @@ private void updateAccumulatorsAndMetrics(
metrics.getNumRecordsOut(),
metrics.getAccumulateIdleTime(),
metrics.getAccumulateBusyTime(),
metrics.getAccumulateBackPressuredTime());
metrics.getAccumulateBackPressuredTime(),
null,
metrics.getNumRecordsOutPerTarget());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public class IOMetrics implements Serializable {
@Nullable
protected Map<IntermediateResultPartitionID, ResultPartitionBytes> resultPartitionBytes;

/**
* Per-downstream-target breakdown of {@link #numRecordsOut}, keyed by target {@code
* JobVertexID} (hex string). May be empty when the task has no outputs for which a target was
* registered (e.g. sinks, or the broadcast fan-out path). Never {@code null}.
*/
protected Map<String, Long> numRecordsOutPerTarget;

public IOMetrics(
Meter recordsIn,
Meter recordsOut,
Expand All @@ -60,7 +67,8 @@ public IOMetrics(
Gauge<Long> accumulatedIdleTime,
Gauge<Double> accumulatedBusyTime,
Map<IntermediateResultPartitionID, ResultPartitionBytesCounter>
resultPartitionBytesCounters) {
resultPartitionBytesCounters,
Map<String, Long> numRecordsOutPerTarget) {
this.numRecordsIn = recordsIn.getCount();
this.numRecordsOut = recordsOut.getCount();
this.numBytesIn = bytesIn.getCount();
Expand All @@ -74,6 +82,10 @@ public IOMetrics(
Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().createSnapshot()));
this.numRecordsOutPerTarget =
numRecordsOutPerTarget == null
? Collections.emptyMap()
: Collections.unmodifiableMap(numRecordsOutPerTarget);
}

public IOMetrics(
Expand Down Expand Up @@ -106,6 +118,29 @@ public IOMetrics(
long accumulateBackPressuredTime,
@Nullable
Map<IntermediateResultPartitionID, ResultPartitionBytes> resultPartitionBytes) {
this(
numBytesIn,
numBytesOut,
numRecordsIn,
numRecordsOut,
accumulateIdleTime,
accumulateBusyTime,
accumulateBackPressuredTime,
resultPartitionBytes,
null);
}

@VisibleForTesting
public IOMetrics(
long numBytesIn,
long numBytesOut,
long numRecordsIn,
long numRecordsOut,
long accumulateIdleTime,
double accumulateBusyTime,
long accumulateBackPressuredTime,
@Nullable Map<IntermediateResultPartitionID, ResultPartitionBytes> resultPartitionBytes,
@Nullable Map<String, Long> numRecordsOutPerTarget) {
this.numBytesIn = numBytesIn;
this.numBytesOut = numBytesOut;
this.numRecordsIn = numRecordsIn;
Expand All @@ -114,6 +149,10 @@ public IOMetrics(
this.accumulateBusyTime = accumulateBusyTime;
this.accumulateBackPressuredTime = accumulateBackPressuredTime;
this.resultPartitionBytes = resultPartitionBytes;
this.numRecordsOutPerTarget =
numRecordsOutPerTarget == null
? Collections.emptyMap()
: Collections.unmodifiableMap(numRecordsOutPerTarget);
}

public long getNumRecordsIn() {
Expand Down Expand Up @@ -147,4 +186,15 @@ public long getAccumulateIdleTime() {
public Map<IntermediateResultPartitionID, ResultPartitionBytes> getResultPartitionBytes() {
return Collections.unmodifiableMap(checkNotNull(resultPartitionBytes));
}

/**
 * Returns the per-downstream-target breakdown of {@code numRecordsOut}, keyed by the hex
 * string of the target {@code JobVertexID}.
 *
 * <p>Never returns {@code null}: when no per-target counters were registered — or the field
 * is absent, e.g. on instances deserialized from an older version — an empty map is returned.
 * The returned map is unmodifiable.
 */
public Map<String, Long> getNumRecordsOutPerTarget() {
    final Map<String, Long> perTarget = numRecordsOutPerTarget;
    if (perTarget == null) {
        return Collections.emptyMap();
    }
    return Collections.unmodifiableMap(perTarget);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ private MetricNames() {}
public static final String IO_NUM_RECORDS_IN_RATE = IO_NUM_RECORDS_IN + SUFFIX_RATE;
public static final String IO_NUM_RECORDS_OUT_RATE = IO_NUM_RECORDS_OUT + SUFFIX_RATE;

/**
 * Prefix under which the per-downstream-target breakdown of {@link #IO_NUM_RECORDS_OUT} is
 * exposed. Full metric names are built via {@link #ioNumRecordsOutPerTargetName(String)}.
 * Consumers such as the Kubernetes autoscaler use these metrics to compute accurate per-edge
 * data rates for vertices with several downstream outputs.
 */
public static final String IO_NUM_RECORDS_OUT_PER_TARGET_PREFIX = IO_NUM_RECORDS_OUT + ".";

/** Builds the per-target metric name for the given downstream {@code JobVertexID} hex string. */
public static String ioNumRecordsOutPerTargetName(String targetJobVertexId) {
    return new StringBuilder(IO_NUM_RECORDS_OUT_PER_TARGET_PREFIX)
            .append(targetJobVertexId)
            .toString();
}

public static final String IO_NUM_BYTES_IN = "numBytesIn";
public static final String IO_NUM_BYTES_OUT = "numBytesOut";
public static final String IO_NUM_BYTES_IN_RATE = IO_NUM_BYTES_IN + SUFFIX_RATE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ public IOMetrics createSnapshot() {
accumulatedBackPressuredTime,
accumulatedIdleTime,
accumulatedBusyTime,
resultPartitionBytes);
resultPartitionBytes,
numRecordsOut.getCounters());
}

// ============================================================================================
Expand All @@ -209,6 +210,17 @@ public Counter getNumRecordsOutCounter() {
return numRecordsOut;
}

/**
 * Returns a point-in-time snapshot of the per-downstream-target {@code numRecordsOut} counts,
 * keyed by the hex string of the target {@code JobVertexID}. Only outputs registered with a
 * target key via {@link #reuseRecordsOutputCounter(Counter, String)} appear in the result, so
 * the values may sum to less than {@link #getNumRecordsOutCounter()} when some outputs were
 * registered without a target key (e.g. broadcast fan-out collectors).
 */
public Map<String, Long> getNumRecordsOutPerTarget() {
    final Map<String, Long> perTargetSnapshot = numRecordsOut.getCounters();
    return perTargetSnapshot;
}

public Counter getNumBuffersOutCounter() {
return numBuffersOut;
}
Expand Down Expand Up @@ -342,6 +354,35 @@ public void reuseRecordsOutputCounter(Counter numRecordsOutCounter) {
this.numRecordsOut.addCounter(numRecordsOutCounter);
}

/**
 * Registers an output record counter whose emits go to the given downstream target vertex. The
 * counter contributes to the aggregate {@link MetricNames#IO_NUM_RECORDS_OUT} metric and is
 * additionally published as the per-target metric {@code numRecordsOut.<targetJobVertexId>},
 * which lets consumers (e.g. the Kubernetes autoscaler) derive accurate per-edge data rates
 * when a task has multiple downstream outputs, including side outputs.
 */
public void reuseRecordsOutputCounter(Counter numRecordsOutCounter, String jobVertexId) {
    // Publish the per-target counter as its own discoverable metric so reporters
    // (Prometheus, JMX, ...) and the REST metric store can observe the breakdown.
    counter(MetricNames.ioNumRecordsOutPerTargetName(jobVertexId), numRecordsOutCounter);
    // Fold the counter into both the task-level aggregate and the per-target lookup map.
    this.numRecordsOut.addCounter(numRecordsOutCounter, jobVertexId);
}

/**
 * Registers a per-downstream-target output record counter <em>without</em> folding it into the
 * aggregate {@link MetricNames#IO_NUM_RECORDS_OUT} total. This serves the multi-output /
 * broadcast fan-out path: there the aggregate is already bumped once per logical emit by the
 * broadcast collector (registered via {@link #reuseRecordsOutputCounter(Counter)}), so adding
 * every per-target counter on top would double-count records.
 *
 * <p>The counter is still published as the discoverable metric
 * {@code numRecordsOut.<targetJobVertexId>} and shows up in
 * {@link #getNumRecordsOutPerTarget()}.
 */
public void registerNumRecordsOutPerTarget(Counter numRecordsOutCounter, String jobVertexId) {
    // Expose the counter as an individual metric for reporters and the REST metric store.
    counter(MetricNames.ioNumRecordsOutPerTargetName(jobVertexId), numRecordsOutCounter);
    // Record it for per-target lookup only; deliberately NOT added to the aggregate sum.
    this.numRecordsOut.addTargetOnly(numRecordsOutCounter, jobVertexId);
}

public void registerResultPartitionBytesCounter(
IntermediateResultPartitionID resultPartitionId,
ResultPartitionBytesCounter resultPartitionBytesCounter) {
Expand All @@ -358,13 +399,29 @@ public void registerMailboxSizeSupplier(SizeSupplier<Integer> supplier) {
*/
private static class SumCounter extends SimpleCounter {
private final List<Counter> internalCounters = new ArrayList<>();
private final Map<String, Counter> jobVertexIdToCounter = new HashMap<>();

SumCounter() {}

/** Registers a sub-counter whose current count is folded into the aggregate {@link #getCount()}. */
public void addCounter(Counter toAdd) {
internalCounters.add(toAdd);
}

/**
 * Registers a sub-counter that both contributes to the aggregate {@link #getCount()} and is
 * retrievable for the given downstream target via {@link #getCounters()}.
 *
 * <p>NOTE(review): a second registration under the same {@code jobVertexId} replaces the
 * previous map entry while both counters keep contributing to the aggregate — confirm that
 * callers never register two counters for the same target.
 */
public void addCounter(Counter toAdd, String jobVertexId) {
internalCounters.add(toAdd);
jobVertexIdToCounter.put(jobVertexId, toAdd);
}

/**
 * Stores the counter for per-target lookup without contributing to the aggregate sum.
 * Intended for the broadcast fan-out path where the aggregate is already incremented by a
 * separate task-level counter and summing the per-target counters on top would
 * double-count.
 *
 * <p>NOTE(review): like {@link #addCounter(Counter, String)}, a repeated registration under
 * the same {@code jobVertexId} silently replaces the previous entry — verify callers.
 */
public void addTargetOnly(Counter toAdd, String jobVertexId) {
jobVertexIdToCounter.put(jobVertexId, toAdd);
}

@Override
public long getCount() {
long sum = super.getCount();
Expand All @@ -373,6 +430,14 @@ public long getCount() {
}
return sum;
}

/** Returns a mutable point-in-time snapshot of the per-target counts, keyed by target vertex id. */
public Map<String, Long> getCounters() {
    final Map<String, Long> snapshot = new HashMap<>();
    for (Map.Entry<String, Counter> entry : jobVertexIdToCounter.entrySet()) {
        snapshot.put(entry.getKey(), entry.getValue().getCount());
    }
    return snapshot;
}
}

private static class SizeGauge implements Gauge<Integer> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
counts.isNumRecordsOutComplete(),
counts.getAccumulateBackPressuredTime(),
counts.getAccumulateIdleTime(),
counts.getAccumulateBusyTime());
counts.getAccumulateBusyTime(),
counts.getNumRecordsOutPerTargetMutable());

return new JobDetailsInfo.JobVertexDetailsInfo(
ejv.getJobVertexId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
current.isNumRecordsOutComplete(),
current.getAccumulateBackPressuredTime(),
current.getAccumulateIdleTime(),
current.getAccumulateBusyTime()));
current.getAccumulateBusyTime(),
current.getNumRecordsOutPerTargetMutable()));
}

long duration;
Expand Down Expand Up @@ -259,7 +260,8 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
counts.isNumRecordsOutComplete(),
counts.getAccumulateBackPressuredTime(),
counts.getAccumulateIdleTime(),
counts.getAccumulateBusyTime());
counts.getAccumulateBusyTime(),
counts.getNumRecordsOutPerTargetMutable());

Map<ExecutionState, Integer> statusCounts =
CollectionUtil.newHashMapWithExpectedSize(ExecutionState.values().length);
Expand Down
Loading