From 8c0d69a534d6923ad35084825f7923d65c27c224 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Tue, 5 May 2026 08:06:55 -0700 Subject: [PATCH] test(amber): warm up ReconfigurationIntegrationSpec before timed assertions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The first test in this suite intermittently times out on the 5-second `Await.result(client.controllerInterface.startWorkflow(...))` while the next test in the same JVM passes with the same budget — the cost is JVM-side first-touch (JIT, pekko dispatcher, RegionExecutionCoordinator class loading), not anything the test itself controls. Run a trivial pure-Scala TextInput → terminal workflow once in `beforeAll` so the JIT/class-loading bill is paid before any timed assertion. No Python is involved, so the warmup itself doesn't recreate the Python-worker cold-start cost it's trying to absorb on the JVM side. Hard cap the warmup at 10 seconds and wrap it in try/catch/finally: - Both Awaits use the same 10s ceiling, so warmup can never hang the suite. - Any exception is logged and swallowed; the existing `Retries` mixin still backs up individual test cases. - `client.shutdown()` and `cleanupWorkflowExecutionData()` always run. Refs #4946. --- .../e2e/ReconfigurationIntegrationSpec.scala | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala index 768caf079f4..acadb9154c8 100644 --- a/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala +++ b/amber/src/test/integration/org/apache/texera/amber/engine/e2e/ReconfigurationIntegrationSpec.scala @@ -19,6 +19,7 @@ package org.apache.texera.amber.engine.e2e +import com.twitter.util.{Await, Duration, Promise, Return} import com.typesafe.scalalogging.Logger import org.apache.pekko.actor.{ActorSystem, Props} import org.apache.pekko.testkit.{ImplicitSender, TestKit} @@ -28,12 +29,21 @@ import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode} import org.apache.texera.amber.core.tuple.Tuple import org.apache.texera.amber.core.virtualidentity.OperatorIdentity import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext} +import org.apache.texera.amber.engine.architecture.controller.{ + ControllerConfig, + ExecutionStateUpdate +} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED import org.apache.texera.amber.engine.common.AmberRuntime +import org.apache.texera.amber.engine.common.client.AmberClient import org.apache.texera.amber.engine.e2e.TestUtils.{ + buildWorkflow, cleanupWorkflowExecutionData, initiateTexeraDBForTestCases, setUpWorkflowExecutionData } +import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc import org.apache.texera.amber.operator.{LogicalOp, TestOperators} import org.apache.texera.amber.tags.IntegrationTest import org.apache.texera.workflow.LogicalLink @@ -86,12 +96,64 @@ class ReconfigurationIntegrationSpec // Explicitly load the JDBC driver to avoid flaky CI failures. Class.forName("org.postgresql.Driver") initiateTexeraDBForTestCases() + warmupOnce() } override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) } + /** + * Run a trivial pure-Scala workflow (TextInput → terminal) once before the + * timed tests start, so the first 5-second `startWorkflow` await in + * [[TestUtils.shouldReconfigure]] doesn't have to absorb JVM JIT + * warmup, pekko dispatcher first-touch, and `RegionExecutionCoordinator` + * class loading. + * + * Hard-capped at 10 seconds total, defensively wrapped: if warmup itself + * times out or throws, log and continue — the existing `Retries` mixin + * still backs up individual test cases. This ensures warmup can never + * hang the suite. + */ + private def warmupOnce(): Unit = { + val warmupCap = Duration.fromSeconds(10) + setUpWorkflowExecutionData() + var client: AmberClient = null + try { + val src = new TextInputSourceOpDesc() + src.textInput = "warmup" + val warmupCtx = new WorkflowContext() + val workflow = buildWorkflow(List(src), List.empty, warmupCtx) + client = new AmberClient( + system, + workflow.context, + workflow.physicalPlan, + ControllerConfig.default, + _ => {} + ) + val completion = Promise[Unit]() + client.registerCallback[ExecutionStateUpdate](evt => { + if (evt.state == COMPLETED) completion.updateIfEmpty(Return(())) + }) + Await.result( + client.controllerInterface.startWorkflow(EmptyRequest(), ()), + warmupCap + ) + Await.result(completion, warmupCap) + } catch { + case e: Throwable => + logger.warn( + s"warmup workflow did not finish within ${warmupCap}; tests will run cold and rely on Retries: ${e.getMessage}" + ) + } finally { + if (client != null) { + try client.shutdown() + catch { case _: Throwable => () } + } + cleanupWorkflowExecutionData() + } + } + // Thin wrapper around the shared TestUtils helper so call sites below stay // ctx/system-implicit. The actual workflow-driver logic lives in TestUtils // and is reused by ReconfigurationSpec.