diff --git a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala index 768caf079f4..acadb9154c8 100644 --- a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala +++ b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala @@ -19,6 +19,7 @@ package org.apache.texera.amber.engine.e2e +import com.twitter.util.{Await, Duration, Promise, Return} import com.typesafe.scalalogging.Logger import org.apache.pekko.actor.{ActorSystem, Props} import org.apache.pekko.testkit.{ImplicitSender, TestKit} @@ -28,12 +29,21 @@ import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode} import org.apache.texera.amber.core.tuple.Tuple import org.apache.texera.amber.core.virtualidentity.OperatorIdentity import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext} +import org.apache.texera.amber.engine.architecture.controller.{ + ControllerConfig, + ExecutionStateUpdate +} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED import org.apache.texera.amber.engine.common.AmberRuntime +import org.apache.texera.amber.engine.common.client.AmberClient import org.apache.texera.amber.engine.e2e.TestUtils.{ + buildWorkflow, cleanupWorkflowExecutionData, initiateTexeraDBForTestCases, setUpWorkflowExecutionData } +import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc import org.apache.texera.amber.operator.{LogicalOp, TestOperators} import org.apache.texera.amber.tags.IntegrationTest import org.apache.texera.workflow.LogicalLink @@ -86,12 +96,64 @@ class ReconfigurationIntegrationSpec // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") initiateTexeraDBForTestCases() + warmupOnce() } override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) } + /** + * Run a trivial pure-Scala workflow (TextInput → terminal) once before the + * timed tests start, so the first 5-second `startWorkflow` await in + * [[TestUtils.shouldReconfigure]] doesn't have to absorb JVM JIT + * warmup, pekko dispatcher first-touch, and `RegionExecutionCoordinator` + * class loading. + * + * Hard-capped at 10 seconds total, defensively wrapped: if warmup itself + * times out or throws, log and continue — the existing `Retries` mixin + * still backs up individual test cases. This ensures warmup can never + * hang the suite. + */ + private def warmupOnce(): Unit = { + val warmupCap = Duration.fromSeconds(10) + setUpWorkflowExecutionData() + var client: AmberClient = null + try { + val src = new TextInputSourceOpDesc() + src.textInput = "warmup" + val warmupCtx = new WorkflowContext() + val workflow = buildWorkflow(List(src), List.empty, warmupCtx) + client = new AmberClient( + system, + workflow.context, + workflow.physicalPlan, + ControllerConfig.default, + _ => {} + ) + val completion = Promise[Unit]() + client.registerCallback[ExecutionStateUpdate](evt => { + if (evt.state == COMPLETED) completion.updateIfEmpty(Return(())) + }) + Await.result( + client.controllerInterface.startWorkflow(EmptyRequest(), ()), + warmupCap + ) + Await.result(completion, warmupCap) + } catch { + case e: Throwable => + logger.warn( + s"warmup workflow did not finish within ${warmupCap}; tests will run cold and rely on Retries: ${e.getMessage}" + ) + } finally { + if (client != null) { + try client.shutdown() + catch { case _: Throwable => () } + } + cleanupWorkflowExecutionData() + } + } + // Thin wrapper around the shared TestUtils helper so call sites below stay // ctx/system-implicit. The actual workflow-driver logic lives in TestUtils // and is reused by ReconfigurationSpec.