Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,46 @@ public static synchronized CassandraOptions instance() {
positiveInt(),
12 * 60 * 60
);

/**
 * Base delay in milliseconds for the driver's exponential reconnection
 * policy (background host recovery). Minimum 100 ms, default 1000 ms.
 * Must be <= {@code cassandra.reconnect_max_delay}.
 */
public static final ConfigOption<Long> CASSANDRA_RECONNECT_BASE_DELAY =
        new ConfigOption<>(
                "cassandra.reconnect_base_delay",
                "The base delay in milliseconds used by the driver's " +
                "exponential reconnection policy when a Cassandra host " +
                "becomes unreachable.",
                rangeInt(100L, Long.MAX_VALUE),
                1000L
        );

/**
 * Maximum delay in milliseconds for the driver's exponential
 * reconnection policy; it also caps the exponential backoff applied
 * between query-time retries. Minimum 1000 ms, default 10000 ms.
 * Must be >= {@code cassandra.reconnect_base_delay}.
 */
public static final ConfigOption<Long> CASSANDRA_RECONNECT_MAX_DELAY =
        new ConfigOption<>(
                "cassandra.reconnect_max_delay",
                "The maximum delay in milliseconds used by the driver's " +
                "exponential reconnection policy when a Cassandra host " +
                "becomes unreachable.",
                rangeInt(1000L, Long.MAX_VALUE),
                10_000L
        );

/**
 * Number of query-time retries on transient connectivity failures
 * (NoHostAvailableException / OperationTimedOutException). A value of
 * 0 disables query-time retries entirely. Default 3.
 */
public static final ConfigOption<Integer> CASSANDRA_RECONNECT_MAX_RETRIES =
        new ConfigOption<>(
                "cassandra.reconnect_max_retries",
                "The maximum number of retries applied at query-time when " +
                "a Cassandra host is temporarily unreachable " +
                "(NoHostAvailableException / OperationTimedOutException). " +
                "Set to 0 to disable query-time retries.",
                rangeInt(0, Integer.MAX_VALUE),
                3
        );

/**
 * Initial wait in milliseconds between query-time retries; the actual
 * wait grows exponentially per attempt, capped at
 * {@code cassandra.reconnect_max_delay}. Minimum 100 ms, default 1000 ms.
 */
public static final ConfigOption<Long> CASSANDRA_RECONNECT_INTERVAL =
        new ConfigOption<>(
                "cassandra.reconnect_interval",
                "The interval in milliseconds between query-time retries " +
                "when a Cassandra host is temporarily unreachable. The " +
                "actual wait grows with exponential backoff, capped at " +
                "cassandra.reconnect_max_delay.",
                rangeInt(100L, Long.MAX_VALUE),
                1000L
        );
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.store.BackendSession.AbstractBackendSession;
import org.apache.hugegraph.backend.store.BackendSessionPool;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
Expand All @@ -34,22 +37,67 @@
import com.datastax.driver.core.ProtocolOptions.Compression;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.OperationTimedOutException;
import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;

