Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
*/
package io.javaoperatorsdk.operator;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand All @@ -36,9 +37,39 @@
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration;

/**
* Manages the leader-election lifecycle for an {@link Operator} instance. Leader election ensures
* that, in a high-availability setup with multiple replicas of the same operator, only one replica
* at a time actively reconciles resources. The replica currently holding the lease is referred to
* as the leader, and the others stand by until the lease becomes available.
*
* <p>Leader election is opt-in. It is enabled when a {@link LeaderElectionConfiguration} is
* supplied via {@link
* io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider#withLeaderElectionConfiguration(LeaderElectionConfiguration)
* ConfigurationServiceOverrider#withLeaderElectionConfiguration(LeaderElectionConfiguration)}. The
* configuration controls the lease name, namespace, durations, and optional user-supplied {@link
* LeaderCallbacks}.
*
* <p>{@link #stopLeading()} behaves differently depending on how it was triggered:
*
* <ul>
* <li>If {@link #stop()} has already been called (graceful shutdown), it logs and returns without
* exiting. This avoids deadlocking against the JVM shutdown hook lock when {@link
* Operator#stop()} is invoked from a JVM shutdown hook.
* <li>Otherwise, if the configured {@link LeaderElectionConfiguration#isExitOnStopLeading()} is
* {@code true} (the default), it calls {@code System.exit(1)} so the process restarts and
* another replica can take over.
* <li>If {@code isExitOnStopLeading()} is {@code false}, it only logs and returns.
* </ul>
*
* <p>The lifecycle methods {@link #start()} and {@link #stop()} are called by {@link Operator} as
* part of {@link Operator#start()} and {@link Operator#stop()} respectively. Users typically do not
* interact with this class directly.
*/
public class LeaderElectionManager {

private static final Logger log = LoggerFactory.getLogger(LeaderElectionManager.class);
private static final List<String> REQUIRED_VERBS = List.of("create", "update", "get");

public static final String NO_PERMISSION_TO_LEASE_RESOURCE_MESSAGE =
"No permission to lease resource.";
Expand All @@ -53,6 +84,10 @@ public class LeaderElectionManager {
private final ConfigurationService configurationService;
private String leaseNamespace;
private String leaseName;
// Set in stop() before cancelling the leader-election future. Checked in stopLeading() so that
// a graceful shutdown does not call System.exit, which would otherwise deadlock against the
// JVM shutdown hook lock when stop() is invoked from a JVM shutdown hook.
private final AtomicBoolean stoppingGracefully = new AtomicBoolean(false);

LeaderElectionManager(
ControllerManager controllerManager, ConfigurationService configurationService) {
Expand Down Expand Up @@ -118,7 +153,11 @@ private void startLeading() {
controllerManager.startEventProcessing();
}

private void stopLeading() {
void stopLeading() {
if (stoppingGracefully.get()) {
log.info("Stopped leading for identity: {} during graceful shutdown.", identity);
return;
}
if (configurationService.getLeaderElectionConfiguration().orElseThrow().isExitOnStopLeading()) {
log.info("Stopped leading for identity: {}. Exiting.", identity);
// When leader stops leading the process ends immediately to prevent multiple reconciliations
Expand Down Expand Up @@ -147,13 +186,13 @@ public void start() {
}

public void stop() {
stoppingGracefully.set(true);
if (leaderElectionFuture != null) {
leaderElectionFuture.cancel(false);
}
}

private void checkLeaseAccess() {
var verbsRequired = Arrays.asList("create", "update", "get");
SelfSubjectRulesReview review = new SelfSubjectRulesReview();
review.setSpec(new SelfSubjectRulesReviewSpecBuilder().withNamespace(leaseNamespace).build());
var reviewResult = configurationService.getKubernetesClient().resource(review).create();
Expand All @@ -168,16 +207,15 @@ private void checkLeaseAccess() {
|| rule.getResourceNames().contains(leaseName))
.map(ResourceRule::getVerbs)
.flatMap(Collection::stream)
.distinct()
.collect(Collectors.toList());
if (verbsAllowed.contains(UNIVERSAL_VALUE) || verbsAllowed.containsAll(verbsRequired)) {
.collect(Collectors.toUnmodifiableSet());
if (verbsAllowed.contains(UNIVERSAL_VALUE) || verbsAllowed.containsAll(REQUIRED_VERBS)) {
return;
}

var missingVerbs =
verbsRequired.stream()
REQUIRED_VERBS.stream()
.filter(Predicate.not(verbsAllowed::contains))
.collect(Collectors.toList());
.collect(Collectors.joining(","));

throw new OperatorException(
NO_PERMISSION_TO_LEASE_RESOURCE_MESSAGE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.slf4j.Logger;
Expand All @@ -43,6 +44,7 @@ public class Operator implements LifecycleAware {
private LeaderElectionManager leaderElectionManager;
private ConfigurationService configurationService;
private volatile boolean started = false;
private final AtomicBoolean shutdownHookInstalled = new AtomicBoolean(false);

public Operator() {
init(initConfigurationService(null, null), true);
Expand Down Expand Up @@ -129,6 +131,32 @@ protected ConfigurationService initConfigurationService(
return ConfigurationService.newOverriddenConfigurationService(overrider);
}

/**
* Adds a JVM shutdown hook that automatically calls {@link #stop()} when the application shuts
* down. The shutdown timeout used while waiting for in-flight reconciliations to complete is
* taken from {@link
* io.javaoperatorsdk.operator.api.config.ConfigurationService#reconciliationTerminationTimeout()}.
* Configure it via {@link
* io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider#withReconciliationTerminationTimeout(Duration)
* ConfigurationServiceOverrider#withReconciliationTerminationTimeout(Duration)}.
*
* <p>The hook is registered regardless of whether leader election is enabled. A leader pod
* receiving {@code SIGTERM} will therefore release its lease cleanly so that a standby replica
* can take over without waiting for lease expiry.
*
* <p><b>NOTE:</b> You may also want to tune the Pod's {@code terminationGracePeriodSeconds} to be
* at least as long as the configured {@code reconciliationTerminationTimeout}, plus a small
* buffer for the rest of the shutdown sequence (releasing the leader-election lease and closing
* the Kubernetes client). If the grace period elapses before {@link #stop()} returns, the kubelet
* sends {@code SIGKILL}, in-flight reconciliations are abandoned, and any held leader-election
* lease is not released cleanly.
*/
public void installShutdownHook() {
if (shutdownHookInstalled.compareAndSet(false, true)) {
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
}
}

/**
* Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. Note
* that graceful shutdown is usually not needed, but some {@link Reconciler} implementations might
Expand All @@ -137,16 +165,14 @@ protected ConfigurationService initConfigurationService(
* <p>Note that you might want to tune "terminationGracePeriodSeconds" for the Pod running the
* controller.
*
* @param gracefulShutdownTimeout timeout to wait for executor threads to complete actual
* reconciliations
* @param gracefulShutdownTimeout ignored, configure {@link
* ConfigurationService#reconciliationTerminationTimeout()} instead
* @deprecated Use {@link #installShutdownHook()} instead
*/
@Deprecated(forRemoval = true)
@SuppressWarnings("unused")
public void installShutdownHook(Duration gracefulShutdownTimeout) {
if (!leaderElectionManager.isLeaderElectionEnabled()) {
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
} else {
log.warn("Leader election is on, shutdown hook will not be installed.");
}
installShutdownHook();
}

public KubernetesClient getKubernetesClient() {
Expand Down Expand Up @@ -188,6 +214,30 @@ public synchronized void start() {
}
}

/**
* Stops the operator and releases its resources. The shutdown sequence is:
*
* <ol>
* <li>Stop the controller manager, halting reconciliation of all registered controllers.
* <li>Stop the executor service manager, waiting up to {@link
* io.javaoperatorsdk.operator.api.config.ConfigurationService#reconciliationTerminationTimeout()}
* for in-flight reconciliations to complete.
* <li>Stop the leader-election manager, cancelling the leader-election future and releasing any
* held lease.
* <li>Close the {@link KubernetesClient} if {@link
* io.javaoperatorsdk.operator.api.config.ConfigurationService#closeClientOnStop()} is
* {@code true} (the default).
* </ol>
*
* <p>It is safe to call this method from a JVM shutdown hook (see {@link #installShutdownHook()})
* as the graceful-shutdown path coordinates with the leader-election callbacks so that {@code
* System.exit} is not invoked while the JVM is already shutting down.
*
* <p>If the operator was never successfully started, this method only stops the executor service
* manager so that no thread pools are leaked.
*
* @throws OperatorException if an error occurs during shutdown
*/
@Override
public void stop() throws OperatorException {
Duration reconciliationTerminationTimeout =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ void testInitPermissionsMultipleRulesWithResourceName(@TempDir Path tempDir) thr
assertTrue(leaderElectionManager.isLeaderElectionEnabled());
}

@Test
void stopLeadingDoesNotInvokeSystemExitWhenStopWasCalledFirst() {
// When stop() is called before the onStopLeading callback fires (which is what happens when
// stop()'s future cancellation triggers the callback), stopLeading() must skip
// System.exit(1). Otherwise calling stop() from inside a JVM shutdown hook deadlocks against
// the java.lang.Shutdown class lock. If this regression is ever reintroduced, this test
// method would terminate the JUnit JVM via System.exit(1) instead of failing cleanly.
final var leaderElectionManager = leaderElectionManager(null);
leaderElectionManager.stop();
leaderElectionManager.stopLeading();
}
Comment on lines +112 to +122

@Test
void testFailedToInitMissingPermission(@TempDir Path tempDir) throws IOException {
var namespace = "foo";
Expand Down
Loading