diff --git a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java index a9ccf97765..1c1aabde51 100644 --- a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java +++ b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraOptions.java @@ -130,4 +130,46 @@ public static synchronized CassandraOptions instance() { positiveInt(), 12 * 60 * 60 ); + + public static final ConfigOption CASSANDRA_RECONNECT_BASE_DELAY = + new ConfigOption<>( + "cassandra.reconnect_base_delay", + "The base delay in milliseconds used by the driver's " + + "exponential reconnection policy when a Cassandra host " + + "becomes unreachable.", + rangeInt(100L, Long.MAX_VALUE), + 1000L + ); + + public static final ConfigOption CASSANDRA_RECONNECT_MAX_DELAY = + new ConfigOption<>( + "cassandra.reconnect_max_delay", + "The maximum delay in milliseconds used by the driver's " + + "exponential reconnection policy when a Cassandra host " + + "becomes unreachable.", + rangeInt(1000L, Long.MAX_VALUE), + 10_000L + ); + + public static final ConfigOption CASSANDRA_RECONNECT_MAX_RETRIES = + new ConfigOption<>( + "cassandra.reconnect_max_retries", + "The maximum number of retries applied at query-time when " + + "a Cassandra host is temporarily unreachable " + + "(NoHostAvailableException / OperationTimedOutException). " + + "Set to 0 to disable query-time retries.", + rangeInt(0, Integer.MAX_VALUE), + 3 + ); + + public static final ConfigOption CASSANDRA_RECONNECT_INTERVAL = + new ConfigOption<>( + "cassandra.reconnect_interval", + "The interval in milliseconds between query-time retries " + + "when a Cassandra host is temporarily unreachable. 
The " + + "actual wait grows with exponential backoff, capped at " + + "cassandra.reconnect_max_delay.", + rangeInt(100L, Long.MAX_VALUE), + 1000L + ); } diff --git a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java index 7a9ffa2b91..9c7ee295ac 100644 --- a/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java +++ b/hugegraph-server/hugegraph-cassandra/src/main/java/org/apache/hugegraph/backend/store/cassandra/CassandraSessionPool.java @@ -20,12 +20,15 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hugegraph.backend.BackendException; import org.apache.hugegraph.backend.store.BackendSession.AbstractBackendSession; import org.apache.hugegraph.backend.store.BackendSessionPool; import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.util.E; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.Cluster; @@ -34,22 +37,67 @@ import com.datastax.driver.core.ProtocolOptions.Compression; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.SocketOptions; import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.DriverException; import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.OperationTimedOutException; +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; public class CassandraSessionPool extends 
BackendSessionPool { + private static final Logger LOG = Log.logger(CassandraSessionPool.class); + private static final int SECOND = 1000; + private static final String HEALTH_CHECK_CQL = + "SELECT now() FROM system.local"; + + /** + * Guards the one-time JVM-wide warning about {@code commitAsync()} not + * being covered by query-time retries. {@link CassandraSessionPool} is + * instantiated once per backend store per graph, so without this guard + * the warning would fire many times on startup for a structural + * limitation that does not change between instances. + */ + private static final AtomicBoolean ASYNC_RETRY_WARNING_LOGGED = + new AtomicBoolean(false); private Cluster cluster; private final String keyspace; + private final int maxRetries; + private final long retryInterval; + private final long retryBaseDelay; + private final long retryMaxDelay; public CassandraSessionPool(HugeConfig config, String keyspace, String store) { super(config, keyspace + "/" + store); this.cluster = null; this.keyspace = keyspace; + this.maxRetries = config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES); + this.retryInterval = config.get( + CassandraOptions.CASSANDRA_RECONNECT_INTERVAL); + long reconnectBase = config.get( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY); + long reconnectMax = config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY); + E.checkArgument(reconnectMax >= reconnectBase, + "'%s' (%s) must be >= '%s' (%s)", + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), + reconnectMax, + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), + reconnectBase); + this.retryBaseDelay = reconnectBase; + this.retryMaxDelay = reconnectMax; + + if (this.maxRetries > 0 && + ASYNC_RETRY_WARNING_LOGGED.compareAndSet(false, true)) { + LOG.warn("cassandra.reconnect_max_retries={} applies to sync commit()" + + " only. 
commitAsync() has no retry protection.", this.maxRetries); + } } @Override @@ -86,6 +134,12 @@ public synchronized void open() { builder.withSocketOptions(socketOptions); + // Reconnection policy: let driver keep retrying nodes in background + // with exponential backoff after they go down (see issue #2740). + builder.withReconnectionPolicy( + new ExponentialReconnectionPolicy(this.retryBaseDelay, + this.retryMaxDelay)); + // Credential options String username = config.get(CassandraOptions.CASSANDRA_USERNAME); String password = config.get(CassandraOptions.CASSANDRA_PASSWORD); @@ -161,7 +215,7 @@ public void rollback() { @Override public ResultSet commit() { - ResultSet rs = this.session.execute(this.batch); + ResultSet rs = this.executeWithRetry(this.batch); // Clear batch if execute() successfully (retained if failed) this.batch.clear(); return rs; @@ -174,6 +228,11 @@ public void commitAsync() { int processors = Math.min(statements.size(), 1023); List results = new ArrayList<>(processors + 1); for (Statement s : statements) { + // TODO(issue #2740): commitAsync() bypasses executeWithRetry(). + // During a Cassandra restart, async writes may fail with + // NoHostAvailableException even when maxRetries > 0. Callers + // must handle CompletableFuture failures. A follow-up will + // wrap each future with retry semantics. ResultSetFuture future = this.session.executeAsync(s); results.add(future); @@ -197,15 +256,92 @@ public ResultSet query(Statement statement) { } public ResultSet execute(Statement statement) { - return this.session.execute(statement); + return this.executeWithRetry(statement); } public ResultSet execute(String statement) { - return this.session.execute(statement); + return this.executeWithRetry(new SimpleStatement(statement)); } public ResultSet execute(String statement, Object... 
args) { - return this.session.execute(statement, args); + return this.executeWithRetry(new SimpleStatement(statement, args)); + } + + /** + * Execute a statement, retrying on transient connectivity failures + * (NoHostAvailableException / OperationTimedOutException). The driver + * itself keeps retrying connections in the background via the + * reconnection policy, so once Cassandra comes back online, a + * subsequent attempt here will succeed without restarting the server. + * + *
<p>
If the driver session has been discarded (e.g. by + * {@link #reconnectIfNeeded()} after a failed health-check) it is + * lazily reopened at the start of each attempt. After a transient + * failure the session is {@linkplain #reset() reset} so the next + * iteration gets a fresh driver session. + * + *
<p>
Blocking note: retries block the calling thread via + * {@link Thread#sleep(long)}. Worst-case a single call blocks for + * {@code maxRetries * retryMaxDelay} ms. Under high-throughput + * workloads concurrent threads may pile up in {@code sleep()} during + * a Cassandra outage. For such deployments lower + * {@code cassandra.reconnect_max_retries} (default 3) and + * {@code cassandra.reconnect_max_delay} (default 10000ms) so the + * request fails fast and pressure is released back to the caller. + */ + private ResultSet executeWithRetry(Statement statement) { + int retries = CassandraSessionPool.this.maxRetries; + long interval = CassandraSessionPool.this.retryInterval; + long maxDelay = CassandraSessionPool.this.retryMaxDelay; + DriverException lastError = null; + for (int attempt = 0; attempt <= retries; attempt++) { + try { + if (this.session == null) { + // Lazy reopen: may itself throw NHAE while + // Cassandra is still unreachable; the catch below + // treats that as a transient failure. + this.open(); + } + return this.session.execute(statement); + } catch (NoHostAvailableException | OperationTimedOutException e) { + lastError = e; + // Discard the (possibly broken) driver session so the + // next iteration reopens cleanly. + this.reset(); + if (attempt >= retries) { + break; + } + long cap = maxDelay > 0 ? maxDelay : interval; + long shift = 1L << Math.min(attempt, 20); + long delay; + try { + // Guard against Long overflow when retryInterval is huge. 
+ delay = Math.min(Math.multiplyExact(interval, shift), cap); + } catch (ArithmeticException overflow) { + delay = cap; + } + LOG.warn("Cassandra temporarily unavailable ({}), " + + "retry {}/{} in {} ms", + e.getClass().getSimpleName(), attempt + 1, + retries, delay); + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new BackendException("Interrupted while " + + "waiting to retry " + + "Cassandra query", ie); + } + } + } + // Preserve original exception as cause (stack trace + type) by + // pre-formatting the message and using the (String, Throwable) + // constructor explicitly — avoids ambiguity with varargs overloads. + String msg = String.format( + "Failed to execute Cassandra query after %s retries: %s", + retries, + lastError == null ? "" : lastError.getMessage()); + throw new BackendException(msg, lastError); } private void tryOpen() { @@ -255,6 +391,56 @@ public boolean hasChanges() { return this.batch.size() > 0; } + /** + * Periodic liveness probe invoked by {@link BackendSessionPool} to + * recover thread-local sessions after Cassandra has been restarted. + * Reopens the driver session if it was closed and pings the cluster + * with a lightweight query. On failure the session is discarded via + * {@link #reset()} so the next call to + * {@link #executeWithRetry(Statement)} reopens it; any exception + * here is swallowed so the caller can still issue the real query. 
+ */ + @Override + public void reconnectIfNeeded() { + if (!this.opened) { + return; + } + try { + if (this.session == null || this.session.isClosed()) { + this.session = null; + this.tryOpen(); + } + if (this.session != null) { + this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL)); + } + } catch (DriverException e) { + LOG.debug("Cassandra health-check failed, resetting session: {}", + e.getMessage()); + this.session = null; + } + } + + /** + * Force-close the driver session so it is re-opened on the next + * {@link #opened()} call. Used when a failure is observed and we + * want to start fresh on the next attempt. + */ + @Override + public void reset() { + if (this.session == null) { + return; + } + try { + this.session.close(); + } catch (Exception e) { + // Do not swallow Error (OOM / StackOverflow); only log + // ordinary exceptions raised by the driver on close. + LOG.warn("Failed to reset Cassandra session", e); + } finally { + this.session = null; + } + } + public Collection statements() { return this.batch.getStatements(); } diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java index ef5a8e896b..e9ec731b83 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cassandra/CassandraTest.java @@ -17,11 +17,13 @@ package org.apache.hugegraph.unit.cassandra; +import java.util.Collections; import java.util.Map; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.hugegraph.backend.store.cassandra.CassandraOptions; +import org.apache.hugegraph.backend.store.cassandra.CassandraSessionPool; import org.apache.hugegraph.backend.store.cassandra.CassandraStore; import 
org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.config.OptionSpace; @@ -30,7 +32,11 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -192,4 +198,139 @@ public void testParseReplicaWithNetworkTopologyStrategyAndDoubleReplica() { Whitebox.invokeStatic(CassandraStore.class, "parseReplica", config); }); } + + @Test + public void testReconnectOptionsHaveSensibleDefaults() { + // Runtime-reconnection options must exist with non-zero defaults so + // HugeGraph keeps running when Cassandra restarts (issue #2740). + Assert.assertEquals(1000L, (long) CassandraOptions + .CASSANDRA_RECONNECT_BASE_DELAY.defaultValue()); + Assert.assertEquals(10_000L, (long) CassandraOptions + .CASSANDRA_RECONNECT_MAX_DELAY.defaultValue()); + Assert.assertEquals(3, (int) CassandraOptions + .CASSANDRA_RECONNECT_MAX_RETRIES.defaultValue()); + Assert.assertEquals(1000L, (long) CassandraOptions + .CASSANDRA_RECONNECT_INTERVAL.defaultValue()); + } + + @Test + public void testReconnectOptionsAreOverridable() { + String base = CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(); + String max = CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(); + String retries = CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES + .name(); + String interval = CassandraOptions.CASSANDRA_RECONNECT_INTERVAL.name(); + + Configuration conf = new PropertiesConfiguration(); + conf.setProperty(base, 500L); + conf.setProperty(max, 30_000L); + conf.setProperty(retries, 3); + conf.setProperty(interval, 1000L); + HugeConfig config = new HugeConfig(conf); + + Assert.assertEquals(500L, (long) config.get( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY)); + Assert.assertEquals(30_000L, (long) 
config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY)); + Assert.assertEquals(3, (int) config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES)); + Assert.assertEquals(1000L, (long) config.get( + CassandraOptions.CASSANDRA_RECONNECT_INTERVAL)); + } + + @Test + public void testReconnectRetriesCanBeDisabled() { + String retries = CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES + .name(); + Configuration conf = new PropertiesConfiguration(); + conf.setProperty(retries, 0); + HugeConfig config = new HugeConfig(conf); + Assert.assertEquals(0, (int) config.get( + CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES)); + } + + @Test + public void testExecuteWithRetrySucceedsAfterTransientFailures() { + // Configure retry knobs via config so the pool reads them through + // the normal path (no Whitebox overrides on retry fields). Keep the + // values within the validators' lower bounds (base >= 100, max >= + // base, interval >= 100). + Configuration conf = new PropertiesConfiguration(); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 100L); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 1000L); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES.name(), 3); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_INTERVAL.name(), 100L); + HugeConfig config = new HugeConfig(conf); + CassandraSessionPool pool = new CassandraSessionPool(config, + "ks", "store"); + + com.datastax.driver.core.Session driverSession = Mockito.mock( + com.datastax.driver.core.Session.class); + ResultSet rs = Mockito.mock(ResultSet.class); + NoHostAvailableException transientFailure = + new NoHostAvailableException(Collections.emptyMap()); + Mockito.when(driverSession.execute(Mockito.any(Statement.class))) + .thenThrow(transientFailure) + .thenThrow(transientFailure) + .thenReturn(rs); + + // executeWithRetry now resets the driver session on transient + // failures, so the next iteration calls 
cluster().connect(keyspace) + // to obtain a fresh one. Install a mocked Cluster that hands back + // the same driverSession for each reconnect. + com.datastax.driver.core.Cluster mockCluster = Mockito.mock( + com.datastax.driver.core.Cluster.class); + Mockito.when(mockCluster.isClosed()).thenReturn(false); + Mockito.when(mockCluster.connect(Mockito.anyString())) + .thenReturn(driverSession); + Whitebox.setInternalState(pool, "cluster", mockCluster); + + CassandraSessionPool.Session session = pool.new Session(); + Whitebox.setInternalState(session, "session", driverSession); + + ResultSet result = session.execute("SELECT now() FROM system.local"); + Assert.assertSame(rs, result); + Mockito.verify(driverSession, Mockito.times(3)) + .execute(Mockito.any(Statement.class)); + } + + @Test + public void testReconnectBaseDelayBelowMinimumRejected() { + // The validator on CASSANDRA_RECONNECT_BASE_DELAY is + // rangeInt(100L, Long.MAX_VALUE); values below 100 must be rejected + // at parse time. Setting the property as a String forces HugeConfig + // to run parseConvert() which invokes the range check. + Configuration conf = new PropertiesConfiguration(); + Assert.assertThrows(Exception.class, () -> { + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), + "50"); + new HugeConfig(conf); + }); + } + + @Test + public void testReconnectMaxDelayLessThanBaseRejected() { + // Both values must pass their individual range validators with margin + // (base >= 100, max >= 1000), so the only thing that can throw is the + // E.checkArgument(max >= base) cross-check inside the pool ctor. Set + // all four reconnect properties explicitly so the test does not depend + // on default values changing in CassandraOptions. 
+ Configuration conf = new PropertiesConfiguration(); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(), 10_000L); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(), 2_000L); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES.name(), 3); + conf.setProperty( + CassandraOptions.CASSANDRA_RECONNECT_INTERVAL.name(), 1_000L); + HugeConfig config = new HugeConfig(conf); + Assert.assertThrows(IllegalArgumentException.class, () -> + new CassandraSessionPool(config, "ks", "store")); + } }