Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.texera.amber.engine.e2e

import com.twitter.util.{Await, Duration, Promise, Return}
import com.typesafe.scalalogging.Logger
import org.apache.pekko.actor.{ActorSystem, Props}
import org.apache.pekko.testkit.{ImplicitSender, TestKit}
Expand All @@ -28,12 +29,21 @@ import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
import org.apache.texera.amber.engine.architecture.controller.{
ControllerConfig,
ExecutionStateUpdate
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
import org.apache.texera.amber.engine.common.AmberRuntime
import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.engine.e2e.TestUtils.{
buildWorkflow,
cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases,
setUpWorkflowExecutionData
}
import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
import org.apache.texera.amber.tags.IntegrationTest
import org.apache.texera.workflow.LogicalLink
Expand Down Expand Up @@ -86,12 +96,64 @@ class ReconfigurationIntegrationSpec
// Explicitly load the JDBC driver to avoid flaky CI failures.
Class.forName("org.postgresql.Driver")
initiateTexeraDBForTestCases()
warmupOnce()
}

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

/**
* Run a trivial pure-Scala workflow (TextInput → terminal) once before the
* timed tests start, so the first 5-second `startWorkflow` await in
* [[TestUtils.shouldReconfigure]] doesn't have to absorb JVM JIT
* warmup, pekko dispatcher first-touch, and `RegionExecutionCoordinator`
* class loading.
*
* Hard-capped at 10 seconds total, defensively wrapped: if warmup itself
* times out or throws, log and continue — the existing `Retries` mixin
* still backs up individual test cases. This ensures warmup can never
* hang the suite.
*/
private def warmupOnce(): Unit = {
val warmupCap = Duration.fromSeconds(10)
setUpWorkflowExecutionData()
var client: AmberClient = null
try {
val src = new TextInputSourceOpDesc()
src.textInput = "warmup"
val warmupCtx = new WorkflowContext()
val workflow = buildWorkflow(List(src), List.empty, warmupCtx)
client = new AmberClient(
system,
workflow.context,
workflow.physicalPlan,
ControllerConfig.default,
_ => {}
)
val completion = Promise[Unit]()
client.registerCallback[ExecutionStateUpdate](evt => {
if (evt.state == COMPLETED) completion.updateIfEmpty(Return(()))
})
Await.result(
client.controllerInterface.startWorkflow(EmptyRequest(), ()),
warmupCap
)
Await.result(completion, warmupCap)
} catch {
case e: Throwable =>
logger.warn(
s"warmup workflow did not finish within ${warmupCap}; tests will run cold and rely on Retries: ${e.getMessage}"
)
} finally {
if (client != null) {
try client.shutdown()
catch { case _: Throwable => () }
}
cleanupWorkflowExecutionData()
}
}

// Thin wrapper around the shared TestUtils helper so call sites below stay
// ctx/system-implicit. The actual workflow-driver logic lives in TestUtils
// and is reused by ReconfigurationSpec.
Expand Down
Loading