diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/LeaderElectionManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/LeaderElectionManager.java index 4788aff385..7b0a446e81 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/LeaderElectionManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/LeaderElectionManager.java @@ -15,10 +15,11 @@ */ package io.javaoperatorsdk.operator; -import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -36,9 +37,39 @@ import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration; +/** + * Manages the leader-election lifecycle for an {@link Operator} instance. Leader election ensures + * that, in a high-availability setup with multiple replicas of the same operator, only one replica + * at a time actively reconciles resources. The replica currently holding the lease is referred to + * as the leader, and the others stand by until the lease becomes available. + * + *

Leader election is opt-in. It is enabled when a {@link LeaderElectionConfiguration} is + * supplied via {@link + * io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider#withLeaderElectionConfiguration(LeaderElectionConfiguration) + * ConfigurationServiceOverrider#withLeaderElectionConfiguration(LeaderElectionConfiguration)}. The + * configuration controls the lease name, namespace, durations, and optional user-supplied {@link + * LeaderCallbacks}. + * + *

{@link #stopLeading()} behaves differently depending on how it was triggered: + * + *

+ * + *

The lifecycle methods {@link #start()} and {@link #stop()} are called by {@link Operator} as + * part of {@link Operator#start()} and {@link Operator#stop()} respectively. Users typically do not + * interact with this class directly. + */ public class LeaderElectionManager { private static final Logger log = LoggerFactory.getLogger(LeaderElectionManager.class); + private static final List REQUIRED_VERBS = List.of("create", "update", "get"); public static final String NO_PERMISSION_TO_LEASE_RESOURCE_MESSAGE = "No permission to lease resource."; @@ -53,6 +84,10 @@ public class LeaderElectionManager { private final ConfigurationService configurationService; private String leaseNamespace; private String leaseName; + // Set in stop() before cancelling the leader-election future. Checked in stopLeading() so that + // a graceful shutdown does not call System.exit, which would otherwise deadlock against the + // JVM shutdown hook lock when stop() is invoked from a JVM shutdown hook. + private final AtomicBoolean stoppingGracefully = new AtomicBoolean(false); LeaderElectionManager( ControllerManager controllerManager, ConfigurationService configurationService) { @@ -118,7 +153,11 @@ private void startLeading() { controllerManager.startEventProcessing(); } - private void stopLeading() { + void stopLeading() { + if (stoppingGracefully.get()) { + log.info("Stopped leading for identity: {} during graceful shutdown.", identity); + return; + } if (configurationService.getLeaderElectionConfiguration().orElseThrow().isExitOnStopLeading()) { log.info("Stopped leading for identity: {}. Exiting.", identity); // When leader stops leading the process ends immediately to prevent multiple reconciliations @@ -147,13 +186,13 @@ public void start() { } public void stop() { + stoppingGracefully.set(true); if (leaderElectionFuture != null) { leaderElectionFuture.cancel(false); } } private void checkLeaseAccess() { - var verbsRequired = Arrays.asList("create", "update", "get"); SelfSubjectRulesReview review = new SelfSubjectRulesReview(); review.setSpec(new SelfSubjectRulesReviewSpecBuilder().withNamespace(leaseNamespace).build()); var reviewResult = configurationService.getKubernetesClient().resource(review).create(); @@ -168,16 +207,15 @@ private void checkLeaseAccess() { || rule.getResourceNames().contains(leaseName)) .map(ResourceRule::getVerbs) .flatMap(Collection::stream) - .distinct() - .collect(Collectors.toList()); - if (verbsAllowed.contains(UNIVERSAL_VALUE) || verbsAllowed.containsAll(verbsRequired)) { + .collect(Collectors.toUnmodifiableSet()); + if (verbsAllowed.contains(UNIVERSAL_VALUE) || verbsAllowed.containsAll(REQUIRED_VERBS)) { return; } var missingVerbs = - verbsRequired.stream() + REQUIRED_VERBS.stream() .filter(Predicate.not(verbsAllowed::contains)) - .collect(Collectors.toList()); + .collect(Collectors.joining(",")); throw new OperatorException( NO_PERMISSION_TO_LEASE_RESOURCE_MESSAGE diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index fa8b4620bf..1e9f927f79 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -19,6 +19,7 @@ import java.util.HashSet; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.slf4j.Logger; @@ -43,6 +44,7 @@ public class Operator implements LifecycleAware { private LeaderElectionManager leaderElectionManager; private ConfigurationService configurationService; private volatile boolean started = false; + private final AtomicBoolean shutdownHookInstalled = new AtomicBoolean(false); public Operator() { init(initConfigurationService(null, null), true); @@ -129,6 +131,32 @@ protected ConfigurationService initConfigurationService( return ConfigurationService.newOverriddenConfigurationService(overrider); } + /** + * Adds a JVM shutdown hook that automatically calls {@link #stop()} when the application shuts + * down. The shutdown timeout used while waiting for in-flight reconciliations to complete is + * taken from {@link + * io.javaoperatorsdk.operator.api.config.ConfigurationService#reconciliationTerminationTimeout()}. + * Configure it via {@link + * io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider#withReconciliationTerminationTimeout(Duration) + * ConfigurationServiceOverrider#withReconciliationTerminationTimeout(Duration)}. + * + *

The hook is registered regardless of whether leader election is enabled. A leader pod + * receiving {@code SIGTERM} will therefore release its lease cleanly so that a standby replica + * can take over without waiting for lease expiry. + * + *

NOTE: You may also want to tune the Pod's {@code terminationGracePeriodSeconds} to be + * at least as long as the configured {@code reconciliationTerminationTimeout}, plus a small + * buffer for the rest of the shutdown sequence (releasing the leader-election lease and closing + * the Kubernetes client). If the grace period elapses before {@link #stop()} returns, the kubelet + * sends {@code SIGKILL}, in-flight reconciliations are abandoned, and any held leader-election + * lease is not released cleanly. + */ + public void installShutdownHook() { + if (shutdownHookInstalled.compareAndSet(false, true)) { + Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); + } + } + /** * Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. Note * that graceful shutdown is usually not needed, but some {@link Reconciler} implementations might @@ -137,16 +165,14 @@ protected ConfigurationService initConfigurationService( *

Note that you might want to tune "terminationGracePeriodSeconds" for the Pod running the * controller. * - * @param gracefulShutdownTimeout timeout to wait for executor threads to complete actual - * reconciliations + * @param gracefulShutdownTimeout ignored, configure {@link + * ConfigurationService#reconciliationTerminationTimeout()} instead + * @deprecated Use {@link #installShutdownHook()} instead */ + @Deprecated(forRemoval = true) @SuppressWarnings("unused") public void installShutdownHook(Duration gracefulShutdownTimeout) { - if (!leaderElectionManager.isLeaderElectionEnabled()) { - Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); - } else { - log.warn("Leader election is on, shutdown hook will not be installed."); - } + installShutdownHook(); } public KubernetesClient getKubernetesClient() { @@ -188,6 +214,30 @@ public synchronized void start() { } } + /** + * Stops the operator and releases its resources. The shutdown sequence is: + * + *

    + *
  1. Stop the controller manager, halting reconciliation of all registered controllers. + *
  2. Stop the executor service manager, waiting up to {@link + * io.javaoperatorsdk.operator.api.config.ConfigurationService#reconciliationTerminationTimeout()} + * for in-flight reconciliations to complete. + *
  3. Stop the leader-election manager, cancelling the leader-election future and releasing any + * held lease. + *
  4. Close the {@link KubernetesClient} if {@link + * io.javaoperatorsdk.operator.api.config.ConfigurationService#closeClientOnStop()} is + * {@code true} (the default). + *
+ * + *

It is safe to call this method from a JVM shutdown hook (see {@link #installShutdownHook()}) + * as the graceful-shutdown path coordinates with the leader-election callbacks so that {@code + * System.exit} is not invoked while the JVM is already shutting down. + * + *

If the operator was never successfully started, this method only stops the executor service + * manager so that no thread pools are leaked. + * + * @throws OperatorException if an error occurs during shutdown + */ @Override public void stop() throws OperatorException { Duration reconciliationTerminationTimeout = diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/LeaderElectionManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/LeaderElectionManagerTest.java index 510890e56e..a885d7604c 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/LeaderElectionManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/LeaderElectionManagerTest.java @@ -109,6 +109,18 @@ void testInitPermissionsMultipleRulesWithResourceName(@TempDir Path tempDir) thr assertTrue(leaderElectionManager.isLeaderElectionEnabled()); } + @Test + void stopLeadingDoesNotInvokeSystemExitWhenStopWasCalledFirst() { + // When stop() is called before the onStopLeading callback fires (which is what happens when + // stop()'s future cancellation triggers the callback), stopLeading() must skip + // System.exit(1). Otherwise calling stop() from inside a JVM shutdown hook deadlocks against + // the java.lang.Shutdown class lock. If this regression is ever reintroduced, this test + // method would terminate the JUnit JVM via System.exit(1) instead of failing cleanly. + final var leaderElectionManager = leaderElectionManager(null); + leaderElectionManager.stop(); + leaderElectionManager.stopLeading(); + } + @Test void testFailedToInitMissingPermission(@TempDir Path tempDir) throws IOException { var namespace = "foo";