From dcfcd871e553d9e3d2e195271d49253c308a87c7 Mon Sep 17 00:00:00 2001 From: Minwoo Kang Date: Thu, 7 May 2026 18:09:19 +0900 Subject: [PATCH 1/2] Move workers to returning idle supervisors EvenScheduler/DefaultScheduler do not move workers onto a supervisor that returns to service after maintenance: the topology already has its desired worker count spread across the surviving supervisors, so the returned supervisor sits at used=0 until an operator rebalances by hand. Add an opt-in, binary-trigger pass that relocates workers onto such idle supervisors, round-robin across topologies, in a single scheduling round. The feature is disabled by default, so existing clusters see no behavior change. needsScheduling is deliberately left untouched. The new trigger lives in Cluster.hasIdleSupervisorReusableBy and is reached only from EvenScheduler.redistributeOntoIdleSupervisors, which runs at the top of scheduleTopologiesEvenly and DefaultScheduler.defaultSchedule. ResourceAwareScheduler (needsSchedulingRas) and the multitenant pools keep their existing needsScheduling behavior and never enter the new path, so the feature is scoped to EvenScheduler/DefaultScheduler (and the leftover topologies IsolationScheduler delegates to them) only. The trigger is binary -- it fires only when at least one stable, non-blacklisted supervisor has zero used slots and the topology is not already on it -- so an "almost balanced" cluster never moves. Each topology contributes at most one worker per round-robin iteration, so the returned supervisor ends up hosting workers from several topologies (preserving the per-supervisor workload diversity a fresh submission has) instead of letting the first scheduled topology grab the whole idle capacity. Per-topology relocations in one round are capped at floor(numWorkers / nonBlacklistedSupervisorCount) * idleSupervisorCount, tightened further by max.free.per.topology when positive. Workers are pulled from the supervisor where the topology has the most workers (ties broken by supervisor id, lexicographically), never draining one below a single worker, and each pulled worker is placed directly onto an idle slot so the regular sortSlots/interleave pass cannot drop it back into the just-vacated slot. - DaemonConfig / conf/defaults.yaml (dot-only keys): nimbus.even.rebalance.idle.supervisor.enabled (false) nimbus.even.rebalance.max.free.per.topology (0 = unbounded) nimbus.even.rebalance.idle.supervisor.min.stable.rounds (3) - Cluster: new hasIdleSupervisorReusableBy (trigger) plus isIdleSupervisorAvailableForEvenRebalance and hasMinimumIdleSupervisorStability (eligibility + uptime guard, uptime >= min.stable.rounds * supervisor.monitor.frequency.secs) that skips a just-returned, possibly-flapping supervisor. All gated by the enabled flag; needsScheduling itself is unchanged. - SupervisorDetails.uptimeSecs surfaced from SupervisorInfo so the uptime guard can be evaluated; legacy constructors default it to Long.MAX_VALUE (always stable) to leave existing callers unchanged. - EvenScheduler.redistributeOntoIdleSupervisors returns immediately when the feature is disabled, so a default (disabled) cluster does no per-scheduling-round supervisor scanning. - Add TestEvenSchedulerIdleSupervisor covering the trigger, the per-topology drain cap, single-worker no-op, one-round even distribution, round-robin sharing across topologies, the uptime flap guard, deterministic donor tie-break, blacklist handling, the DefaultScheduler leftover-subset path, and the IsolationScheduler interaction (idle non-isolated target only; a reserved host stays out even when its isolated topology is down). --- conf/defaults.yaml | 3 + .../java/org/apache/storm/DaemonConfig.java | 31 + .../apache/storm/daemon/nimbus/Nimbus.java | 13 +- .../org/apache/storm/scheduler/Cluster.java | 58 ++ .../storm/scheduler/DefaultScheduler.java | 4 + .../apache/storm/scheduler/EvenScheduler.java | 154 +++++ .../storm/scheduler/SupervisorDetails.java | 25 + .../TestEvenSchedulerIdleSupervisor.java | 624 ++++++++++++++++++ 8 files changed, 908 insertions(+), 4 deletions(-) create mode 100644 storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 4368099725c..d4c61bdffba 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -88,6 +88,9 @@ topology.max.replication.wait.time.sec: 60 nimbus.credential.renewers.freq.secs: 600 nimbus.queue.size: 100000 scheduler.display.resource: false +nimbus.even.rebalance.idle.supervisor.enabled: false +nimbus.even.rebalance.max.free.per.topology: 0 +nimbus.even.rebalance.idle.supervisor.min.stable.rounds: 3 nimbus.local.assignments.backend.class: "org.apache.storm.assignments.InMemoryAssignmentBackend" nimbus.assignments.service.threads: 10 nimbus.assignments.service.thread.queue.size: 100 diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index aabf7283c0d..97182d1d166 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -175,6 +175,37 @@ public class DaemonConfig implements Validated { @IsBoolean public static final String SCHEDULER_DISPLAY_RESOURCE = "scheduler.display.resource"; + /** + * If true, {@link org.apache.storm.scheduler.EvenScheduler} may move already-assigned workers onto non-blacklisted supervisors + * with no slot in use. This lets a freshly returned supervisor pick up workers instead of staying idle. The number of workers + * freed per topology in a single scheduling round is capped by {@link #NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY}, so even + * distribution is approached gradually rather than rebuilt from scratch. + */ + @IsBoolean + public static final String NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED + = "nimbus.even.rebalance.idle.supervisor.enabled"; + + /** + * Optional upper bound on the number of currently-assigned workers a single topology may release in one scheduling round + * when the idle-supervisor rebalance defined by {@link #NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED} kicks in. The + * default budget already targets an even per-supervisor distribution (idle supervisors absorb roughly {@code numWorkers / + * numSupervisors} workers each in one round), capped by the idle side's free slot capacity. Setting this to a positive + * value tightens that budget; setting it to {@code 0} or a negative value leaves the even-distribution budget unbounded. + */ + @IsInteger + public static final String NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY + = "nimbus.even.rebalance.max.free.per.topology"; + + /** + * Minimum number of consecutive supervisor monitor rounds that a fully-idle supervisor must have been alive before + * {@link org.apache.storm.scheduler.EvenScheduler} can relocate workers onto it. A positive value avoids moving workers onto a + * supervisor that has only just returned and may still be flapping. Setting this to {@code 0} or a negative value disables the + * uptime guard. + */ + @IsInteger + public static final String NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS + = "nimbus.even.rebalance.idle.supervisor.min.stable.rounds"; + /** * The directory where storm's health scripts go. */ diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index d59a2527d21..bd209d03881 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -985,11 +985,16 @@ private static Map basicSupervisorDetailsMap(IStormCl String id = entry.getKey(); SupervisorInfo info = entry.getValue(); ret.put(id, new SupervisorDetails(id, info.get_server_port(), info.get_hostname(), - info.get_scheduler_meta(), null, info.get_resources_map())); + info.get_scheduler_meta(), null, info.get_resources_map(), + supervisorUptimeSecs(info))); } return ret; } + private static long supervisorUptimeSecs(SupervisorInfo info) { + return info.is_set_uptime_secs() ? info.get_uptime_secs() : 0L; + } + /** * NOTE: this can return false when a topology has just been activated. The topology may still be * in the STORMS_SUBTREE. @@ -2273,7 +2278,8 @@ private Map readAllSupervisorDetails(Map superDetails = new ArrayList<>(); for (Entry entry : superInfos.entrySet()) { SupervisorInfo info = entry.getValue(); - superDetails.add(new SupervisorDetails(entry.getKey(), info.get_meta(), info.get_resources_map())); + superDetails.add(new SupervisorDetails(entry.getKey(), info.get_meta(), info.get_resources_map(), + supervisorUptimeSecs(info))); } // Note that allSlotsAvailableForScheduling // only uses the supervisor-details. The rest of the arguments @@ -2306,7 +2312,7 @@ private Map readAllSupervisorDetails(Map assignedNumWorkers || getUnassignedExecutors(topology).size() > 0; } + /** + * Returns true when there is at least one stable, non-blacklisted supervisor whose slots are all currently free and the + * topology is not already on that supervisor. Controlled by + * {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED}; returns false when disabled. The check is + * binary by design -- a supervisor either has zero used slots or it does not -- so this never fires for "almost balanced" + * clusters. Topologies that cannot benefit from a move (e.g. only a single worker assigned) are filtered later by the + * drain-budget computation in {@link EvenScheduler}, which evaluates to zero whenever + * {@code floor(numWorkers / nonBlacklistedSupervisorCount)} is zero. + */ + public boolean hasIdleSupervisorReusableBy(TopologyDetails topology) { + if (!ObjectReader.getBoolean( + conf.get(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED), false)) { + return false; + } + Set nodesUsedByTopology = new HashSet<>(); + for (WorkerSlot slot : getUsedSlotsByTopologyId(topology.getId())) { + nodesUsedByTopology.add(slot.getNodeId()); + } + for (SupervisorDetails s : supervisors.values()) { + String sid = s.getId(); + if (!isIdleSupervisorAvailableForEvenRebalance(s)) { + continue; + } + if (nodesUsedByTopology.contains(sid)) { + continue; + } + return true; + } + return false; + } + + public boolean isIdleSupervisorAvailableForEvenRebalance(SupervisorDetails supervisor) { + if (supervisor == null) { + return false; + } + if (isBlackListed(supervisor.getId())) { + return false; + } + if (supervisor.getAllPorts().isEmpty()) { + return false; + } + if (!getUsedPorts(supervisor).isEmpty()) { + return false; + } + return hasMinimumIdleSupervisorStability(supervisor); + } + + private boolean hasMinimumIdleSupervisorStability(SupervisorDetails supervisor) { + int minStableRounds = ObjectReader.getInt( + conf.get(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS), 3); + if (minStableRounds <= 0) { + return true; + } + int monitorFrequencySecs = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS), 3); + long requiredUptimeSecs = (long) minStableRounds * Math.max(1, monitorFrequencySecs); + return supervisor.getUptimeSecs() >= requiredUptimeSecs; + } + @Override public boolean needsSchedulingRas(TopologyDetails topology) { return getUnassignedExecutors(topology).size() > 0; diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java index 81a0ad8abcf..2ab34914d2a 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java @@ -72,7 +72,11 @@ public static Set slotsCanReassign(Cluster cluster, Set } public static void defaultSchedule(Topologies topologies, Cluster cluster) { + EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster); for (TopologyDetails topology : cluster.needsSchedulingTopologies()) { + if (topologies.getById(topology.getId()) == null) { + continue; + } List availableSlots = cluster.getAvailableSlots(); Set allExecutors = topology.getExecutors(); diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java index ccc2e34d4bb..d1cabb5ccda 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java @@ -18,18 +18,23 @@ package org.apache.storm.scheduler; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import org.apache.storm.DaemonConfig; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; import org.apache.storm.shade.com.google.common.collect.Sets; +import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; import org.slf4j.Logger; @@ -101,6 +106,151 @@ public static Map> getAliveAssignedWorkerSlotE return Utils.reverseMap(executorToSlot); } + /** + * Round-robin relocation of currently-assigned workers onto fully-idle supervisors. Each round-robin iteration moves + * at most one worker per topology, so multiple topologies share the idle slots and a single returning supervisor ends + * up hosting workers from several topologies — preserving the per-supervisor workload diversity that a fresh + * cluster has after submission. + * + *

Per-topology cap in one scheduling round is + * {@code idleSupervisorCount * floor(numWorkers / nonBlacklistedSupervisorCount)}, further tightened by + * {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY} when set to a positive value. Topologies whose + * computed cap is zero (typically {@code numWorkers < numSupervisors}) are skipped entirely. The trigger remains + * binary — only fires when at least one supervisor has zero used slots — so a near-balanced cluster sees no + * movement. + * + *

Workers are always pulled from the supervisor where this topology has the most workers, and only when that + * supervisor would still hold at least one worker afterward. Each pulled worker's executors are placed directly + * onto an idle slot, so the subsequent sortSlots / interleave pass cannot drop them back into the just-vacated + * slots. Ties between equally loaded source supervisors are resolved by supervisor id, lexicographically. + * + *

Gated by {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED}: when disabled (the default) the + * method returns before scanning any supervisor, so a cluster that has not opted in pays no per-scheduling-round cost. + */ + @VisibleForTesting + static void redistributeOntoIdleSupervisors(Topologies topologies, Cluster cluster) { + if (!ObjectReader.getBoolean( + cluster.getConf().get(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED), false)) { + return; + } + int nonBlacklistedSupervisorCount = 0; + int idleSupervisorCount = 0; + Deque idleTargets = new ArrayDeque<>(); + List supervisors = new ArrayList<>(cluster.getSupervisors().values()); + supervisors.sort(Comparator.comparing(SupervisorDetails::getId)); + for (SupervisorDetails s : supervisors) { + if (cluster.isBlackListed(s.getId())) { + continue; + } + if (s.getAllPorts().isEmpty()) { + continue; + } + nonBlacklistedSupervisorCount++; + if (cluster.isIdleSupervisorAvailableForEvenRebalance(s)) { + idleSupervisorCount++; + List ports = new ArrayList<>(s.getAllPorts()); + Collections.sort(ports); + for (Integer port : ports) { + idleTargets.add(new WorkerSlot(s.getId(), port)); + } + } + } + if (idleTargets.isEmpty() || nonBlacklistedSupervisorCount == 0 || idleSupervisorCount == 0) { + return; + } + + int maxFree = ObjectReader.getInt( + cluster.getConf().get(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY), 0); + + List orderedTopos = new ArrayList<>(); + Map remainingBudget = new HashMap<>(); + for (TopologyDetails topo : topologies.getTopologies()) { + if (!cluster.hasIdleSupervisorReusableBy(topo)) { + continue; + } + int target = (topo.getNumWorkers() / nonBlacklistedSupervisorCount) * idleSupervisorCount; + if (target <= 0) { + continue; + } + if (maxFree > 0) { + target = Math.min(target, maxFree); + } + orderedTopos.add(topo); + remainingBudget.put(topo.getId(), target); + } + if (orderedTopos.isEmpty()) { + return; + } + orderedTopos.sort(Comparator.comparing(TopologyDetails::getId)); + + int totalRelocated = 0; + while (!idleTargets.isEmpty()) { + boolean movedThisIteration = false; + for (TopologyDetails topo : orderedTopos) { + if (idleTargets.isEmpty()) { + break; + } + int remaining = remainingBudget.getOrDefault(topo.getId(), 0); + if (remaining <= 0) { + continue; + } + if (relocateOneWorkerOntoIdleSlot(topo, cluster, idleTargets)) { + remainingBudget.put(topo.getId(), remaining - 1); + totalRelocated++; + movedThisIteration = true; + } else { + remainingBudget.put(topo.getId(), 0); + } + } + if (!movedThisIteration) { + break; + } + } + if (totalRelocated > 0) { + LOG.info("EvenScheduler: relocated {} worker(s) onto idle supervisor(s) round-robin across {} topologies.", + totalRelocated, orderedTopos.size()); + } + } + + /** + * Pulls a single worker from the supervisor where {@code topology} currently has the most workers and reassigns its + * executors onto the next idle slot from {@code idleTargets}. Returns false (without consuming an idle target) if + * the topology has no eligible source supervisor — namely all of its supervisors host at most one of its workers, + * which would otherwise drain that supervisor to zero and turn it into the next round's idle. + */ + private static boolean relocateOneWorkerOntoIdleSlot(TopologyDetails topology, Cluster cluster, + Deque idleTargets) { + Map> slotToExecutors = + getAliveAssignedWorkerSlotExecutors(cluster, topology.getId()); + Map> nodeToSlots = new HashMap<>(); + for (WorkerSlot slot : slotToExecutors.keySet()) { + nodeToSlots.computeIfAbsent(slot.getNodeId(), k -> new ArrayList<>()).add(slot); + } + List>> candidates = new ArrayList<>(nodeToSlots.entrySet()); + candidates.removeIf(e -> e.getValue().size() < 2); + candidates.sort(Comparator + .>>comparingInt(e -> e.getValue().size()) + .reversed() + .thenComparing(Map.Entry::getKey)); + if (candidates.isEmpty()) { + return false; + } + List slots = candidates.get(0).getValue(); + slots.sort(Comparator.comparingInt(WorkerSlot::getPort)); + WorkerSlot victim = slots.get(slots.size() - 1); + Collection execs = slotToExecutors.get(victim); + if (execs == null || execs.isEmpty()) { + return false; + } + if (idleTargets.isEmpty()) { + return false; + } + WorkerSlot target = idleTargets.poll(); + cluster.freeSlot(victim); + cluster.assign(target, topology.getId(), execs); + return true; + } + private static Map scheduleTopology(TopologyDetails topology, Cluster cluster) { List availableSlots = cluster.getAvailableSlots(); Set allExecutors = topology.getExecutors(); @@ -148,7 +298,11 @@ public int compare(ExecutorDetails o1, ExecutorDetails o2) { } public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) { + redistributeOntoIdleSupervisors(topologies, cluster); for (TopologyDetails topology : cluster.needsSchedulingTopologies()) { + if (topologies.getById(topology.getId()) == null) { + continue; + } String topologyId = topology.getId(); Map newAssignment = scheduleTopology(topology, cluster); Map> nodePortToExecutors = Utils.reverseMap(newAssignment); diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java index 188627353dd..509ff0f7e76 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java @@ -46,6 +46,7 @@ public class SupervisorDetails { * all the ports of the supervisor. */ private Set allPorts; + private final long uptimeSecs; /** * Create the details of a new supervisor. @@ -59,12 +60,18 @@ public class SupervisorDetails { */ public SupervisorDetails(String id, Integer serverPort, String host, Object meta, Object schedulerMeta, Collection allPorts, Map totalResources) { + this(id, serverPort, host, meta, schedulerMeta, allPorts, totalResources, Long.MAX_VALUE); + } + + public SupervisorDetails(String id, Integer serverPort, String host, Object meta, Object schedulerMeta, + Collection allPorts, Map totalResources, long uptimeSecs) { this.id = id; this.serverPort = serverPort; this.host = host; this.meta = meta; this.schedulerMeta = schedulerMeta; + this.uptimeSecs = uptimeSecs; if (allPorts != null) { setAllPorts(allPorts); } else { @@ -82,6 +89,10 @@ public SupervisorDetails(String id, Object meta, Map totalResour this(id, null, null, meta, null, null, totalResources); } + public SupervisorDetails(String id, Object meta, Map totalResources, long uptimeSecs) { + this(id, null, null, meta, null, null, totalResources, uptimeSecs); + } + public SupervisorDetails(String id, Object meta, Collection allPorts) { this(id, null, null, meta, null, allPorts, null); } @@ -95,11 +106,21 @@ public SupervisorDetails(String id, String host, Object schedulerMeta, this(id, null, host, null, schedulerMeta, allPorts, totalResources); } + public SupervisorDetails(String id, String host, Object schedulerMeta, + Collection allPorts, Map totalResources, long uptimeSecs) { + this(id, null, host, null, schedulerMeta, allPorts, totalResources, uptimeSecs); + } + public SupervisorDetails(String id, int serverPort, String host, Object schedulerMeta, Collection allPorts, Map totalResources) { this(id, serverPort, host, null, schedulerMeta, allPorts, totalResources); } + public SupervisorDetails(String id, int serverPort, String host, Object schedulerMeta, + Collection allPorts, Map totalResources, long uptimeSecs) { + this(id, serverPort, host, null, schedulerMeta, allPorts, totalResources, uptimeSecs); + } + @Override public String toString() { return getClass().getSimpleName() + " ID: " + id + " HOST: " + host + " META: " + meta @@ -126,6 +147,10 @@ public Set getAllPorts() { return allPorts; } + public long getUptimeSecs() { + return uptimeSecs; + } + private void setAllPorts(Collection allPorts) { this.allPorts = new HashSet<>(); if (allPorts != null) { diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java b/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java new file mode 100644 index 00000000000..b15a8335502 --- /dev/null +++ b/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java @@ -0,0 +1,624 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.scheduler; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.storm.Config; +import org.apache.storm.DaemonConfig; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.blacklist.TestUtilsForBlacklistScheduler; +import org.apache.storm.scheduler.resource.normalization.ResourceMetrics; +import org.apache.storm.topology.TopologyBuilder; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for the idle-supervisor rebalance behavior added to {@link Cluster#hasIdleSupervisorReusableBy(TopologyDetails)} + * and {@link EvenScheduler#redistributeOntoIdleSupervisors(Topologies, Cluster)}. + * + *

Trigger condition is binary: at least one non-blacklisted supervisor with zero used slots must exist. The cluster + * being "almost balanced" never triggers the new logic, so a near-even distribution is preserved as-is. Each round only + * frees up to {@code nimbus.even.rebalance.max.free.per.topology} workers and never drains a supervisor down to zero. + */ +public class TestEvenSchedulerIdleSupervisor { + + private static final String TOPO_ID = "topo-1"; + + /** + * supA and supB host the topology; supC is freshly returned and idle. Topology has 2 workers on supA and 1 on supB. + */ + private Cluster buildClusterWithIdleSupervisor(boolean enableRebalance, int maxFreePerTopology) { + return buildClusterWithIdleSupervisor(TestUtilsForBlacklistScheduler.genSupervisors(3, 4), + evenRebalanceConf(enableRebalance, maxFreePerTopology)); + } + + private Cluster buildClusterWithIdleSupervisor(Map supMap, Map conf) { + // Build a topology and assign 3 workers: two on sup-0 and one on sup-1. sup-2 stays idle. + TopologyDetails topology = makeTopologyDetails(TOPO_ID, 3); + + WorkerSlot s0p0 = new WorkerSlot("sup-0", 0); + WorkerSlot s0p1 = new WorkerSlot("sup-0", 1); + WorkerSlot s1p0 = new WorkerSlot("sup-1", 0); + + List execs = new LinkedList<>(topology.getExecutors()); + Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); + // Distribute the executors round-robin onto the three slots so each slot has at least one. + Map execToSlot = new HashMap<>(); + WorkerSlot[] slotRing = new WorkerSlot[]{s0p0, s0p1, s1p0}; + for (int i = 0; i < execs.size(); i++) { + execToSlot.put(execs.get(i), slotRing[i % slotRing.length]); + } + SchedulerAssignmentImpl assignment = new SchedulerAssignmentImpl(TOPO_ID, execToSlot, null, null); + + Map assignments = new HashMap<>(); + assignments.put(TOPO_ID, assignment); + + Map topoMap = new HashMap<>(); + topoMap.put(TOPO_ID, topology); + Topologies topologies = new Topologies(topoMap); + + return newCluster(supMap, assignments, topologies, conf); + } + + private Map evenRebalanceConf(boolean enableRebalance, int maxFreePerTopology) { + Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, enableRebalance); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, maxFreePerTopology); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS, 0); + conf.put(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS, 3); + return conf; + } + + private Map genSupervisorsWithUptime(int numSup, int numPorts, long uptimeSecs) { + Map supMap = new HashMap<>(); + for (int i = 0; i < numSup; i++) { + SupervisorDetails sup = supervisor("sup-" + i, "host-" + i, numPorts, uptimeSecs); + supMap.put(sup.getId(), sup); + } + return supMap; + } + + private SupervisorDetails supervisor(String id, String host, int numPorts, long uptimeSecs) { + List ports = new LinkedList<>(); + for (int port = 0; port < numPorts; port++) { + ports.add(port); + } + return new SupervisorDetails(id, host, null, ports, null, uptimeSecs); + } + + private Cluster newCluster(Map supMap, + Map assignments, + Topologies topologies, + Map conf) { + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); + return new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, supMap, + assignments, topologies, conf); + } + + private TopologyDetails makeTopologyDetails(String id, int numWorkers, int parallelism) { + Config conf = new Config(); + conf.put(Config.TOPOLOGY_NAME, id); + conf.put(Config.TOPOLOGY_WORKERS, numWorkers); + + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout-0", new TestUtilsForBlacklistScheduler.TestSpout(), parallelism); + builder.setBolt("bolt-0", new TestUtilsForBlacklistScheduler.TestBolt(), parallelism).shuffleGrouping("spout-0"); + StormTopology stormTopology = builder.createTopology(); + + Map execsAndComps = TestUtilsForBlacklistScheduler.genExecsAndComps( + stormTopology, parallelism, parallelism); + return new TopologyDetails(id, conf, stormTopology, numWorkers, execsAndComps, 0, "user"); + } + + private TopologyDetails makeTopologyDetails(String id, int numWorkers) { + return makeTopologyDetails(id, numWorkers, 3); + } + + private TopologyDetails firstTopology(Cluster cluster) { + return cluster.getTopologies().getById(TOPO_ID); + } + + private int usedSlotCount(Cluster cluster, String supervisorId) { + SupervisorDetails s = cluster.getSupervisorById(supervisorId); + return cluster.getUsedPorts(s).size(); + } + + @Test + public void disabledByDefault_doesNotTrigger() { + Cluster cluster = buildClusterWithIdleSupervisor(false, 1); + assertFalse(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster)), + "disabled flag must short-circuit the trigger even when an idle supervisor exists"); + assertFalse(cluster.needsScheduling(firstTopology(cluster)), + "needsScheduling must remain false when the new behavior is disabled and the topology is fully assigned"); + } + + @Test + public void enabledWithIdleSupervisor_doesNotChangeGenericNeedsScheduling() { + Cluster cluster = buildClusterWithIdleSupervisor(true, 1); + assertTrue(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster))); + assertFalse(cluster.needsScheduling(firstTopology(cluster)), + "needsScheduling is used by schedulers other than EvenScheduler; the idle trigger stays out of that generic path"); + assertFalse(cluster.needsSchedulingRas(firstTopology(cluster)), + "ResourceAwareScheduler keeps using needsSchedulingRas, so this opt-in EvenScheduler feature is out of RAS scope"); + } + + @Test + public void noIdleSupervisor_doesNotTrigger() { + // Two supervisors, both serving the topology -> no idle supervisor present. + Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(2, 4); + TopologyDetails topology = makeTopologyDetails(TOPO_ID, 3); + + WorkerSlot s0p0 = new WorkerSlot("sup-0", 0); + WorkerSlot s0p1 = new WorkerSlot("sup-0", 1); + WorkerSlot s1p0 = new WorkerSlot("sup-1", 0); + + List execs = new LinkedList<>(topology.getExecutors()); + Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); + Map execToSlot = new HashMap<>(); + WorkerSlot[] slotRing = new WorkerSlot[]{s0p0, s0p1, s1p0}; + for (int i = 0; i < execs.size(); i++) { + execToSlot.put(execs.get(i), slotRing[i % slotRing.length]); + } + Map assignments = new HashMap<>(); + assignments.put(TOPO_ID, new SchedulerAssignmentImpl(TOPO_ID, execToSlot, null, null)); + + Map topoMap = new HashMap<>(); + topoMap.put(TOPO_ID, topology); + Topologies topologies = new Topologies(topoMap); + + Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 1); + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); + Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, + supMap, assignments, topologies, conf); + + assertFalse(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster))); + assertFalse(cluster.needsScheduling(firstTopology(cluster))); + } + + @Test + public void redistributeRelocatesAtMostMaxFreeWorkersPerTopology() { + Cluster cluster = buildClusterWithIdleSupervisor(true, 1); + assertEquals(2, usedSlotCount(cluster, "sup-0")); + assertEquals(1, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + + EvenScheduler.redistributeOntoIdleSupervisors(cluster.getTopologies(), cluster); + + // max-free=1 caps the topology to a single relocation; pulled from the most-loaded supervisor (sup-0) + // and placed directly onto the idle supervisor. + assertEquals(1, usedSlotCount(cluster, "sup-0")); + assertEquals(1, usedSlotCount(cluster, "sup-1")); + assertEquals(1, usedSlotCount(cluster, "sup-2")); + assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster))); + } + + @Test + public void redistributeNeverDrainsSupervisorToZero() { + Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); + TopologyDetails topology = makeTopologyDetails(TOPO_ID, 2); + + WorkerSlot s0p0 = new WorkerSlot("sup-0", 0); + WorkerSlot s1p0 = new WorkerSlot("sup-1", 0); + + List execs = new LinkedList<>(topology.getExecutors()); + Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); + Map execToSlot = new HashMap<>(); + WorkerSlot[] slotRing = new WorkerSlot[]{s0p0, s1p0}; + for (int i = 0; i < execs.size(); i++) { + execToSlot.put(execs.get(i), slotRing[i % slotRing.length]); + } + Map assignments = new HashMap<>(); + assignments.put(TOPO_ID, new SchedulerAssignmentImpl(TOPO_ID, execToSlot, null, null)); + + Map topoMap = new HashMap<>(); + topoMap.put(TOPO_ID, topology); + Topologies topologies = new Topologies(topoMap); + + Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 5); + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); + Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, + supMap, assignments, topologies, conf); + + EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster); + + // floor(2/3)=0 → topology gets a budget of 0 and is skipped entirely. No source supervisor is drained. + assertEquals(1, usedSlotCount(cluster, "sup-0")); + assertEquals(1, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + } + + @Test + public void scheduleTopologiesEvenly_movesOneWorkerToIdleSupervisor() { + Cluster cluster = buildClusterWithIdleSupervisor(true, 1); + + Topologies topologies = cluster.getTopologies(); + EvenScheduler.scheduleTopologiesEvenly(topologies, cluster); + + // After scheduling: idle supervisor (sup-2) should now host exactly 1 worker. + assertEquals(1, usedSlotCount(cluster, "sup-2")); + // Total worker count is preserved (3) and respects the topology's numWorkers. + int total = usedSlotCount(cluster, "sup-0") + + usedSlotCount(cluster, "sup-1") + + usedSlotCount(cluster, "sup-2"); + assertEquals(3, total); + } + + /** + * Single-worker topology + idle supervisors must produce no movement: {@code floor(1 / N) = 0} for any N >= 2, so the + * drain budget evaluates to zero regardless of how many idle supervisors exist. Without this guard a single-worker + * topology would ping-pong between supervisors every monitor cycle. + */ + @Test + public void singleWorkerTopology_doesNotMoveDespiteIdleSupervisors() { + String topoId = "topo-single"; + Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); + TopologyDetails topology = makeTopologyDetails(topoId, 1, 1); + + WorkerSlot s0 = new WorkerSlot("sup-0", 0); + Map execToSlot = new HashMap<>(); + for (ExecutorDetails e : topology.getExecutors()) { + execToSlot.put(e, s0); + } + Map assignments = new HashMap<>(); + assignments.put(topoId, new SchedulerAssignmentImpl(topoId, execToSlot, null, null)); + + Map topoMap = new HashMap<>(); + topoMap.put(topoId, topology); + Topologies topologies = new Topologies(topoMap); + + Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 0); + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); + Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, + supMap, assignments, topologies, conf); + + EvenScheduler.scheduleTopologiesEvenly(topologies, cluster); + + assertEquals(1, usedSlotCount(cluster, "sup-0")); + assertEquals(0, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + } + + /** + * 8-worker topology starts at distribution (4, 4, 0). With max-free unbounded the budget targets + * floor(numWorkers / numSupervisors) = 2 workers for the idle supervisor, and the round ends at (3, 3, 2) + * — fully even — without disturbing topologies on the next round (no supervisor is idle anymore). + */ + @Test + public void evenDistributionInOneRound_unboundedMaxFree() { + String topoId = "topo-even"; + Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); + TopologyDetails topology = makeTopologyDetails(topoId, 8, 4); + + WorkerSlot[] slots = new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), + new WorkerSlot("sup-0", 2), new WorkerSlot("sup-0", 3), + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), + new WorkerSlot("sup-1", 2), new WorkerSlot("sup-1", 3), + }; + List execs = new LinkedList<>(topology.getExecutors()); + Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); + Map execToSlot = new HashMap<>(); + for (int i = 0; i < execs.size(); i++) { + execToSlot.put(execs.get(i), slots[i]); + } + Map assignments = new HashMap<>(); + assignments.put(topoId, new SchedulerAssignmentImpl(topoId, execToSlot, null, null)); + + Map topoMap = new HashMap<>(); + topoMap.put(topoId, topology); + Topologies topologies = new Topologies(topoMap); + + Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 0); + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); + Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, + supMap, assignments, topologies, conf); + + assertEquals(4, usedSlotCount(cluster, "sup-0")); + assertEquals(4, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + + EvenScheduler.scheduleTopologiesEvenly(topologies, cluster); + + // Idle supervisor absorbs exactly floor(8/3) = 2 workers in one round; total worker count is preserved. + assertEquals(2, usedSlotCount(cluster, "sup-2")); + assertEquals(8, usedSlotCount(cluster, "sup-0") + + usedSlotCount(cluster, "sup-1") + + usedSlotCount(cluster, "sup-2")); + // No supervisor is idle anymore — the trigger will not refire on the next round. + assertFalse(cluster.hasIdleSupervisorReusableBy(cluster.getTopologies().getById(topoId))); + } + + /** + * Two equally-sized topologies share the same returning supervisor round-robin: each contributes one worker, so + * sup-2 ends up hosting workers from both — restoring per-supervisor workload diversity the way a fresh submission + * would. + */ + @Test + public void multipleTopologies_shareIdleSlotsRoundRobin() { + Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); + + TopologyDetails topoA = makeTopologyDetails("topo-A", 4, 2); + TopologyDetails topoB = makeTopologyDetails("topo-B", 4, 2); + + Map assignments = new HashMap<>(); + assignments.put("topo-A", buildAssignment(topoA, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), + })); + assignments.put("topo-B", buildAssignment(topoB, new WorkerSlot[]{ + new WorkerSlot("sup-0", 2), new WorkerSlot("sup-0", 3), + new WorkerSlot("sup-1", 2), new WorkerSlot("sup-1", 3), + })); + + Map topoMap = new HashMap<>(); + topoMap.put("topo-A", topoA); + topoMap.put("topo-B", topoB); + Topologies topologies = new Topologies(topoMap); + + Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 0); + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); + Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, + supMap, assignments, topologies, conf); + + EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster); + + // floor(4/3)=1 per topology, two topologies → sup-2 hosts 1 worker from each, in round-robin order. + assertEquals(2, usedSlotCount(cluster, "sup-2")); + assertEquals(1, supervisorWorkerCount(cluster, "topo-A", "sup-2")); + assertEquals(1, supervisorWorkerCount(cluster, "topo-B", "sup-2")); + // Each topology kept its total worker count; only one host moved. + assertEquals(4, cluster.getAssignedNumWorkers(topoA)); + assertEquals(4, cluster.getAssignedNumWorkers(topoB)); + } + + @Test + public void idleSupervisorYoungerThanStableRoundsDoesNotMoveWorkers() { + Map supMap = genSupervisorsWithUptime(3, 4, 100); + supMap.put("sup-2", supervisor("sup-2", "host-2", 4, 8)); + + Map conf = evenRebalanceConf(true, 1); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS, 3); + conf.put(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS, 3); + Cluster cluster = buildClusterWithIdleSupervisor(supMap, conf); + + assertFalse(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster)), + "3 stable rounds at a 3 second monitor frequency require at least 9 seconds of supervisor uptime"); + + EvenScheduler.scheduleTopologiesEvenly(cluster.getTopologies(), cluster); + + assertEquals(2, usedSlotCount(cluster, "sup-0")); + assertEquals(1, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + } + + @Test + public void idleSupervisorAtStableRoundThresholdCanReceiveWorker() { + Map supMap = genSupervisorsWithUptime(3, 4, 100); + supMap.put("sup-2", supervisor("sup-2", "host-2", 4, 9)); + + Map conf = evenRebalanceConf(true, 1); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS, 3); + conf.put(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS, 3); + Cluster cluster = buildClusterWithIdleSupervisor(supMap, conf); + + assertTrue(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster))); + + EvenScheduler.scheduleTopologiesEvenly(cluster.getTopologies(), cluster); + + assertEquals(1, usedSlotCount(cluster, "sup-2")); + assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster))); + } + + @Test + public void donorTieBreaksBySupervisorIdWhenWorkerCountsTie() { + Map supMap = genSupervisorsWithUptime(3, 4, 100); + TopologyDetails topology = makeTopologyDetails("topo-tie", 4, 4); + + Map assignments = new HashMap<>(); + assignments.put("topo-tie", buildAssignment(topology, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), + })); + + Map topoMap = new HashMap<>(); + topoMap.put("topo-tie", topology); + Topologies topologies = new Topologies(topoMap); + Cluster cluster = newCluster(supMap, assignments, topologies, evenRebalanceConf(true, 1)); + + EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster); + + assertEquals(1, supervisorWorkerCount(cluster, "topo-tie", "sup-0"), + "sup-0 and sup-1 started with two workers each; lexicographic tie-break chooses sup-0 as donor"); + assertEquals(2, supervisorWorkerCount(cluster, "topo-tie", "sup-1")); + assertEquals(1, supervisorWorkerCount(cluster, "topo-tie", "sup-2")); + } + + @Test + public void blacklistedIdleSupervisorIsNotReusableTarget() { + Cluster cluster = buildClusterWithIdleSupervisor(true, 1); + cluster.blacklistHost("host-2"); + + assertFalse(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster)), + "IsolationScheduler represents reserved hosts by blacklisting them before delegating to DefaultScheduler"); + + EvenScheduler.scheduleTopologiesEvenly(cluster.getTopologies(), cluster); + + assertEquals(2, usedSlotCount(cluster, "sup-0")); + assertEquals(1, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + } + + @Test + public void defaultSchedulerIdleRebalanceHonorsLeftoverTopologySubset() { + Map supMap = genSupervisorsWithUptime(3, 4, 100); + TopologyDetails isolated = makeTopologyDetails("topo-isolated", 2, 2); + TopologyDetails regular = makeTopologyDetails("topo-regular", 3, 3); + + Map assignments = new HashMap<>(); + assignments.put(isolated.getId(), buildAssignment(isolated, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), + })); + assignments.put(regular.getId(), buildAssignment(regular, new WorkerSlot[]{ + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), new WorkerSlot("sup-1", 2), + })); + + Map topoMap = new HashMap<>(); + topoMap.put(isolated.getId(), isolated); + topoMap.put(regular.getId(), regular); + Topologies allTopologies = new Topologies(topoMap); + Cluster cluster = newCluster(supMap, assignments, allTopologies, evenRebalanceConf(true, 0)); + + cluster.blacklistHost("host-0"); + DefaultScheduler.defaultSchedule(new Topologies(regular), cluster); + + assertEquals(2, supervisorWorkerCount(cluster, isolated.getId(), "sup-0"), + "the isolated topology is not in the leftover topology set, so DefaultScheduler must not move it"); + assertEquals(0, supervisorWorkerCount(cluster, isolated.getId(), "sup-2")); + assertEquals(2, supervisorWorkerCount(cluster, regular.getId(), "sup-1")); + assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-2")); + } + + @Test + public void isolationSchedulerOnlyRelocatesLeftoverTopologyOntoNonIsolatedIdleSupervisor() { + Map supMap = genSupervisorsWithUptime(3, 4, 100); + TopologyDetails isolated = makeTopologyDetails("topo-isolated", 1, 1); + TopologyDetails regular = makeTopologyDetails("topo-regular", 3, 3); + + Map assignments = new HashMap<>(); + assignments.put(isolated.getId(), buildAssignment(isolated, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), + })); + assignments.put(regular.getId(), buildAssignment(regular, new WorkerSlot[]{ + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), new WorkerSlot("sup-1", 2), + })); + + Map topoMap = new HashMap<>(); + topoMap.put(isolated.getId(), isolated); + topoMap.put(regular.getId(), regular); + Topologies topologies = new Topologies(topoMap); + + Map conf = evenRebalanceConf(true, 0); + conf.put(DaemonConfig.ISOLATION_SCHEDULER_MACHINES, Collections.singletonMap(isolated.getName(), 1)); + Cluster cluster = newCluster(supMap, assignments, topologies, conf); + + IsolationScheduler scheduler = new IsolationScheduler(); + scheduler.prepare(conf, new StormMetricsRegistry()); + scheduler.schedule(topologies, cluster); + + assertEquals(1, supervisorWorkerCount(cluster, isolated.getId(), "sup-0"), + "the already-isolated topology remains on its isolated host and is not selected as a donor"); + assertEquals(0, supervisorWorkerCount(cluster, isolated.getId(), "sup-2")); + assertEquals(2, supervisorWorkerCount(cluster, regular.getId(), "sup-1")); + assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-2"), + "only the leftover regular topology is allowed to move onto the non-isolated idle supervisor"); + } + + /** + * IsolationScheduler reserves a host by blacklisting it before delegating the remaining (non-isolated) topologies to + * {@link DefaultScheduler#defaultSchedule(Topologies, Cluster)}. Here the isolated topology is down -- it has no + * assigned workers at all -- yet its reserved host (sup-2) must not be treated as an idle relocation target even + * though it has zero used slots. The leftover regular topology is rebalanced onto the genuinely idle, non-reserved + * sup-3 and never onto the reserved sup-2. + */ + @Test + public void reservedHostForDownIsolatedTopologyIsNotTreatedAsIdle() { + Map supMap = genSupervisorsWithUptime(4, 4, 100); + TopologyDetails isolated = makeTopologyDetails("topo-isolated", 1, 1); + TopologyDetails regular = makeTopologyDetails("topo-regular", 4, 4); + + Map assignments = new HashMap<>(); + // The isolated topology is down: it has no assignment at all. + assignments.put(regular.getId(), buildAssignment(regular, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), + })); + + Map topoMap = new HashMap<>(); + topoMap.put(isolated.getId(), isolated); + topoMap.put(regular.getId(), regular); + Topologies allTopologies = new Topologies(topoMap); + Cluster cluster = newCluster(supMap, assignments, allTopologies, evenRebalanceConf(true, 0)); + + // sup-2 is reserved for the (down) isolated topology -- IsolationScheduler represents this by blacklisting it. + cluster.blacklistHost("host-2"); + + assertFalse(cluster.isIdleSupervisorAvailableForEvenRebalance(cluster.getSupervisorById("sup-2")), + "a blacklisted reserved host is never an even-rebalance target, even with zero used slots"); + assertTrue(cluster.isIdleSupervisorAvailableForEvenRebalance(cluster.getSupervisorById("sup-3")), + "the non-reserved idle supervisor is available"); + + // IsolationScheduler delegates the leftover (non-isolated) topologies to DefaultScheduler with the reserved + // host already blacklisted. + DefaultScheduler.defaultSchedule(new Topologies(regular), cluster); + + assertEquals(0, usedSlotCount(cluster, "sup-2"), + "the reserved host stays idle: the down isolated topology's machine is not repopulated by rebalance"); + assertEquals(0, supervisorWorkerCount(cluster, regular.getId(), "sup-2")); + assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-3"), + "the leftover regular topology rebalances onto the genuinely idle, non-reserved supervisor"); + assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-0")); + assertEquals(2, supervisorWorkerCount(cluster, regular.getId(), "sup-1")); + assertEquals(0, cluster.getUsedSlotsByTopologyId(isolated.getId()).size(), + "the isolated topology is down and is never scheduled by the leftover path"); + } + + private SchedulerAssignmentImpl buildAssignment(TopologyDetails topology, WorkerSlot[] slots) { + List execs = new LinkedList<>(topology.getExecutors()); + Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); + Map map = new HashMap<>(); + for (int i = 0; i < execs.size(); i++) { + map.put(execs.get(i), slots[i % slots.length]); + } + return new SchedulerAssignmentImpl(topology.getId(), map, null, null); + } + + private int supervisorWorkerCount(Cluster cluster, String topologyId, String supervisorId) { + int count = 0; + for (WorkerSlot slot : cluster.getUsedSlotsByTopologyId(topologyId)) { + if (slot.getNodeId().equals(supervisorId)) { + count++; + } + } + return count; + } +} From 33c875503714ad31084534e8c3de63288a58fd82 Mon Sep 17 00:00:00 2001 From: Minwoo Kang Date: Fri, 5 Jun 2026 10:21:56 +0900 Subject: [PATCH 2/2] Address review feedback on idle-supervisor rebalance - Assert getAssignedNumWorkers on every relocating test (incl. the donor tie-break case) so a relocation that loses executors no longer passes on slot counts alone. - Tighten multipleTopologies_shareIdleSlotsRoundRobin to exact per-topology counts plus donor-not-drained, so a broken inner loop can no longer pass. - Parameterize the flap-guard boundary test over threshold-1/threshold/ threshold+1. - Hoist the enabled flag and idle-supervisor set out of the per-topology loop via topologyCanReuseIdleSupervisor, dropping the redundant per-topology config read and supervisor rescan. - Route the remaining hand-built test fixtures through the shared newCluster/evenRebalanceConf/buildAssignment helpers. - Document the needsSchedulingTopologies null-guard in DefaultScheduler and EvenScheduler, spelling out the full-set / single-topology / leftover-subset caller cases. --- .../storm/scheduler/DefaultScheduler.java | 5 + .../apache/storm/scheduler/EvenScheduler.java | 31 +++- .../TestEvenSchedulerIdleSupervisor.java | 157 +++++++----------- 3 files changed, 93 insertions(+), 100 deletions(-) diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java index 2ab34914d2a..7f6cc9cdfe2 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java @@ -74,6 +74,11 @@ public static Set slotsCanReassign(Cluster cluster, Set public static void defaultSchedule(Topologies topologies, Cluster cluster) { EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster); for (TopologyDetails topology : cluster.needsSchedulingTopologies()) { + // needsSchedulingTopologies() returns the cluster's full topology set, but this run is scoped to the + // topologies passed in: DefaultScheduler.schedule passes the full set (so the guard is a no-op), while + // IsolationScheduler delegates only its leftover, non-isolated topologies here. redistributeOntoIdleSupervisors + // above acted only on that passed-in set too. Skip topologies outside it so the leftover path never schedules + // one the caller excluded -- e.g. a down isolated topology on a reserved host. if (topologies.getById(topology.getId()) == null) { continue; } diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java index d1cabb5ccda..38691b06c06 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java @@ -136,6 +136,7 @@ static void redistributeOntoIdleSupervisors(Topologies topologies, Cluster clust int nonBlacklistedSupervisorCount = 0; int idleSupervisorCount = 0; Deque idleTargets = new ArrayDeque<>(); + Set idleSupervisorIds = new HashSet<>(); List supervisors = new ArrayList<>(cluster.getSupervisors().values()); supervisors.sort(Comparator.comparing(SupervisorDetails::getId)); for (SupervisorDetails s : supervisors) { @@ -148,6 +149,7 @@ static void redistributeOntoIdleSupervisors(Topologies topologies, Cluster clust nonBlacklistedSupervisorCount++; if (cluster.isIdleSupervisorAvailableForEvenRebalance(s)) { idleSupervisorCount++; + idleSupervisorIds.add(s.getId()); List ports = new ArrayList<>(s.getAllPorts()); Collections.sort(ports); for (Integer port : ports) { @@ -165,7 +167,9 @@ static void redistributeOntoIdleSupervisors(Topologies topologies, Cluster clust List orderedTopos = new ArrayList<>(); Map remainingBudget = new HashMap<>(); for (TopologyDetails topo : topologies.getTopologies()) { - if (!cluster.hasIdleSupervisorReusableBy(topo)) { + // Equivalent to cluster.hasIdleSupervisorReusableBy(topo) but reuses the idle-supervisor set already + // computed above instead of re-reading the config and re-scanning every supervisor for each topology. + if (!topologyCanReuseIdleSupervisor(cluster, topo, idleSupervisorIds)) { continue; } int target = (topo.getNumWorkers() / nonBlacklistedSupervisorCount) * idleSupervisorCount; @@ -212,6 +216,26 @@ static void redistributeOntoIdleSupervisors(Topologies topologies, Cluster clust } } + /** + * Returns true when at least one of the already-identified idle supervisors does not currently host {@code topology} + * -- i.e. the topology can gain workload diversity by relocating onto it. This mirrors the binary trigger in + * {@link Cluster#hasIdleSupervisorReusableBy(TopologyDetails)} but operates on the pre-computed {@code idleSupervisorIds} + * to avoid the redundant per-topology config read and full supervisor rescan. + */ + private static boolean topologyCanReuseIdleSupervisor(Cluster cluster, TopologyDetails topology, + Set idleSupervisorIds) { + Set nodesUsedByTopology = new HashSet<>(); + for (WorkerSlot slot : cluster.getUsedSlotsByTopologyId(topology.getId())) { + nodesUsedByTopology.add(slot.getNodeId()); + } + for (String idleSupervisorId : idleSupervisorIds) { + if (!nodesUsedByTopology.contains(idleSupervisorId)) { + return true; + } + } + return false; + } + /** * Pulls a single worker from the supervisor where {@code topology} currently has the most workers and reassigns its * executors onto the next idle slot from {@code idleTargets}. Returns false (without consuming an idle target) if @@ -300,6 +324,11 @@ public int compare(ExecutorDetails o1, ExecutorDetails o2) { public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) { redistributeOntoIdleSupervisors(topologies, cluster); for (TopologyDetails topology : cluster.needsSchedulingTopologies()) { + // needsSchedulingTopologies() returns the cluster's full topology set, but this run is scoped to the + // topologies passed in: EvenScheduler.schedule passes the full set (so the guard is a no-op), while + // DefaultScheduler.defaultSchedule calls us once per leftover topology with a single-topology Topologies. + // redistributeOntoIdleSupervisors above acted only on that passed-in set too. Skip topologies outside it so + // the leftover path never schedules one the caller excluded -- e.g. a down isolated topology on a reserved host. if (topologies.getById(topology.getId()) == null) { continue; } diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java b/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java index b15a8335502..ed5b9e87d4e 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java @@ -31,6 +31,8 @@ import org.apache.storm.scheduler.resource.normalization.ResourceMetrics; import org.apache.storm.topology.TopologyBuilder; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -173,31 +175,16 @@ public void noIdleSupervisor_doesNotTrigger() { Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(2, 4); TopologyDetails topology = makeTopologyDetails(TOPO_ID, 3); - WorkerSlot s0p0 = new WorkerSlot("sup-0", 0); - WorkerSlot s0p1 = new WorkerSlot("sup-0", 1); - WorkerSlot s1p0 = new WorkerSlot("sup-1", 0); - - List execs = new LinkedList<>(topology.getExecutors()); - Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); - Map execToSlot = new HashMap<>(); - WorkerSlot[] slotRing = new WorkerSlot[]{s0p0, s0p1, s1p0}; - for (int i = 0; i < execs.size(); i++) { - execToSlot.put(execs.get(i), slotRing[i % slotRing.length]); - } Map assignments = new HashMap<>(); - assignments.put(TOPO_ID, new SchedulerAssignmentImpl(TOPO_ID, execToSlot, null, null)); + assignments.put(TOPO_ID, buildAssignment(topology, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), new WorkerSlot("sup-1", 0), + })); Map topoMap = new HashMap<>(); topoMap.put(TOPO_ID, topology); Topologies topologies = new Topologies(topoMap); - Map conf = new HashMap<>(); - conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); - conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 1); - StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); - ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); - Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, - supMap, assignments, topologies, conf); + Cluster cluster = newCluster(supMap, assignments, topologies, evenRebalanceConf(true, 1)); assertFalse(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster))); assertFalse(cluster.needsScheduling(firstTopology(cluster))); @@ -225,30 +212,16 @@ public void redistributeNeverDrainsSupervisorToZero() { Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); TopologyDetails topology = makeTopologyDetails(TOPO_ID, 2); - WorkerSlot s0p0 = new WorkerSlot("sup-0", 0); - WorkerSlot s1p0 = new WorkerSlot("sup-1", 0); - - List execs = new LinkedList<>(topology.getExecutors()); - Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); - Map execToSlot = new HashMap<>(); - WorkerSlot[] slotRing = new WorkerSlot[]{s0p0, s1p0}; - for (int i = 0; i < execs.size(); i++) { - execToSlot.put(execs.get(i), slotRing[i % slotRing.length]); - } Map assignments = new HashMap<>(); - assignments.put(TOPO_ID, new SchedulerAssignmentImpl(TOPO_ID, execToSlot, null, null)); + assignments.put(TOPO_ID, buildAssignment(topology, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-1", 0), + })); Map topoMap = new HashMap<>(); topoMap.put(TOPO_ID, topology); Topologies topologies = new Topologies(topoMap); - Map conf = new HashMap<>(); - conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); - conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 5); - StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); - ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); - Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, - supMap, assignments, topologies, conf); + Cluster cluster = newCluster(supMap, assignments, topologies, evenRebalanceConf(true, 5)); EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster); @@ -272,6 +245,8 @@ public void scheduleTopologiesEvenly_movesOneWorkerToIdleSupervisor() { + usedSlotCount(cluster, "sup-1") + usedSlotCount(cluster, "sup-2"); assertEquals(3, total); + assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster)), + "relocation must preserve the topology's declared worker count, not just keep 3 slots occupied"); } /** @@ -285,25 +260,14 @@ public void singleWorkerTopology_doesNotMoveDespiteIdleSupervisors() { Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); TopologyDetails topology = makeTopologyDetails(topoId, 1, 1); - WorkerSlot s0 = new WorkerSlot("sup-0", 0); - Map execToSlot = new HashMap<>(); - for (ExecutorDetails e : topology.getExecutors()) { - execToSlot.put(e, s0); - } Map assignments = new HashMap<>(); - assignments.put(topoId, new SchedulerAssignmentImpl(topoId, execToSlot, null, null)); + assignments.put(topoId, buildAssignment(topology, new WorkerSlot[]{ new WorkerSlot("sup-0", 0) })); Map topoMap = new HashMap<>(); topoMap.put(topoId, topology); Topologies topologies = new Topologies(topoMap); - Map conf = new HashMap<>(); - conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); - conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 0); - StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); - ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); - Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, - supMap, assignments, topologies, conf); + Cluster cluster = newCluster(supMap, assignments, topologies, evenRebalanceConf(true, 0)); EvenScheduler.scheduleTopologiesEvenly(topologies, cluster); @@ -323,32 +287,19 @@ public void evenDistributionInOneRound_unboundedMaxFree() { Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); TopologyDetails topology = makeTopologyDetails(topoId, 8, 4); - WorkerSlot[] slots = new WorkerSlot[]{ + Map assignments = new HashMap<>(); + assignments.put(topoId, buildAssignment(topology, new WorkerSlot[]{ new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), new WorkerSlot("sup-0", 2), new WorkerSlot("sup-0", 3), new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), new WorkerSlot("sup-1", 2), new WorkerSlot("sup-1", 3), - }; - List execs = new LinkedList<>(topology.getExecutors()); - Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); - Map execToSlot = new HashMap<>(); - for (int i = 0; i < execs.size(); i++) { - execToSlot.put(execs.get(i), slots[i]); - } - Map assignments = new HashMap<>(); - assignments.put(topoId, new SchedulerAssignmentImpl(topoId, execToSlot, null, null)); + })); Map topoMap = new HashMap<>(); topoMap.put(topoId, topology); Topologies topologies = new Topologies(topoMap); - Map conf = new HashMap<>(); - conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); - conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 0); - StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); - ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); - Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, - supMap, assignments, topologies, conf); + Cluster cluster = newCluster(supMap, assignments, topologies, evenRebalanceConf(true, 0)); assertEquals(4, usedSlotCount(cluster, "sup-0")); assertEquals(4, usedSlotCount(cluster, "sup-1")); @@ -361,6 +312,8 @@ public void evenDistributionInOneRound_unboundedMaxFree() { assertEquals(8, usedSlotCount(cluster, "sup-0") + usedSlotCount(cluster, "sup-1") + usedSlotCount(cluster, "sup-2")); + assertEquals(8, cluster.getAssignedNumWorkers(cluster.getTopologies().getById(topoId)), + "relocation must preserve the declared worker count of an 8-worker topology"); // No supervisor is idle anymore — the trigger will not refire on the next round. assertFalse(cluster.hasIdleSupervisorReusableBy(cluster.getTopologies().getById(topoId))); } @@ -392,61 +345,57 @@ public void multipleTopologies_shareIdleSlotsRoundRobin() { topoMap.put("topo-B", topoB); Topologies topologies = new Topologies(topoMap); - Map conf = new HashMap<>(); - conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); - conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 0); - StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); - ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); - Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, - supMap, assignments, topologies, conf); + Cluster cluster = newCluster(supMap, assignments, topologies, evenRebalanceConf(true, 0)); EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster); // floor(4/3)=1 per topology, two topologies → sup-2 hosts 1 worker from each, in round-robin order. + // Exact counts (not >= 1) are what actually enforce round-robin fairness: a broken inner loop that let the + // first topology grab both idle slots would leave topo-B at 0 here. assertEquals(2, usedSlotCount(cluster, "sup-2")); assertEquals(1, supervisorWorkerCount(cluster, "topo-A", "sup-2")); assertEquals(1, supervisorWorkerCount(cluster, "topo-B", "sup-2")); // Each topology kept its total worker count; only one host moved. assertEquals(4, cluster.getAssignedNumWorkers(topoA)); assertEquals(4, cluster.getAssignedNumWorkers(topoB)); + // Donor supervisors are never drained to zero (which would make them the next round's idle target). + assertTrue(usedSlotCount(cluster, "sup-0") > 0); + assertTrue(usedSlotCount(cluster, "sup-1") > 0); } - @Test - public void idleSupervisorYoungerThanStableRoundsDoesNotMoveWorkers() { + /** + * Flap-guard boundary: with 3 stable rounds at a 3s monitor frequency a returning supervisor must have been up for + * at least 9 seconds before it is eligible. {@code uptime == requiredUptime} is the first value that moves, making + * the off-by-one contract explicit: {@code uptimeSecs >= minStableRounds * monitorFrequencySecs}. + */ + @ParameterizedTest + @CsvSource({ + "8, false", // threshold - 1: too young, stays idle + "9, true", // exactly at threshold: eligible + "10, true", // threshold + 1: eligible + }) + public void flapGuardHonorsMinStableRoundBoundary(long sup2UptimeSecs, boolean expectMove) { Map supMap = genSupervisorsWithUptime(3, 4, 100); - supMap.put("sup-2", supervisor("sup-2", "host-2", 4, 8)); + supMap.put("sup-2", supervisor("sup-2", "host-2", 4, sup2UptimeSecs)); Map conf = evenRebalanceConf(true, 1); conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS, 3); conf.put(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS, 3); Cluster cluster = buildClusterWithIdleSupervisor(supMap, conf); - assertFalse(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster)), + assertEquals(expectMove, cluster.hasIdleSupervisorReusableBy(firstTopology(cluster)), "3 stable rounds at a 3 second monitor frequency require at least 9 seconds of supervisor uptime"); EvenScheduler.scheduleTopologiesEvenly(cluster.getTopologies(), cluster); - assertEquals(2, usedSlotCount(cluster, "sup-0")); - assertEquals(1, usedSlotCount(cluster, "sup-1")); - assertEquals(0, usedSlotCount(cluster, "sup-2")); - } - - @Test - public void idleSupervisorAtStableRoundThresholdCanReceiveWorker() { - Map supMap = genSupervisorsWithUptime(3, 4, 100); - supMap.put("sup-2", supervisor("sup-2", "host-2", 4, 9)); - - Map conf = evenRebalanceConf(true, 1); - conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS, 3); - conf.put(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS, 3); - Cluster cluster = buildClusterWithIdleSupervisor(supMap, conf); - - assertTrue(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster))); - - EvenScheduler.scheduleTopologiesEvenly(cluster.getTopologies(), cluster); - - assertEquals(1, usedSlotCount(cluster, "sup-2")); - assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster))); + if (expectMove) { + assertEquals(1, usedSlotCount(cluster, "sup-2")); + assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster))); + } else { + assertEquals(2, usedSlotCount(cluster, "sup-0")); + assertEquals(1, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + } } @Test @@ -471,6 +420,8 @@ public void donorTieBreaksBySupervisorIdWhenWorkerCountsTie() { "sup-0 and sup-1 started with two workers each; lexicographic tie-break chooses sup-0 as donor"); assertEquals(2, supervisorWorkerCount(cluster, "topo-tie", "sup-1")); assertEquals(1, supervisorWorkerCount(cluster, "topo-tie", "sup-2")); + assertEquals(4, cluster.getAssignedNumWorkers(topology), + "the tie-break relocation preserves the topology's declared worker count"); } @Test @@ -516,6 +467,9 @@ public void defaultSchedulerIdleRebalanceHonorsLeftoverTopologySubset() { assertEquals(0, supervisorWorkerCount(cluster, isolated.getId(), "sup-2")); assertEquals(2, supervisorWorkerCount(cluster, regular.getId(), "sup-1")); assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-2")); + // Both topologies keep their declared worker counts: the leftover one is relocated, the excluded one untouched. + assertEquals(3, cluster.getAssignedNumWorkers(regular)); + assertEquals(2, cluster.getAssignedNumWorkers(isolated)); } @Test @@ -551,6 +505,9 @@ public void isolationSchedulerOnlyRelocatesLeftoverTopologyOntoNonIsolatedIdleSu assertEquals(2, supervisorWorkerCount(cluster, regular.getId(), "sup-1")); assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-2"), "only the leftover regular topology is allowed to move onto the non-isolated idle supervisor"); + // The relocated leftover topology and the untouched isolated topology both keep their declared worker counts. + assertEquals(3, cluster.getAssignedNumWorkers(regular)); + assertEquals(1, cluster.getAssignedNumWorkers(isolated)); } /** @@ -598,6 +555,8 @@ public void reservedHostForDownIsolatedTopologyIsNotTreatedAsIdle() { "the leftover regular topology rebalances onto the genuinely idle, non-reserved supervisor"); assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-0")); assertEquals(2, supervisorWorkerCount(cluster, regular.getId(), "sup-1")); + assertEquals(4, cluster.getAssignedNumWorkers(regular), + "the leftover topology keeps all 4 workers; the move never loses executors"); assertEquals(0, cluster.getUsedSlotsByTopologyId(isolated.getId()).size(), "the isolated topology is down and is never scheduled by the leftover path"); }