From 412f2c8ab165851a7a4a9c02b962f8ac65b83b0b Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Tue, 26 May 2026 12:56:19 +0500 Subject: [PATCH 1/3] Add Impulse translator and URN-dispatch framework MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - KafkaStreamsPipelineTranslator now walks the pipeline in topological order via QueryablePipeline and dispatches each transform to a PTransformTranslator keyed by URN. Unknown URNs still fail fast with a clear "No translator registered for URN ..." message. - ImpulseTranslator implements beam:transform:impulse:v1 per design doc §4.1: a per-application bootstrap topic source (__beam_impulse_<applicationId>) satisfies Kafka Streams' real-source requirement, ImpulseProcessor emits exactly one WindowedValue<byte[]> in the GlobalWindow via a wall-clock punctuator scheduled on init, and a persistent state store records a "fired" flag so task restarts and later punctuations do not duplicate. Bootstrap-topic auto-creation is deferred to a follow-up sub-issue (design doc §12.1); the topic is expected to pre-exist in production. - KafkaStreamsTranslationContext now holds the Topology being built and a PCollection-id -> processor-name map so downstream translators can wire to their parent nodes. - KafkaStreamsPipelineRunner.run now translates and starts the KafkaStreams application, returning a KafkaStreamsPortablePipelineResult that maps KafkaStreams.State to Beam's PipelineResult.State. Forces processing.guarantee=exactly_once_v2. - Tests: * KafkaStreamsPipelineTranslatorTest also covers the Impulse success path; the unsupported-URN check now uses GroupByKey. * ImpulseTranslatorTest exercises the topology via TopologyTestDriver: exactly one empty byte[] in GlobalWindow is emitted, and a second wall-clock advance does not re-emit. 
--- runners/kafka-streams/build.gradle | 3 +- .../streams/KafkaStreamsPipelineRunner.java | 33 ++++- .../KafkaStreamsPortablePipelineResult.java | 123 +++++++++++++++++ .../streams/translation/ImpulseProcessor.java | 108 +++++++++++++++ .../translation/ImpulseTranslator.java | 97 +++++++++++++ .../KafkaStreamsPipelineTranslator.java | 65 +++++---- .../KafkaStreamsTranslationContext.java | 62 ++++++++- .../translation/PTransformTranslator.java | 41 ++++++ .../translation/ImpulseTranslatorTest.java | 130 ++++++++++++++++++ .../KafkaStreamsPipelineTranslatorTest.java | 73 ++++++---- 10 files changed, 678 insertions(+), 57 deletions(-) create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslator.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/PTransformTranslator.java create mode 100644 runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java diff --git a/runners/kafka-streams/build.gradle b/runners/kafka-streams/build.gradle index 9204fef4e768..3f34a3ca76b6 100644 --- a/runners/kafka-streams/build.gradle +++ b/runners/kafka-streams/build.gradle @@ -44,6 +44,7 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":model:pipeline", configuration: "shadow") + implementation project(path: ":model:job-management", configuration: "shadow") implementation project(":runners:core-java") permitUnusedDeclared project(":runners:core-java") implementation project(":runners:java-fn-execution") @@ -58,10 +59,10 @@ dependencies { implementation 
"org.apache.kafka:kafka-clients:$kafka_version" implementation "org.apache.kafka:kafka-streams:$kafka_version" permitUnusedDeclared "org.apache.kafka:kafka-clients:$kafka_version" - permitUnusedDeclared "org.apache.kafka:kafka-streams:$kafka_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation library.java.hamcrest testImplementation library.java.junit testImplementation library.java.mockito_core + testImplementation "org.apache.kafka:kafka-streams-test-utils:$kafka_version" } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java index 7c78d3a751b4..48a20fff1b98 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java @@ -17,16 +17,24 @@ */ package org.apache.beam.runners.kafka.streams; +import java.util.Properties; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.jobsubmission.PortablePipelineResult; import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsPipelineTranslator; import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -/** Executes a portable pipeline by translating it to Kafka Streams. */ +/** Executes a portable pipeline by translating it to a Kafka Streams {@link Topology}. 
*/ public class KafkaStreamsPipelineRunner implements PortablePipelineRunner { + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsPipelineRunner.class); + private final KafkaStreamsPipelineOptions pipelineOptions; public KafkaStreamsPipelineRunner(KafkaStreamsPipelineOptions pipelineOptions) { @@ -34,12 +42,31 @@ public KafkaStreamsPipelineRunner(KafkaStreamsPipelineOptions pipelineOptions) { } @Override - public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception { + public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) { KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); KafkaStreamsTranslationContext context = translator.createTranslationContext(jobInfo, pipelineOptions); RunnerApi.Pipeline prepared = translator.prepareForTranslation(pipeline); translator.translate(context, prepared); - throw new IllegalStateException("Translation unexpectedly completed without an executor"); + + Topology topology = context.getTopology(); + LOG.info( + "Translated pipeline {} into Kafka Streams topology:\n{}", + jobInfo.jobId(), + topology.describe()); + + KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig(jobInfo)); + kafkaStreams.start(); + return new KafkaStreamsPortablePipelineResult(kafkaStreams); + } + + private Properties streamsConfig(JobInfo jobInfo) { + Properties props = new Properties(); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, pipelineOptions.getBootstrapServers()); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, pipelineOptions.getApplicationId()); + props.put(StreamsConfig.STATE_DIR_CONFIG, pipelineOptions.getStateDir()); + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + props.put(StreamsConfig.CLIENT_ID_CONFIG, jobInfo.jobId()); + return props; } } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java 
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java new file mode 100644 index 000000000000..3596d66789fc --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.jobmanagement.v1.JobApi; +import org.apache.beam.runners.jobsubmission.PortablePipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.kafka.streams.KafkaStreams; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Result of executing a portable pipeline as a {@link KafkaStreams} application. + * + *

Translates the underlying {@link KafkaStreams.State} into Beam's {@link + * org.apache.beam.sdk.PipelineResult.State} and forwards {@link #cancel()} / {@link + * #waitUntilFinish()} to the {@code KafkaStreams} instance. + */ +class KafkaStreamsPortablePipelineResult implements PortablePipelineResult { + + private static final Logger LOG = + LoggerFactory.getLogger(KafkaStreamsPortablePipelineResult.class); + + private final KafkaStreams kafkaStreams; + private final CountDownLatch terminated = new CountDownLatch(1); + + KafkaStreamsPortablePipelineResult(KafkaStreams kafkaStreams) { + this.kafkaStreams = kafkaStreams; + kafkaStreams.setStateListener( + (newState, oldState) -> { + if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) { + terminated.countDown(); + } + }); + } + + @Override + public State getState() { + return mapState(kafkaStreams.state()); + } + + @Override + public State cancel() throws IOException { + kafkaStreams.close(); + terminated.countDown(); + return getState(); + } + + @Override + public State waitUntilFinish(Duration duration) { + try { + boolean reachedTerminal = terminated.await(duration.getMillis(), TimeUnit.MILLISECONDS); + if (!reachedTerminal) { + return getState(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return State.UNKNOWN; + } + return getState(); + } + + @Override + public State waitUntilFinish() { + try { + terminated.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return State.UNKNOWN; + } + return getState(); + } + + @Override + public MetricResults metrics() { + throw new UnsupportedOperationException( + "Metrics are not yet implemented in the Kafka Streams runner."); + } + + @Override + public JobApi.MetricResults portableMetrics() throws UnsupportedOperationException { + LOG.debug("portableMetrics() not yet implemented in the Kafka Streams runner"); + return JobApi.MetricResults.newBuilder().build(); + } + + 
private static State mapState(KafkaStreams.State state) { + switch (state) { + case CREATED: + case REBALANCING: + return State.RUNNING; + case RUNNING: + return State.RUNNING; + case PENDING_SHUTDOWN: + return State.CANCELLED; + case PENDING_ERROR: + case ERROR: + return State.FAILED; + case NOT_RUNNING: + return State.DONE; + default: + return State.UNKNOWN; + } + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java new file mode 100644 index 000000000000..394aae53f721 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.kafka.streams.translation; + +import java.time.Duration; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Kafka Streams {@link Processor} implementing Beam's {@code Impulse} transform. + * + *

Emits exactly one {@link WindowedValue} carrying an empty {@code byte[]} payload in the {@link + * org.apache.beam.sdk.transforms.windowing.GlobalWindow}, with timestamp {@link + * BoundedWindow#TIMESTAMP_MIN_VALUE}. The emission happens once per task; a {@code "fired"} flag is + * persisted in a state store named after the transform id so that task restarts do not re-emit. + * + *

The trigger comes from a wall-clock punctuator scheduled on {@link #init} — this lets the + * processor fire even when the dedicated bootstrap source topic is empty, which is the expected + * production state. + * + *

Watermark advancement to {@code TIMESTAMP_MAX_VALUE} (design doc §4.1) is intentionally + * not performed here. Kafka Streams has no native Beam watermark; the output PCollection's + * watermark moves through the (future) runner-side watermark manager rather than through the {@link + * Record} timestamp. The forwarded Kafka Streams record carries a non-negative record timestamp + * ({@code 0L}) because KS rejects negative record timestamps; the Beam event-time lives inside the + * {@link WindowedValue}. + */ +class ImpulseProcessor implements Processor> { + + private static final Logger LOG = LoggerFactory.getLogger(ImpulseProcessor.class); + + /** Sole entry in the state store; the value tracks whether this processor has already emitted. */ + static final String FIRED_KEY = "fired"; + + /** How soon after {@link #init} the punctuator first fires. */ + private static final Duration PUNCTUATION_DELAY = Duration.ofMillis(50); + + private final String stateStoreName; + private final String transformId; + + private @Nullable ProcessorContext> context; + private @Nullable KeyValueStore firedStore; + + ImpulseProcessor(String stateStoreName, String transformId) { + this.stateStoreName = stateStoreName; + this.transformId = transformId; + } + + @Override + public void init(ProcessorContext> context) { + this.context = context; + this.firedStore = context.getStateStore(stateStoreName); + context.schedule(PUNCTUATION_DELAY, PunctuationType.WALL_CLOCK_TIME, ts -> maybeFire()); + } + + @Override + public void process(Record record) { + // Records that happen to land on the bootstrap topic are not actual data; they just provide an + // extra opportunity to fire the impulse on restart. The state store still gates the emit. 
+ maybeFire(); + } + + private void maybeFire() { + ProcessorContext> ctx = context; + KeyValueStore store = firedStore; + if (ctx == null || store == null) { + return; + } + if (Boolean.TRUE.equals(store.get(FIRED_KEY))) { + return; + } + WindowedValue impulse = WindowedValues.valueInGlobalWindow(new byte[0]); + // The output PCollection is not keyed (PCollection); use an empty byte[] as a + // placeholder key so downstream processors that adopt the byte[]-key convention see a + // consistent shape. + // + // Kafka Streams disallows negative record timestamps, so the Record carries the Unix epoch + // (0L). The Beam event-time, BoundedWindow.TIMESTAMP_MIN_VALUE, lives inside the forwarded + // WindowedValue and is what downstream Beam logic must consult. + ctx.forward(new Record>(new byte[0], impulse, 0L)); + store.put(FIRED_KEY, Boolean.TRUE); + LOG.debug("Impulse {} emitted single element", transformId); + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslator.java new file mode 100644 index 000000000000..d73af6a7facc --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslator.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import java.util.Iterator; +import java.util.Map; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; + +/** + * Translates the {@code beam:transform:impulse:v1} URN. + * + *

Adds three nodes to the Kafka Streams {@link Topology}: + * + *

    + *
  • A {@code byte[]} source bound to a dedicated per-application bootstrap topic (see {@link + * KafkaStreamsTranslationContext#getImpulseBootstrapTopic()}). Kafka Streams refuses to start + * a topology that has no real source topic, so the bootstrap topic exists purely to satisfy + * that requirement — {@link ImpulseProcessor} ignores the content of records published to it; + * an arriving record only gives the processor another chance to fire.
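  • The {@link ImpulseProcessor} itself, which schedules a wall-clock punctuator on {@code init} + * and emits a single empty {@code WindowedValue<byte[]>} downstream; punctuations after the + * first emission are no-ops because the state store records that the impulse has fired.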
  • A per-processor {@link KeyValueBytesStoreSupplier persistent state store} that records + * whether the impulse has already fired so task restarts do not duplicate it. + *
+ * + *

The processor's output PCollection is registered with the translation context so subsequent + * translators can wire themselves to this node by id. + * + *

Bootstrap topic lifecycle: this translator does not auto-create the bootstrap + * topic. The topic is expected to exist on the broker before the job starts; otherwise Kafka + * Streams raises {@code MissingSourceTopicException} on startup. The auto-create-vs-pre-create + * decision (design doc §12.1) is deferred to a follow-up sub-issue along with the {@code + * AdminClient} wiring; pre-creation is sufficient for the {@code TopologyTestDriver}-based unit + * tests in this PR. + */ +class ImpulseTranslator implements PTransformTranslator { + + static final String SOURCE_SUFFIX = "-source"; + static final String STATE_STORE_SUFFIX = "-state"; + + @Override + public void translate( + String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) { + RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId); + Map outputs = transform.getOutputsMap(); + if (outputs.size() != 1) { + throw new IllegalArgumentException( + "Impulse " + + transformId + + " must have exactly one output PCollection but had " + + outputs.size()); + } + String outputPCollectionId = onlyValue(outputs); + + Topology topology = context.getTopology(); + String sourceNodeName = transformId + SOURCE_SUFFIX; + String stateStoreName = transformId + STATE_STORE_SUFFIX; + String bootstrapTopic = context.getImpulseBootstrapTopic(); + + topology.addSource( + sourceNodeName, + Serdes.ByteArray().deserializer(), + Serdes.ByteArray().deserializer(), + bootstrapTopic); + topology.addProcessor( + transformId, () -> new ImpulseProcessor(stateStoreName, transformId), sourceNodeName); + topology.addStateStore( + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(stateStoreName), Serdes.String(), Serdes.Boolean()), + transformId); + + context.registerPCollectionProducer(outputPCollectionId, transformId); + } + + private static String onlyValue(Map map) { + Iterator it = map.values().iterator(); + return it.next(); + } +} diff --git 
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java index cc915d604b68..eb8567146143 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java @@ -18,19 +18,38 @@ package org.apache.beam.runners.kafka.streams.translation; import java.util.Map; -import java.util.TreeMap; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; +import org.apache.beam.sdk.util.construction.PTransformTranslation; +import org.apache.beam.sdk.util.construction.graph.PipelineNode; +import org.apache.beam.sdk.util.construction.graph.QueryablePipeline; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; /** - * Translates a portable Beam pipeline into a Kafka Streams {@code Topology}. + * Translates a portable Beam pipeline into a Kafka Streams {@link + * org.apache.kafka.streams.Topology}. * - *

The initial implementation only validates the graph and fails fast with an explicit message - * for transforms that are not yet supported. + *

Walks the pipeline in topological order via {@link QueryablePipeline} and dispatches each + * transform to a {@link PTransformTranslator} keyed by URN. Transforms whose URN has no registered + * translator fail fast with a clear {@link UnsupportedOperationException} so the failure points at + * the exact transform that is not yet supported. */ public class KafkaStreamsPipelineTranslator { + private final Map urnToTranslator; + + public KafkaStreamsPipelineTranslator() { + this( + ImmutableMap.builder() + .put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslator()) + .build()); + } + + KafkaStreamsPipelineTranslator(Map urnToTranslator) { + this.urnToTranslator = urnToTranslator; + } + public KafkaStreamsTranslationContext createTranslationContext( JobInfo jobInfo, KafkaStreamsPipelineOptions pipelineOptions) { return KafkaStreamsTranslationContext.create(jobInfo, pipelineOptions); @@ -42,29 +61,27 @@ public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline) { } /** - * Translates the pipeline. Throws {@link UnsupportedOperationException} with a clear URN message - * for the first unsupported primitive encountered. + * Walks the pipeline in topological order and translates each transform whose URN is supported. + * Throws {@link UnsupportedOperationException} on the first unsupported URN. 
*/ public void translate(KafkaStreamsTranslationContext context, RunnerApi.Pipeline pipeline) { - Map transforms = pipeline.getComponents().getTransformsMap(); - TreeMap ordered = new TreeMap<>(transforms); - for (Map.Entry entry : ordered.entrySet()) { - RunnerApi.PTransform transform = entry.getValue(); - if (!transform.hasSpec()) { - continue; - } - String urn = transform.getSpec().getUrn(); - if (urn.isEmpty()) { - continue; + QueryablePipeline queryable = + QueryablePipeline.forTransforms( + pipeline.getRootTransformIdsList(), pipeline.getComponents()); + for (PipelineNode.PTransformNode node : queryable.getTopologicallyOrderedTransforms()) { + String urn = node.getTransform().getSpec().getUrn(); + PTransformTranslator translator = urnToTranslator.get(urn); + if (translator == null) { + throw new UnsupportedOperationException( + "No translator registered for URN " + + urn + + " (transformId=" + + node.getId() + + ", jobId=" + + context.getJobInfo().jobId() + + ")"); } - throw new UnsupportedOperationException( - "No translator registered for URN " - + urn - + " (jobId=" - + context.getJobInfo().jobId() - + ")"); + translator.translate(node.getId(), pipeline, context); } - throw new UnsupportedOperationException( - "No translator registered for pipeline (no transform URNs found)"); } } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsTranslationContext.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsTranslationContext.java index 7c6d3d079159..3d95eabafed6 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsTranslationContext.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsTranslationContext.java @@ -17,24 +17,44 @@ */ package org.apache.beam.runners.kafka.streams.translation; +import java.util.HashMap; +import java.util.Map; 
import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; +import org.apache.kafka.streams.Topology; -/** Mutable state shared while translating a portable pipeline into a Kafka Streams topology. */ +/** + * Mutable state shared while translating a portable pipeline into a Kafka Streams {@link Topology}. + * + *

Holds the topology being built and a {@code PCollection-id → processor-node-name} map so that + * downstream transforms can wire themselves to the right parent node. + */ public class KafkaStreamsTranslationContext { + /** Prefix for the per-job bootstrap topic Impulse reads from. */ + private static final String IMPULSE_BOOTSTRAP_TOPIC_PREFIX = "__beam_impulse_"; + private final JobInfo jobInfo; private final KafkaStreamsPipelineOptions pipelineOptions; + private final Topology topology; + private final Map pCollectionIdToProcessorName; public static KafkaStreamsTranslationContext create( JobInfo jobInfo, KafkaStreamsPipelineOptions pipelineOptions) { - return new KafkaStreamsTranslationContext(jobInfo, pipelineOptions); + return new KafkaStreamsTranslationContext(jobInfo, pipelineOptions, new Topology()); + } + + static KafkaStreamsTranslationContext createWithTopology( + JobInfo jobInfo, KafkaStreamsPipelineOptions pipelineOptions, Topology topology) { + return new KafkaStreamsTranslationContext(jobInfo, pipelineOptions, topology); } private KafkaStreamsTranslationContext( - JobInfo jobInfo, KafkaStreamsPipelineOptions pipelineOptions) { + JobInfo jobInfo, KafkaStreamsPipelineOptions pipelineOptions, Topology topology) { this.jobInfo = jobInfo; this.pipelineOptions = pipelineOptions; + this.topology = topology; + this.pCollectionIdToProcessorName = new HashMap<>(); } public JobInfo getJobInfo() { @@ -44,4 +64,40 @@ public JobInfo getJobInfo() { public KafkaStreamsPipelineOptions getPipelineOptions() { return pipelineOptions; } + + /** Returns the {@link Topology} being built by the translation. */ + public Topology getTopology() { + return topology; + } + + /** + * Registers the processor node that produces the given Beam PCollection. Downstream translators + * resolve their parent processor names by looking up the input PCollection id. 
+ */ + public void registerPCollectionProducer(String pCollectionId, String processorName) { + String existing = pCollectionIdToProcessorName.putIfAbsent(pCollectionId, processorName); + if (existing != null && !existing.equals(processorName)) { + throw new IllegalStateException( + "PCollection " + + pCollectionId + + " already produced by processor " + + existing + + "; cannot reassign to " + + processorName); + } + } + + /** Returns the processor node name producing the given PCollection. */ + public String getProcessorNameForPCollection(String pCollectionId) { + String name = pCollectionIdToProcessorName.get(pCollectionId); + if (name == null) { + throw new IllegalStateException("No processor registered for PCollection " + pCollectionId); + } + return name; + } + + /** Returns the dedicated bootstrap topic name used by Impulse for this application. */ + public String getImpulseBootstrapTopic() { + return IMPULSE_BOOTSTRAP_TOPIC_PREFIX + pipelineOptions.getApplicationId(); + } } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/PTransformTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/PTransformTranslator.java new file mode 100644 index 000000000000..c6bd44b8049c --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/PTransformTranslator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import org.apache.beam.model.pipeline.v1.RunnerApi; + +/** + * Translates a single Beam {@link RunnerApi.PTransform} into one or more nodes in the Kafka Streams + * {@link org.apache.kafka.streams.Topology} held by the {@link KafkaStreamsTranslationContext}. + */ +@FunctionalInterface +public interface PTransformTranslator { + + /** + * Translates the transform identified by {@code transformId} into nodes on the topology held by + * {@code context}. + * + * @param transformId the id of the transform to translate, as keyed in {@code + * pipeline.getComponents().getTransformsMap()} + * @param pipeline the full pipeline proto, in case the translator needs to walk subcomponents + * (coders, windowing strategies, etc.) + * @param context the shared translation context (topology under construction, producer map, etc.) + */ + void translate( + String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context); +} diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java new file mode 100644 index 000000000000..89fdb5523967 --- /dev/null +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +/** + * Behavioural tests for {@link ImpulseTranslator} using {@link TopologyTestDriver}. + * + *

The translator builds a topology with a real source + processor pair. The tests sit a {@link + * CapturingProcessor} downstream so emitted {@code WindowedValue} elements can be inspected + * directly without going through a Kafka sink topic (the runner does not produce one because no + * downstream PCollections exist yet). + */ +public class ImpulseTranslatorTest { + + @Test + public void impulseEmitsExactlyOneEmptyByteArrayInGlobalWindow() { + KafkaStreamsTranslationContext context = KafkaStreamsPipelineTranslatorTest.newContext(); + new KafkaStreamsPipelineTranslator() + .translate(context, KafkaStreamsPipelineTranslatorTest.singleImpulsePipeline()); + + CapturingProcessor capture = new CapturingProcessor(); + Topology topology = context.getTopology(); + topology.addProcessor("capture", capture, "impulse"); + + try (TopologyTestDriver driver = new TopologyTestDriver(topology, baseProps())) { + driver.advanceWallClockTime(Duration.ofSeconds(1)); + driver.advanceWallClockTime(Duration.ofSeconds(1)); + } + + assertThat(capture.received.size(), is(1)); + WindowedValue only = capture.received.get(0); + assertThat(only, is(notNullValue())); + assertThat(only.getValue().length, is(0)); + assertThat(only.getWindows().size(), is(1)); + assertThat(only.getTimestamp().getMillis(), is(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())); + } + + @Test + public void impulseDoesNotReEmitOnRestart() { + KafkaStreamsTranslationContext context = KafkaStreamsPipelineTranslatorTest.newContext(); + new KafkaStreamsPipelineTranslator() + .translate(context, KafkaStreamsPipelineTranslatorTest.singleImpulsePipeline()); + + CapturingProcessor capture = new CapturingProcessor(); + Topology topology = context.getTopology(); + topology.addProcessor("capture", capture, "impulse"); + + try (TopologyTestDriver driver = new TopologyTestDriver(topology, baseProps())) { + driver.advanceWallClockTime(Duration.ofSeconds(1)); + // Trigger again — should be ignored because the state store flag is set. 
+ driver.advanceWallClockTime(Duration.ofSeconds(5)); + } + + assertThat(capture.received.size(), is(1)); + } + + private static Properties baseProps() { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-translator-test"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put( + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); + props.put( + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); + return props; + } + + /** + * Captures {@link WindowedValue} records forwarded by {@link ImpulseProcessor}. The supplier + * returns a fresh forwarder each call (required by Kafka Streams) but all forwarders write into + * the shared {@link #received} list so the test can read the captured elements after the topology + * is closed. + */ + private static class CapturingProcessor + implements ProcessorSupplier, byte[], WindowedValue> { + + final List> received = new ArrayList<>(); + + @Override + public Processor, byte[], WindowedValue> get() { + return new Processor, byte[], WindowedValue>() { + @Override + public void init(@Nullable ProcessorContext> context) { + // no-op + } + + @Override + public void process(Record> record) { + received.add(record.value()); + } + }; + } + } +} diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java index 86e9cf878497..44cc00bcebbd 100644 --- a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.kafka.streams.translation; import static 
org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertThrows; @@ -27,18 +28,14 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; +import org.apache.kafka.streams.TopologyDescription; import org.junit.Test; -/** - * Tests for {@link KafkaStreamsPipelineTranslator}. - * - *

The skeleton translator does not yet handle any transforms; these tests pin the current - * "fail-fast with a clear URN" contract so that follow-up sub-issues can replace the assertions as - * real translators are added. - */ +/** Tests for {@link KafkaStreamsPipelineTranslator}. */ public class KafkaStreamsPipelineTranslatorTest { private static final String JOB_ID = "kafka-streams-test-job"; + private static final String OUTPUT_PCOLLECTION_ID = "impulse.out"; @Test public void translateRejectsUnknownTransformWithUrnInMessage() { @@ -47,15 +44,16 @@ public void translateRejectsUnknownTransformWithUrnInMessage() { RunnerApi.Pipeline pipeline = RunnerApi.Pipeline.newBuilder() + .addRootTransformIds("gbk") .setComponents( RunnerApi.Components.newBuilder() .putTransforms( - "impulse", + "gbk", RunnerApi.PTransform.newBuilder() - .setUniqueName("Impulse") + .setUniqueName("GroupByKey") .setSpec( RunnerApi.FunctionSpec.newBuilder() - .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN)) + .setUrn(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN)) .build())) .build(); @@ -65,33 +63,31 @@ public void translateRejectsUnknownTransformWithUrnInMessage() { () -> translator.translate(context, translator.prepareForTranslation(pipeline))); assertThat(ex.getMessage(), containsString("No translator registered for URN")); - assertThat(ex.getMessage(), containsString(PTransformTranslation.IMPULSE_TRANSFORM_URN)); + assertThat(ex.getMessage(), containsString(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN)); + assertThat(ex.getMessage(), containsString("gbk")); assertThat(ex.getMessage(), containsString(JOB_ID)); } @Test - public void translateRejectsEmptyPipeline() { + public void translateImpulsePipelineAddsSourceAndProcessorNodes() { KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); KafkaStreamsTranslationContext context = newContext(); - RunnerApi.Pipeline pipeline = - RunnerApi.Pipeline.newBuilder() - 
.setComponents(RunnerApi.Components.newBuilder().build()) - .build(); + RunnerApi.Pipeline pipeline = singleImpulsePipeline(); + translator.translate(context, translator.prepareForTranslation(pipeline)); - UnsupportedOperationException ex = - assertThrows( - UnsupportedOperationException.class, - () -> translator.translate(context, translator.prepareForTranslation(pipeline))); + TopologyDescription description = context.getTopology().describe(); + String describeText = description.toString(); - assertThat(ex.getMessage(), containsString("No translator registered")); + assertThat(describeText, containsString("impulse-source")); + assertThat(describeText, containsString("impulse")); + assertThat(context.getProcessorNameForPCollection(OUTPUT_PCOLLECTION_ID), is("impulse")); } @Test public void createTranslationContextExposesJobInfoAndOptions() { KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); - KafkaStreamsPipelineOptions options = - PipelineOptionsFactory.create().as(KafkaStreamsPipelineOptions.class); + KafkaStreamsPipelineOptions options = testOptions(); JobInfo jobInfo = JobInfo.create( JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options)); @@ -102,12 +98,37 @@ public void createTranslationContextExposesJobInfoAndOptions() { assertThat(context.getPipelineOptions().getBootstrapServers(), containsString("localhost")); } - private static KafkaStreamsTranslationContext newContext() { - KafkaStreamsPipelineOptions options = - PipelineOptionsFactory.create().as(KafkaStreamsPipelineOptions.class); + static RunnerApi.Pipeline singleImpulsePipeline() { + return RunnerApi.Pipeline.newBuilder() + .addRootTransformIds("impulse") + .setComponents( + RunnerApi.Components.newBuilder() + .putTransforms( + "impulse", + RunnerApi.PTransform.newBuilder() + .setUniqueName("Impulse") + .putOutputs("out", OUTPUT_PCOLLECTION_ID) + .setSpec( + RunnerApi.FunctionSpec.newBuilder() + 
.setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN)) + .build()) + .putPcollections( + OUTPUT_PCOLLECTION_ID, + RunnerApi.PCollection.newBuilder().setUniqueName(OUTPUT_PCOLLECTION_ID).build()) + .build()) + .build(); + } + + static KafkaStreamsTranslationContext newContext() { + KafkaStreamsPipelineOptions options = testOptions(); JobInfo jobInfo = JobInfo.create( JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options)); return KafkaStreamsTranslationContext.create(jobInfo, options); } + + static KafkaStreamsPipelineOptions testOptions() { + return PipelineOptionsFactory.fromArgs("--applicationId=ks-translator-test") + .as(KafkaStreamsPipelineOptions.class); + } } From 01e7e204ab5c5fe7b465ab8bfce86fb551bd97d6 Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Tue, 26 May 2026 13:07:00 +0500 Subject: [PATCH 2/3] Address review feedback on translation framework + Impulse PR - KafkaStreamsPortablePipelineResult: close the race where KafkaStreams could transition to a terminal state before the state listener was registered, leaving waitUntilFinish() to block forever. Also add a volatile cancelled flag so that getState() returns State.CANCELLED after a user cancel(), instead of mapping NOT_RUNNING to State.DONE. - ImpulseProcessor: capture the Cancellable returned by context.schedule and cancel the wall-clock punctuator once the impulse has fired (or if the state store already records a prior emission), so the processor stops doing periodic state-store lookups for the lifetime of the task. - KafkaStreamsPipelineRunner.run: invoke PipelineOptionsValidator.validate on the pipeline options at the start of run() so a missing required option (e.g. applicationId) fails with a clear IllegalArgumentException rather than a raw NullPointerException on Properties.put further down. - ImpulseTranslatorTest: wrap CapturingProcessor.received in Collections.synchronizedList for best-practice thread-safety even though TopologyTestDriver runs single-threaded. 
--- .../streams/KafkaStreamsPipelineRunner.java | 5 +++++ .../KafkaStreamsPortablePipelineResult.java | 12 ++++++++++++ .../streams/translation/ImpulseProcessor.java | 16 +++++++++++++++- .../translation/ImpulseTranslatorTest.java | 3 ++- 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java index 48a20fff1b98..3e97638695e1 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java @@ -24,6 +24,7 @@ import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsPipelineTranslator; import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext; +import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; @@ -43,6 +44,10 @@ public KafkaStreamsPipelineRunner(KafkaStreamsPipelineOptions pipelineOptions) { @Override public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) { + // Surface a clear error if a required option (e.g. applicationId) is missing instead of + // letting Properties.put fail with a raw NullPointerException further down. 
+ PipelineOptionsValidator.validate(KafkaStreamsPipelineOptions.class, pipelineOptions); + KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); KafkaStreamsTranslationContext context = translator.createTranslationContext(jobInfo, pipelineOptions); diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java index 3596d66789fc..817746bf002f 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java @@ -42,6 +42,7 @@ class KafkaStreamsPortablePipelineResult implements PortablePipelineResult { private final KafkaStreams kafkaStreams; private final CountDownLatch terminated = new CountDownLatch(1); + private volatile boolean cancelled = false; KafkaStreamsPortablePipelineResult(KafkaStreams kafkaStreams) { this.kafkaStreams = kafkaStreams; @@ -51,15 +52,26 @@ class KafkaStreamsPortablePipelineResult implements PortablePipelineResult { terminated.countDown(); } }); + // Guard against the race where the KafkaStreams instance transitions to a terminal state + // (e.g. immediate startup failure) before the state listener is registered above. Without + // this check, the latch would never be counted down and waitUntilFinish() would block forever. 
+ KafkaStreams.State current = kafkaStreams.state(); + if (current == KafkaStreams.State.NOT_RUNNING || current == KafkaStreams.State.ERROR) { + terminated.countDown(); + } } @Override public State getState() { + if (cancelled) { + return State.CANCELLED; + } return mapState(kafkaStreams.state()); } @Override public State cancel() throws IOException { + cancelled = true; kafkaStreams.close(); terminated.countDown(); return getState(); diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java index 394aae53f721..ec4f4ef63a43 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; +import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -64,6 +65,7 @@ class ImpulseProcessor implements Processor> context; private @Nullable KeyValueStore firedStore; + private @Nullable Cancellable scheduledPunctuator; ImpulseProcessor(String stateStoreName, String transformId) { this.stateStoreName = stateStoreName; @@ -74,7 +76,8 @@ class ImpulseProcessor implements Processor> context) { this.context = context; this.firedStore = context.getStateStore(stateStoreName); - context.schedule(PUNCTUATION_DELAY, PunctuationType.WALL_CLOCK_TIME, ts -> maybeFire()); + this.scheduledPunctuator = + context.schedule(PUNCTUATION_DELAY, PunctuationType.WALL_CLOCK_TIME, ts -> maybeFire()); } 
@Override @@ -91,6 +94,7 @@ private void maybeFire() { return; } if (Boolean.TRUE.equals(store.get(FIRED_KEY))) { + cancelPunctuator(); return; } WindowedValue impulse = WindowedValues.valueInGlobalWindow(new byte[0]); @@ -103,6 +107,16 @@ private void maybeFire() { // WindowedValue and is what downstream Beam logic must consult. ctx.forward(new Record>(new byte[0], impulse, 0L)); store.put(FIRED_KEY, Boolean.TRUE); + cancelPunctuator(); LOG.debug("Impulse {} emitted single element", transformId); } + + /** Cancels the wall-clock punctuator after the impulse has fired to stop periodic wakeups. */ + private void cancelPunctuator() { + Cancellable handle = scheduledPunctuator; + if (handle != null) { + handle.cancel(); + scheduledPunctuator = null; + } + } } diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java index 89fdb5523967..a3d0d02ef5f1 100644 --- a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java @@ -23,6 +23,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Properties; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -110,7 +111,7 @@ private static Properties baseProps() { private static class CapturingProcessor implements ProcessorSupplier, byte[], WindowedValue> { - final List> received = new ArrayList<>(); + final List> received = Collections.synchronizedList(new ArrayList<>()); @Override public Processor, byte[], WindowedValue> get() { From 120a348d6e88ff14a754b8e9474d7e3e6274344e Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Wed, 27 May 2026 17:46:08 +0500 Subject: [PATCH 3/3] Address 
review feedback on Impulse translator --- .../streams/translation/ImpulseProcessor.java | 58 ++++--- .../translation/ImpulseTranslator.java | 27 ++- .../streams/translation/KStreamsPayload.java | 126 ++++++++++++++ .../kafka/streams/KafkaStreamsRunnerTest.java | 161 ++++++++++++++++++ .../translation/ImpulseTranslatorTest.java | 48 ++++-- 5 files changed, 365 insertions(+), 55 deletions(-) create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java create mode 100644 runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunnerTest.java diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java index ec4f4ef63a43..e9fc8ddb36aa 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java @@ -34,23 +34,30 @@ /** * Kafka Streams {@link Processor} implementing Beam's {@code Impulse} transform. * - *

Emits exactly one {@link WindowedValue} carrying an empty {@code byte[]} payload in the {@link - * org.apache.beam.sdk.transforms.windowing.GlobalWindow}, with timestamp {@link - * BoundedWindow#TIMESTAMP_MIN_VALUE}. The emission happens once per task and is persisted in a - * state store keyed by the transform id so that task restarts do not re-emit. + *

On its first run, each task instance emits exactly two {@link KStreamsPayload}s downstream: + * + *

    + *
  1. A {@link KStreamsPayload#data data} payload wrapping a {@link WindowedValue} of an empty + * {@code byte[]} in the {@link org.apache.beam.sdk.transforms.windowing.GlobalWindow}, with + * event-time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. + *
  2. A {@link KStreamsPayload#watermark watermark} payload at {@link + * BoundedWindow#TIMESTAMP_MAX_VALUE} that tells downstream transforms the source is done. + *
+ * + *

A persistent state store records whether the data element has already been emitted so that + * task restarts do not duplicate the data. The terminal watermark, on the other hand, is re-emitted + * on every restart so downstream watermark holds release correctly after recovery (per Jan's review + * on PR #38689). * *

The trigger comes from a wall-clock punctuator scheduled on {@link #init} — this lets the * processor fire even when the dedicated bootstrap source topic is empty, which is the expected * production state. * - *

Watermark advancement to {@code TIMESTAMP_MAX_VALUE} (design doc §4.1) is intentionally - * not performed here. Kafka Streams has no native Beam watermark; the output PCollection's - * watermark moves through the (future) runner-side watermark manager rather than through the {@link - * Record} timestamp. The forwarded Kafka Streams record carries a non-negative record timestamp - * ({@code 0L}) because KS rejects negative record timestamps; the Beam event-time lives inside the - * {@link WindowedValue}. + *

Kafka Streams disallows negative record timestamps, so the forwarded {@link Record} carries + * the Unix epoch ({@code 0L}). The Beam event-time lives inside the {@link KStreamsPayload} + * variant: inside the {@link WindowedValue} for data, or as the explicit watermark millis. */ -class ImpulseProcessor implements Processor> { +class ImpulseProcessor implements Processor> { private static final Logger LOG = LoggerFactory.getLogger(ImpulseProcessor.class); @@ -63,7 +70,7 @@ class ImpulseProcessor implements Processor> context; + private @Nullable ProcessorContext> context; private @Nullable KeyValueStore firedStore; private @Nullable Cancellable scheduledPunctuator; @@ -73,7 +80,7 @@ class ImpulseProcessor implements Processor> context) { + public void init(ProcessorContext> context) { this.context = context; this.firedStore = context.getStateStore(stateStoreName); this.scheduledPunctuator = @@ -88,12 +95,16 @@ public void process(Record record) { } private void maybeFire() { - ProcessorContext> ctx = context; + ProcessorContext> ctx = context; KeyValueStore store = firedStore; if (ctx == null || store == null) { return; } if (Boolean.TRUE.equals(store.get(FIRED_KEY))) { + // Data was already emitted in a previous task lifetime, but downstream watermark holds may + // still need to be released after the restart — re-emit the terminal watermark and stop the + // punctuator. + forwardWatermarkMax(ctx); cancelPunctuator(); return; } @@ -101,14 +112,21 @@ private void maybeFire() { // The output PCollection is not keyed (PCollection); use an empty byte[] as a // placeholder key so downstream processors that adopt the byte[]-key convention see a // consistent shape. - // - // Kafka Streams disallows negative record timestamps, so the Record carries the Unix epoch - // (0L). The Beam event-time, BoundedWindow.TIMESTAMP_MIN_VALUE, lives inside the forwarded - // WindowedValue and is what downstream Beam logic must consult. 
- ctx.forward(new Record>(new byte[0], impulse, 0L)); + ctx.forward( + new Record>( + new byte[0], KStreamsPayload.data(impulse), 0L)); + forwardWatermarkMax(ctx); store.put(FIRED_KEY, Boolean.TRUE); cancelPunctuator(); - LOG.debug("Impulse {} emitted single element", transformId); + LOG.debug("Impulse {} emitted single element and terminal watermark", transformId); + } + + /** Forwards a terminal {@code TIMESTAMP_MAX_VALUE} watermark payload to downstream processors. */ + private static void forwardWatermarkMax(ProcessorContext> ctx) { + long maxMillis = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + ctx.forward( + new Record>( + new byte[0], KStreamsPayload.watermark(maxMillis), 0L)); } /** Cancels the wall-clock punctuator after the impulse has fired to stop periodic wakeups. */ diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslator.java index d73af6a7facc..a90987ba6383 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslator.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslator.java @@ -17,9 +17,8 @@ */ package org.apache.beam.runners.kafka.streams.translation; -import java.util.Iterator; -import java.util.Map; import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; @@ -36,7 +35,9 @@ * a topology that has no real source topic, so the bootstrap topic exists purely to satisfy * that requirement — records published to it are ignored by {@link ImpulseProcessor}. *

  • The {@link ImpulseProcessor} itself, which schedules a one-shot wall-clock punctuator on - * {@code init} and emits a single empty {@code WindowedValue} downstream. + * {@code init} and emits a single empty data {@link KStreamsPayload} followed by a terminal + * watermark payload at {@link + * org.apache.beam.sdk.transforms.windowing.BoundedWindow#TIMESTAMP_MAX_VALUE}. *
  • A per-processor {@link KeyValueBytesStoreSupplier persistent state store} that records * whether the impulse has already fired so task restarts do not duplicate it. * @@ -60,15 +61,12 @@ class ImpulseTranslator implements PTransformTranslator { public void translate( String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) { RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId); - Map outputs = transform.getOutputsMap(); - if (outputs.size() != 1) { - throw new IllegalArgumentException( - "Impulse " - + transformId - + " must have exactly one output PCollection but had " - + outputs.size()); - } - String outputPCollectionId = onlyValue(outputs); + // Impulse produces exactly one output PCollection. This is the produced-outputs map on the + // transform, not the consumer count — downstream transforms that consume this PCollection are + // modeled as separate PTransforms whose `inputs` reference the same PCollection id, and they + // are wired up by their own translators. Iterables.getOnlyElement throws a clear + // IllegalArgumentException if the proto is malformed. 
+ String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values()); Topology topology = context.getTopology(); String sourceNodeName = transformId + SOURCE_SUFFIX; @@ -89,9 +87,4 @@ public void translate( context.registerPCollectionProducer(outputPCollectionId, transformId); } - - private static String onlyValue(Map map) { - Iterator it = map.values().iterator(); - return it.next(); - } } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java new file mode 100644 index 000000000000..47c94eea6eff --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import java.util.Objects; +import org.apache.beam.sdk.values.WindowedValue; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Sum-type envelope flowing between Kafka Streams processors in the Beam Kafka Streams runner. + * + *

    Every record value emitted by a runner-introduced processor is one of: + * + *

      + *
    • A {@link #isData() data} element wrapping a {@link WindowedValue}, or + *
    • A {@link #isWatermark() watermark} signal carrying an event-time milliseconds value. + *
    + * + *

    The envelope lets a single Kafka Streams output channel carry both Beam data and the watermark + * / synchronization primitives that Kafka Streams does not natively support. Future control + * messages (e.g. the {@code (epoch, assigned_partitions)} propagation from design doc §5) can be + * added here as additional variants. + * + *

    This class is intentionally in-JVM only for now; serialization across topic boundaries + * (repartition or sink topics) will be introduced when the first translator that emits to a topic + * lands, at which point a corresponding Kafka {@link org.apache.kafka.common.serialization.Serde} + * will be added. + * + * @param element type carried by data variants + */ +public final class KStreamsPayload { + + private enum Kind { + DATA, + WATERMARK + } + + private final Kind kind; + private final @Nullable WindowedValue data; + private final long watermarkMillis; + + private KStreamsPayload(Kind kind, @Nullable WindowedValue data, long watermarkMillis) { + this.kind = kind; + this.data = data; + this.watermarkMillis = watermarkMillis; + } + + /** Returns a data payload wrapping the given {@link WindowedValue}. */ + public static KStreamsPayload data(WindowedValue value) { + return new KStreamsPayload<>(Kind.DATA, value, 0L); + } + + /** Returns a watermark payload carrying the given event-time milliseconds. */ + public static KStreamsPayload watermark(long watermarkMillis) { + return new KStreamsPayload<>(Kind.WATERMARK, null, watermarkMillis); + } + + public boolean isData() { + return kind == Kind.DATA; + } + + public boolean isWatermark() { + return kind == Kind.WATERMARK; + } + + /** + * Returns the wrapped data element. Caller must check {@link #isData()} first; calling this on a + * watermark payload throws. + */ + public WindowedValue getData() { + if (kind != Kind.DATA || data == null) { + throw new IllegalStateException("Payload is not a data element: kind=" + kind); + } + return data; + } + + /** + * Returns the watermark event-time milliseconds. Caller must check {@link #isWatermark()} first; + * calling this on a data payload throws. 
+ */ + public long getWatermarkMillis() { + if (kind != Kind.WATERMARK) { + throw new IllegalStateException("Payload is not a watermark: kind=" + kind); + } + return watermarkMillis; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof KStreamsPayload)) { + return false; + } + KStreamsPayload that = (KStreamsPayload) o; + return kind == that.kind + && watermarkMillis == that.watermarkMillis + && Objects.equals(data, that.data); + } + + @Override + public int hashCode() { + return Objects.hash(kind, data, watermarkMillis); + } + + @Override + public String toString() { + return kind == Kind.DATA ? "Data{" + data + "}" : "Watermark{" + watermarkMillis + "}"; + } +} diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunnerTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunnerTest.java new file mode 100644 index 000000000000..30000d77d864 --- /dev/null +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/KafkaStreamsRunnerTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.kafka.streams; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.kafka.streams.translation.KStreamsPayload; +import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsPipelineTranslator; +import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.CrashingRunner; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +/** + * Pipeline-level integration tests that build a Beam {@link Pipeline} via the high-level Java SDK + * ({@code Pipeline.create().apply(Impulse.create())}), translate it via the runner, and execute the + * resulting Kafka Streams topology under {@link TopologyTestDriver}. + * + *

    This is the test layer Jan requested on PR #38689: rather than building hand-rolled {@link + * RunnerApi.Pipeline} protos, drive translation from the same surface a user would write. The tests + * stop short of calling {@code pipeline.run()} because that would require a real Kafka broker — + * {@code TopologyTestDriver} replaces the broker for unit-test purposes. + */ +public class KafkaStreamsRunnerTest { + + private static final String JOB_ID = "kafka-streams-runner-test"; + private static final String APPLICATION_ID = "ks-runner-test"; + + @Test + public void impulseOnlyPipelineEmitsDataAndTerminalWatermark() { + Pipeline pipeline = Pipeline.create(pipelineOptions()); + pipeline.apply("impulse", Impulse.create()); + + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); + + KafkaStreamsPipelineOptions options = + pipeline.getOptions().as(KafkaStreamsPipelineOptions.class); + KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); + JobInfo jobInfo = + JobInfo.create( + JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options)); + KafkaStreamsTranslationContext context = translator.createTranslationContext(jobInfo, options); + translator.translate(context, translator.prepareForTranslation(pipelineProto)); + + CapturingProcessor capture = new CapturingProcessor(); + Topology topology = context.getTopology(); + // Wire a downstream test sink to every translated transform node so we can capture emissions. + // Impulse is the only transform here, so we attach to "impulse" (the processor name registered + // by ImpulseTranslator). 
+ topology.addProcessor("capture", capture, expectedImpulseProcessorName(pipelineProto)); + + try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig())) { + driver.advanceWallClockTime(Duration.ofSeconds(1)); + driver.advanceWallClockTime(Duration.ofSeconds(1)); + } + + assertThat(capture.received.size(), is(2)); + assertThat(capture.received.get(0).isData(), is(true)); + assertThat(capture.received.get(0).getData().getValue().length, is(0)); + assertThat(capture.received.get(1).isWatermark(), is(true)); + assertThat( + capture.received.get(1).getWatermarkMillis(), + is(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); + } + + /** + * Finds the transform id that {@link Impulse} got assigned by the SDK so the test can attach a + * capturing processor to the matching Kafka Streams processor node (the translator names the + * processor after the transform id). + */ + private static String expectedImpulseProcessorName(RunnerApi.Pipeline pipelineProto) { + for (java.util.Map.Entry entry : + pipelineProto.getComponents().getTransformsMap().entrySet()) { + if ("beam:transform:impulse:v1".equals(entry.getValue().getSpec().getUrn())) { + return entry.getKey(); + } + } + throw new IllegalStateException("Impulse transform not found in pipeline proto"); + } + + private static PipelineOptions pipelineOptions() { + PipelineOptions options = + PipelineOptionsFactory.fromArgs("--applicationId=" + APPLICATION_ID).create(); + // Pipeline.create() requires a runner; CrashingRunner is the conventional "this pipeline is + // not going to be run() directly" choice used by other portable-runner tests. 
+ options.setRunner(CrashingRunner.class); + options.as(KafkaStreamsPipelineOptions.class).setApplicationId(APPLICATION_ID); + return options; + } + + private static Properties streamsConfig() { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put( + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); + props.put( + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); + return props; + } + + private static class CapturingProcessor + implements ProcessorSupplier< + byte[], KStreamsPayload, byte[], KStreamsPayload> { + + final List> received = Collections.synchronizedList(new ArrayList<>()); + + @Override + public Processor, byte[], KStreamsPayload> get() { + return new Processor, byte[], KStreamsPayload>() { + @Override + public void init(@Nullable ProcessorContext> context) { + // no-op + } + + @Override + public void process(Record> record) { + received.add(record.value()); + } + }; + } + } +} diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java index a3d0d02ef5f1..a092b10e02c4 100644 --- a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ImpulseTranslatorTest.java @@ -43,14 +43,14 @@ * Behavioural tests for {@link ImpulseTranslator} using {@link TopologyTestDriver}. * *

    The translator builds a topology with a real source + processor pair. The tests sit a {@link - * CapturingProcessor} downstream so emitted {@code WindowedValue} elements can be inspected + * CapturingProcessor} downstream so emitted {@link KStreamsPayload} elements can be inspected * directly without going through a Kafka sink topic (the runner does not produce one because no * downstream PCollections exist yet). */ public class ImpulseTranslatorTest { @Test - public void impulseEmitsExactlyOneEmptyByteArrayInGlobalWindow() { + public void impulseEmitsDataElementFollowedByTerminalWatermark() { KafkaStreamsTranslationContext context = KafkaStreamsPipelineTranslatorTest.newContext(); new KafkaStreamsPipelineTranslator() .translate(context, KafkaStreamsPipelineTranslatorTest.singleImpulsePipeline()); @@ -64,16 +64,24 @@ public void impulseEmitsExactlyOneEmptyByteArrayInGlobalWindow() { driver.advanceWallClockTime(Duration.ofSeconds(1)); } - assertThat(capture.received.size(), is(1)); - WindowedValue only = capture.received.get(0); - assertThat(only, is(notNullValue())); - assertThat(only.getValue().length, is(0)); - assertThat(only.getWindows().size(), is(1)); - assertThat(only.getTimestamp().getMillis(), is(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())); + assertThat(capture.received.size(), is(2)); + + KStreamsPayload dataPayload = capture.received.get(0); + assertThat(dataPayload, is(notNullValue())); + assertThat(dataPayload.isData(), is(true)); + WindowedValue data = dataPayload.getData(); + assertThat(data.getValue().length, is(0)); + assertThat(data.getWindows().size(), is(1)); + assertThat(data.getTimestamp().getMillis(), is(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())); + + KStreamsPayload watermarkPayload = capture.received.get(1); + assertThat(watermarkPayload.isWatermark(), is(true)); + assertThat( + watermarkPayload.getWatermarkMillis(), is(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); } @Test - public void impulseDoesNotReEmitOnRestart() { + 
public void impulseDoesNotReEmitDataOnRepeatedPunctuation() { KafkaStreamsTranslationContext context = KafkaStreamsPipelineTranslatorTest.newContext(); new KafkaStreamsPipelineTranslator() .translate(context, KafkaStreamsPipelineTranslatorTest.singleImpulsePipeline()); @@ -84,11 +92,14 @@ public void impulseDoesNotReEmitOnRestart() { try (TopologyTestDriver driver = new TopologyTestDriver(topology, baseProps())) { driver.advanceWallClockTime(Duration.ofSeconds(1)); - // Trigger again — should be ignored because the state store flag is set. + // Trigger again — the data element is gated by the state store; the punctuator should also + // have been cancelled after the first emission, so no further events should be captured. driver.advanceWallClockTime(Duration.ofSeconds(5)); } - assertThat(capture.received.size(), is(1)); + assertThat(capture.received.size(), is(2)); + assertThat(capture.received.get(0).isData(), is(true)); + assertThat(capture.received.get(1).isWatermark(), is(true)); } private static Properties baseProps() { @@ -103,26 +114,27 @@ private static Properties baseProps() { } /** - * Captures {@link WindowedValue} records forwarded by {@link ImpulseProcessor}. The supplier + * Captures {@link KStreamsPayload} records forwarded by {@link ImpulseProcessor}. The supplier * returns a fresh forwarder each call (required by Kafka Streams) but all forwarders write into * the shared {@link #received} list so the test can read the captured elements after the topology * is closed. 
*/ private static class CapturingProcessor - implements ProcessorSupplier, byte[], WindowedValue> { + implements ProcessorSupplier< + byte[], KStreamsPayload, byte[], KStreamsPayload> { - final List> received = Collections.synchronizedList(new ArrayList<>()); + final List> received = Collections.synchronizedList(new ArrayList<>()); @Override - public Processor, byte[], WindowedValue> get() { - return new Processor, byte[], WindowedValue>() { + public Processor, byte[], KStreamsPayload> get() { + return new Processor, byte[], KStreamsPayload>() { @Override - public void init(@Nullable ProcessorContext> context) { + public void init(@Nullable ProcessorContext> context) { // no-op } @Override - public void process(Record> record) { + public void process(Record> record) { received.add(record.value()); } };