public class CassandraSessionPool extends BackendSessionPool {

private static final Logger LOG = Log.logger(CassandraSessionPool.class);

// Milliseconds per second; time-unit multiplier (usage sites are in
// parts of this class not visible in this chunk)
private static final int SECOND = 1000;

// Lightweight liveness probe used by reconnectIfNeeded(); querying
// system.local avoids touching any user keyspace
private static final String HEALTH_CHECK_CQL =
        "SELECT now() FROM system.local";

/**
 * Guards the one-time JVM-wide warning about {@code commitAsync()} not
 * being covered by query-time retries. {@link CassandraSessionPool} is
 * instantiated once per backend store per graph, so without this guard
 * the warning would fire many times on startup for a structural
 * limitation that does not change between instances.
 */
private static final AtomicBoolean ASYNC_RETRY_WARNING_LOGGED =
        new AtomicBoolean(false);

// Driver cluster handle; set to null in the constructor and
// initialized in open()
private Cluster cluster;
// Cassandra keyspace backing this pool's graph
private final String keyspace;

// Query-time retry settings read once from cassandra.reconnect_*:
// maximum retries per query (0 disables query-time retries)
private final int maxRetries;
// initial wait (ms) between query-time retries, grown exponentially
private final long retryInterval;
// base delay (ms) for the driver's background reconnection policy
private final long retryBaseDelay;
// max delay (ms) for the reconnection policy; also caps the
// query-time retry backoff in executeWithRetry()
private final long retryMaxDelay;

/**
 * Creates a session pool for one backend store of a graph.
 *
 * <p>Reads the reconnect/retry options once at construction time and
 * validates that the configured maximum reconnect delay is not smaller
 * than the base delay.
 *
 * @param config   graph configuration holding the cassandra.* options
 * @param keyspace Cassandra keyspace backing the graph
 * @param store    store name, combined with the keyspace as pool name
 */
public CassandraSessionPool(HugeConfig config,
                            String keyspace, String store) {
    super(config, keyspace + "/" + store);
    this.cluster = null;
    this.keyspace = keyspace;

    this.maxRetries = config.get(
            CassandraOptions.CASSANDRA_RECONNECT_MAX_RETRIES);
    this.retryInterval = config.get(
            CassandraOptions.CASSANDRA_RECONNECT_INTERVAL);

    long baseDelay = config.get(
            CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY);
    long maxDelay = config.get(
            CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY);
    // Reject configurations where the backoff cap is below the start
    E.checkArgument(maxDelay >= baseDelay,
                    "'%s' (%s) must be >= '%s' (%s)",
                    CassandraOptions.CASSANDRA_RECONNECT_MAX_DELAY.name(),
                    maxDelay,
                    CassandraOptions.CASSANDRA_RECONNECT_BASE_DELAY.name(),
                    baseDelay);
    this.retryBaseDelay = baseDelay;
    this.retryMaxDelay = maxDelay;

    // Warn once per JVM that async commits bypass the retry logic
    if (this.maxRetries > 0) {
        if (ASYNC_RETRY_WARNING_LOGGED.compareAndSet(false, true)) {
            LOG.warn("cassandra.reconnect_max_retries={} applies to " +
                     "sync commit() only. commitAsync() has no retry " +
                     "protection.", this.maxRetries);
        }
    }
}

@Override
Expand Down Expand Up @@ -86,6 +134,12 @@ public synchronized void open() {

builder.withSocketOptions(socketOptions);

// Reconnection policy: let driver keep retrying nodes in background
// with exponential backoff after they go down (see issue #2740).
builder.withReconnectionPolicy(
new ExponentialReconnectionPolicy(this.retryBaseDelay,
this.retryMaxDelay));

// Credential options
String username = config.get(CassandraOptions.CASSANDRA_USERNAME);
String password = config.get(CassandraOptions.CASSANDRA_PASSWORD);
Expand Down Expand Up @@ -161,7 +215,7 @@ public void rollback() {

@Override
public ResultSet commit() {
ResultSet rs = this.session.execute(this.batch);
ResultSet rs = this.executeWithRetry(this.batch);
// Clear batch if execute() successfully (retained if failed)
this.batch.clear();
return rs;
Expand All @@ -174,6 +228,11 @@ public void commitAsync() {
int processors = Math.min(statements.size(), 1023);
List<ResultSetFuture> results = new ArrayList<>(processors + 1);
for (Statement s : statements) {
// TODO(issue #2740): commitAsync() bypasses executeWithRetry().
// During a Cassandra restart, async writes may fail with
// NoHostAvailableException even when maxRetries > 0. Callers
// must handle CompletableFuture failures. A follow-up will
// wrap each future with retry semantics.
ResultSetFuture future = this.session.executeAsync(s);
results.add(future);

Expand All @@ -197,15 +256,92 @@ public ResultSet query(Statement statement) {
}

/**
 * Executes a single driver statement, routing through
 * {@link #executeWithRetry(Statement)} so transient connectivity
 * failures are retried instead of surfacing immediately.
 *
 * @param statement the driver statement to execute
 * @return the driver result set
 */
public ResultSet execute(Statement statement) {
    return this.executeWithRetry(statement);
}

/**
 * Executes a raw CQL string, routing through
 * {@link #executeWithRetry(Statement)} so transient connectivity
 * failures are retried instead of surfacing immediately.
 *
 * @param statement the CQL string to execute
 * @return the driver result set
 */
public ResultSet execute(String statement) {
    // Wrap in SimpleStatement so the shared retry path can re-execute it
    return this.executeWithRetry(new SimpleStatement(statement));
}

/**
 * Executes a parameterized CQL string, routing through
 * {@link #executeWithRetry(Statement)} so transient connectivity
 * failures are retried instead of surfacing immediately.
 *
 * @param statement the CQL string with bind markers
 * @param args      positional values bound to the statement
 * @return the driver result set
 */
public ResultSet execute(String statement, Object... args) {
    // Wrap in SimpleStatement so the shared retry path can re-execute it
    return this.executeWithRetry(new SimpleStatement(statement, args));
}

/**
Comment thread
dpol1 marked this conversation as resolved.
* Execute a statement, retrying on transient connectivity failures
* (NoHostAvailableException / OperationTimedOutException). The driver
* itself keeps retrying connections in the background via the
* reconnection policy, so once Cassandra comes back online, a
* subsequent attempt here will succeed without restarting the server.
*
* <p>If the driver session has been discarded (e.g. by
* {@link #reconnectIfNeeded()} after a failed health-check) it is
* lazily reopened at the start of each attempt. After a transient
* failure the session is {@linkplain #reset() reset} so the next
* iteration gets a fresh driver session.
*
* <p><b>Blocking note:</b> retries block the calling thread via
* {@link Thread#sleep(long)}. Worst-case a single call blocks for
* {@code maxRetries * retryMaxDelay} ms. Under high-throughput
* workloads concurrent threads may pile up in {@code sleep()} during
* a Cassandra outage. For such deployments lower
* {@code cassandra.reconnect_max_retries} (default 3) and
* {@code cassandra.reconnect_max_delay} (default 10000ms) so the
* request fails fast and pressure is released back to the caller.
*/
private ResultSet executeWithRetry(Statement statement) {
Comment thread
dpol1 marked this conversation as resolved.
int retries = CassandraSessionPool.this.maxRetries;
Comment thread
dpol1 marked this conversation as resolved.
long interval = CassandraSessionPool.this.retryInterval;
long maxDelay = CassandraSessionPool.this.retryMaxDelay;
DriverException lastError = null;
for (int attempt = 0; attempt <= retries; attempt++) {
try {
if (this.session == null) {
// Lazy reopen: may itself throw NHAE while
// Cassandra is still unreachable; the catch below
// treats that as a transient failure.
this.open();
}
return this.session.execute(statement);
} catch (NoHostAvailableException | OperationTimedOutException e) {
lastError = e;
// Discard the (possibly broken) driver session so the
// next iteration reopens cleanly.
this.reset();
if (attempt >= retries) {
break;
}
long cap = maxDelay > 0 ? maxDelay : interval;
long shift = 1L << Math.min(attempt, 20);
long delay;
try {
// Guard against Long overflow when retryInterval is huge.
delay = Math.min(Math.multiplyExact(interval, shift), cap);
} catch (ArithmeticException overflow) {
delay = cap;
}
LOG.warn("Cassandra temporarily unavailable ({}), " +
"retry {}/{} in {} ms",
e.getClass().getSimpleName(), attempt + 1,
retries, delay);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new BackendException("Interrupted while " +
"waiting to retry " +
"Cassandra query", ie);
}
}
}
// Preserve original exception as cause (stack trace + type) by
// pre-formatting the message and using the (String, Throwable)
// constructor explicitly — avoids ambiguity with varargs overloads.
String msg = String.format(
"Failed to execute Cassandra query after %s retries: %s",
retries,
lastError == null ? "<null>" : lastError.getMessage());
throw new BackendException(msg, lastError);
}

private void tryOpen() {
Expand Down Expand Up @@ -255,6 +391,56 @@ public boolean hasChanges() {
return this.batch.size() > 0;
}

/**
* Periodic liveness probe invoked by {@link BackendSessionPool} to
* recover thread-local sessions after Cassandra has been restarted.
* Reopens the driver session if it was closed and pings the cluster
* with a lightweight query. On failure the session is discarded via
* {@link #reset()} so the next call to
* {@link #executeWithRetry(Statement)} reopens it; any exception
* here is swallowed so the caller can still issue the real query.
*/
Comment thread
dpol1 marked this conversation as resolved.
@Override
public void reconnectIfNeeded() {
if (!this.opened) {
return;
}
try {
if (this.session == null || this.session.isClosed()) {
this.session = null;
this.tryOpen();
}
if (this.session != null) {
this.session.execute(new SimpleStatement(HEALTH_CHECK_CQL));
Comment thread
dpol1 marked this conversation as resolved.
}
} catch (DriverException e) {
LOG.debug("Cassandra health-check failed, resetting session: {}",
e.getMessage());
this.session = null;
}
}

/**
 * Force-close the driver session so it is re-opened on the next
 * {@link #opened()} call. Used when a failure is observed and we
 * want to start fresh on the next attempt.
 */
@Override
public void reset() {
    if (this.session != null) {
        try {
            this.session.close();
        } catch (Exception e) {
            // Log-and-continue: a close failure must not keep the
            // broken session around. Only ordinary exceptions are
            // caught; Errors (OOM / StackOverflow) propagate.
            LOG.warn("Failed to reset Cassandra session", e);
        } finally {
            // Always drop the reference so the next use reopens
            this.session = null;
        }
    }
}

/**
 * Returns the statements currently buffered in this session's batch,
 * i.e. the pending writes that a commit would execute.
 */
public Collection<Statement> statements() {
    return this.batch.getStatements();
}
Expand Down
Loading