diff --git a/runners/kafka-streams/build.gradle b/runners/kafka-streams/build.gradle index 9204fef4e768..3f34a3ca76b6 100644 --- a/runners/kafka-streams/build.gradle +++ b/runners/kafka-streams/build.gradle @@ -44,6 +44,7 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":model:pipeline", configuration: "shadow") + implementation project(path: ":model:job-management", configuration: "shadow") implementation project(":runners:core-java") permitUnusedDeclared project(":runners:core-java") implementation project(":runners:java-fn-execution") @@ -58,10 +59,10 @@ dependencies { implementation "org.apache.kafka:kafka-clients:$kafka_version" implementation "org.apache.kafka:kafka-streams:$kafka_version" permitUnusedDeclared "org.apache.kafka:kafka-clients:$kafka_version" - permitUnusedDeclared "org.apache.kafka:kafka-streams:$kafka_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation library.java.hamcrest testImplementation library.java.junit testImplementation library.java.mockito_core + testImplementation "org.apache.kafka:kafka-streams-test-utils:$kafka_version" } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java index 7c78d3a751b4..3e97638695e1 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPipelineRunner.java @@ -17,16 +17,25 @@ */ package org.apache.beam.runners.kafka.streams; +import java.util.Properties; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.jobsubmission.PortablePipelineResult; import 
org.apache.beam.runners.jobsubmission.PortablePipelineRunner; import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsPipelineTranslator; import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -/** Executes a portable pipeline by translating it to Kafka Streams. */ +/** Executes a portable pipeline by translating it to a Kafka Streams {@link Topology}. */ public class KafkaStreamsPipelineRunner implements PortablePipelineRunner { + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsPipelineRunner.class); + private final KafkaStreamsPipelineOptions pipelineOptions; public KafkaStreamsPipelineRunner(KafkaStreamsPipelineOptions pipelineOptions) { @@ -34,12 +43,35 @@ public KafkaStreamsPipelineRunner(KafkaStreamsPipelineOptions pipelineOptions) { } @Override - public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception { + public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) { + // Surface a clear error if a required option (e.g. applicationId) is missing instead of + // letting Properties.put fail with a raw NullPointerException further down. 
+ PipelineOptionsValidator.validate(KafkaStreamsPipelineOptions.class, pipelineOptions); + KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); KafkaStreamsTranslationContext context = translator.createTranslationContext(jobInfo, pipelineOptions); RunnerApi.Pipeline prepared = translator.prepareForTranslation(pipeline); translator.translate(context, prepared); - throw new IllegalStateException("Translation unexpectedly completed without an executor"); + + Topology topology = context.getTopology(); + LOG.info( + "Translated pipeline {} into Kafka Streams topology:\n{}", + jobInfo.jobId(), + topology.describe()); + + KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig(jobInfo)); + kafkaStreams.start(); + return new KafkaStreamsPortablePipelineResult(kafkaStreams); + } + + private Properties streamsConfig(JobInfo jobInfo) { + Properties props = new Properties(); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, pipelineOptions.getBootstrapServers()); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, pipelineOptions.getApplicationId()); + props.put(StreamsConfig.STATE_DIR_CONFIG, pipelineOptions.getStateDir()); + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + props.put(StreamsConfig.CLIENT_ID_CONFIG, jobInfo.jobId()); + return props; } } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java new file mode 100644 index 000000000000..817746bf002f --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/KafkaStreamsPortablePipelineResult.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.jobmanagement.v1.JobApi; +import org.apache.beam.runners.jobsubmission.PortablePipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.kafka.streams.KafkaStreams; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Result of executing a portable pipeline as a {@link KafkaStreams} application. + * + *
+ * <p>Translates the underlying {@link KafkaStreams.State} into Beam's {@link
+ * org.apache.beam.sdk.PipelineResult.State} and forwards {@link #cancel()} / {@link
+ * #waitUntilFinish()} to the {@code KafkaStreams} instance.
+ */
+class KafkaStreamsPortablePipelineResult implements PortablePipelineResult {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(KafkaStreamsPortablePipelineResult.class);
+
+  private final KafkaStreams kafkaStreams;
+  /** Released once the instance reaches a terminal state or {@link #cancel()} is called. */
+  private final CountDownLatch terminated = new CountDownLatch(1);
+  /** Set by {@link #cancel()}; takes precedence over the Kafka Streams state in getState(). */
+  private volatile boolean cancelled = false;
+
+  /**
+   * Wraps {@code kafkaStreams} and registers a state listener that releases waiters once the
+   * instance reaches {@code NOT_RUNNING} or {@code ERROR}.
+   *
+   * <p>NOTE(review): {@link KafkaStreams#setStateListener} is documented to throw {@link
+   * IllegalStateException} once the instance has been started, so this constructor has to run
+   * before {@link KafkaStreams#start()} - confirm the call order at the construction site.
+   *
+   * @param kafkaStreams the Kafka Streams application backing this result
+   */
+  KafkaStreamsPortablePipelineResult(KafkaStreams kafkaStreams) {
+    this.kafkaStreams = kafkaStreams;
+    kafkaStreams.setStateListener(
+        (newState, oldState) -> {
+          if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) {
+            terminated.countDown();
+          }
+        });
+    // Also inspect the current state once: if the instance is already terminal, no further
+    // transition will fire the listener, and without this check the latch would never be counted
+    // down and waitUntilFinish() would block forever.
+    KafkaStreams.State current = kafkaStreams.state();
+    if (current == KafkaStreams.State.NOT_RUNNING || current == KafkaStreams.State.ERROR) {
+      terminated.countDown();
+    }
+  }
+
+  /** Returns {@code CANCELLED} after {@link #cancel()}, otherwise the mapped Kafka Streams state. */
+  @Override
+  public State getState() {
+    if (cancelled) {
+      return State.CANCELLED;
+    }
+    return mapState(kafkaStreams.state());
+  }
+
+  /**
+   * Marks the result as cancelled, closes the {@code KafkaStreams} instance and releases any
+   * thread blocked in {@code waitUntilFinish}.
+   */
+  @Override
+  public State cancel() throws IOException {
+    cancelled = true;
+    kafkaStreams.close();
+    terminated.countDown();
+    return getState();
+  }
+
+  /**
+   * Waits up to {@code duration} for the application to reach a terminal state.
+   *
+   * <p>Per the Beam {@code PipelineResult} contract, a duration shorter than 1 ms requests an
+   * infinite wait, so such values are routed to {@link #waitUntilFinish()} instead of being
+   * handed to the latch as an immediately expiring timeout.
+   *
+   * @return the state observed after the wait (which may still be non-terminal on timeout), or
+   *     {@code UNKNOWN} if the waiting thread was interrupted
+   */
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    long timeoutMillis = duration.getMillis();
+    if (timeoutMillis < 1) {
+      return waitUntilFinish();
+    }
+    try {
+      // The boolean result is irrelevant: on timeout we still report the current state.
+      terminated.await(timeoutMillis, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return State.UNKNOWN;
+    }
+    return getState();
+  }
+
+  /**
+   * Blocks until the application reaches a terminal state or is cancelled.
+   *
+   * @return the final state, or {@code UNKNOWN} if the waiting thread was interrupted
+   */
+  @Override
+  public State waitUntilFinish() {
+    try {
+      terminated.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return State.UNKNOWN;
+    }
+    return getState();
+  }
+
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException(
+        "Metrics are not yet implemented in the Kafka Streams runner.");
+  }
+
+  /** Returns an empty result; portable metrics are not implemented yet. */
+  @Override
+  public JobApi.MetricResults portableMetrics() throws UnsupportedOperationException {
+    LOG.debug("portableMetrics() not yet implemented in the Kafka Streams runner");
+    return JobApi.MetricResults.newBuilder().build();
+  }
+
+  /** Maps a Kafka Streams lifecycle state onto the closest Beam pipeline state. */
+  private static State mapState(KafkaStreams.State state) {
+    switch (state) {
+      case CREATED:
+      case REBALANCING:
+      case RUNNING:
+        return State.RUNNING;
+      case PENDING_SHUTDOWN:
+        return State.CANCELLED;
+      case PENDING_ERROR:
+      case ERROR:
+        return State.FAILED;
+      case NOT_RUNNING:
+        return State.DONE;
+      default:
+        return State.UNKNOWN;
+    }
+  }
+}
diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java
b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java new file mode 100644 index 000000000000..e9fc8ddb36aa --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ImpulseProcessor.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import java.time.Duration; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Kafka Streams {@link Processor} implementing Beam's {@code Impulse} transform. + * + *
For each task instance, emits exactly two {@link KStreamsPayload}s downstream: + * + *
A persistent state store records whether the data element has already been emitted so that + * task restarts do not duplicate the data. The terminal watermark, on the other hand, is re-emitted + * on every restart so downstream watermark holds release correctly after recovery (per Jan's review + * on PR #38689). + * + *
The trigger comes from a wall-clock punctuator scheduled on {@link #init} — this lets the + * processor fire even when the dedicated bootstrap source topic is empty, which is the expected + * production state. + * + *
Kafka Streams disallows negative record timestamps, so the forwarded {@link Record} carries
+ * the Unix epoch ({@code 0L}). The Beam event-time lives inside the {@link KStreamsPayload}
+ * variant: inside the {@link WindowedValue} for data, or as the explicit watermark millis.
+ */
+class ImpulseProcessor implements Processor Adds three nodes to the Kafka Streams {@link Topology}:
+ *
+ * <p>The processor's output PCollection is registered with the translation context so subsequent
+ * translators can wire themselves to this node by id.
+ *
+ * <p>Bootstrap topic lifecycle: this translator does not auto-create the bootstrap
+ * topic. The topic is expected to exist on the broker before the job starts; otherwise Kafka
+ * Streams raises {@code MissingSourceTopicException} on startup. The auto-create-vs-pre-create
+ * decision (design doc §12.1) is deferred to a follow-up sub-issue along with the {@code
+ * AdminClient} wiring; pre-creation is sufficient for the {@code TopologyTestDriver}-based unit
+ * tests in this PR.
+ */
+class ImpulseTranslator implements PTransformTranslator {
+
+  static final String SOURCE_SUFFIX = "-source";
+  static final String STATE_STORE_SUFFIX = "-state";
+
+  /**
+   * Adds the Impulse nodes for {@code transformId} to the context's topology: a source on the
+   * bootstrap topic, an {@link ImpulseProcessor} named after the transform id, and a persistent
+   * key-value store attached to that processor. The transform's single output PCollection is then
+   * registered as produced by the processor node.
+   */
+  @Override
+  public void translate(
+      String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) {
+    // The outputs map lists what this transform produces (not who consumes it); for Impulse that
+    // must be exactly one PCollection, and getOnlyElement rejects any other size.
+    RunnerApi.PTransform impulse = pipeline.getComponents().getTransformsOrThrow(transformId);
+    String producedPCollection = Iterables.getOnlyElement(impulse.getOutputsMap().values());
+
+    Topology target = context.getTopology();
+    String sourceName = transformId + SOURCE_SUFFIX;
+    String storeName = transformId + STATE_STORE_SUFFIX;
+    String topic = context.getImpulseBootstrapTopic();
+
+    // Source first, then the processor that reads from it, then the store bound to the processor.
+    target.addSource(
+        sourceName, Serdes.ByteArray().deserializer(), Serdes.ByteArray().deserializer(), topic);
+    target.addProcessor(transformId, () -> new ImpulseProcessor(storeName, transformId), sourceName);
+    target.addStateStore(
+        Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.Boolean()),
+        transformId);
+
+    context.registerPCollectionProducer(producedPCollection, transformId);
+  }
+}
diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
new file mode 100644
index 000000000000..47c94eea6eff
--- /dev/null
+++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.kafka.streams.translation;
+
+import java.util.Objects;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Sum-type envelope flowing between Kafka Streams processors in the Beam Kafka Streams runner.
+ *
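+ * <p>Every record value emitted by a runner-introduced processor is one of:
+ *
+ * <ul>
+ *   <li>a data element, carried as a {@link WindowedValue}; or
+ *   <li>a watermark, carried as an explicit watermark timestamp in milliseconds.
+ * </ul>
+ *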
+ * <p>The envelope lets a single Kafka Streams output channel carry both Beam data and the watermark
+ * / synchronization primitives that Kafka Streams does not natively support. Future control
+ * messages (e.g. the {@code (epoch, assigned_partitions)} propagation from design doc §5) can be
+ * added here as additional variants.
+ *
+ * <p>This class is intentionally in-JVM only for now; serialization across topic boundaries
+ * (repartition or sink topics) will be introduced when the first translator that emits to a topic
+ * lands, at which point a corresponding Kafka {@link org.apache.kafka.common.serialization.Serde}
+ * will be added.
+ *
+ * @param The initial implementation only validates the graph and fails fast with an explicit message
- * for transforms that are not yet supported.
+ * <p>Walks the pipeline in topological order via {@link QueryablePipeline} and dispatches each
+ * transform to a {@link PTransformTranslator} keyed by URN. Transforms whose URN has no registered
+ * translator fail fast with a clear {@link UnsupportedOperationException} so the failure points at
+ * the exact transform that is not yet supported.
*/
public class KafkaStreamsPipelineTranslator {
+ private final Map Holds the topology being built and a {@code PCollection-id → processor-node-name} map so that
+ * downstream transforms can wire themselves to the right parent node.
+ */
public class KafkaStreamsTranslationContext {
+ /** Prefix for the per-job bootstrap topic Impulse reads from. */
+ private static final String IMPULSE_BOOTSTRAP_TOPIC_PREFIX = "__beam_impulse_";
+
private final JobInfo jobInfo;
private final KafkaStreamsPipelineOptions pipelineOptions;
+ private final Topology topology;
+ private final Map This is the test layer Jan requested on PR #38689: rather than building hand-rolled {@link
+ * RunnerApi.Pipeline} protos, drive translation from the same surface a user would write. The tests
+ * stop short of calling {@code pipeline.run()} because that would require a real Kafka broker —
+ * {@code TopologyTestDriver} replaces the broker for unit-test purposes.
+ */
+public class KafkaStreamsRunnerTest {
+
+ private static final String JOB_ID = "kafka-streams-runner-test";
+ private static final String APPLICATION_ID = "ks-runner-test";
+
+  @Test
+  public void impulseOnlyPipelineEmitsDataAndTerminalWatermark() {
+    // Build the pipeline through the SDK surface, then lower it to its portable proto.
+    Pipeline userPipeline = Pipeline.create(pipelineOptions());
+    userPipeline.apply("impulse", Impulse.create());
+    RunnerApi.Pipeline proto = PipelineTranslation.toProto(userPipeline);
+
+    KafkaStreamsPipelineOptions ksOptions =
+        userPipeline.getOptions().as(KafkaStreamsPipelineOptions.class);
+    KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
+    JobInfo info =
+        JobInfo.create(
+            JOB_ID, ksOptions.getJobName(), "", PipelineOptionsTranslation.toProto(ksOptions));
+    KafkaStreamsTranslationContext translationContext =
+        translator.createTranslationContext(info, ksOptions);
+    translator.translate(translationContext, translator.prepareForTranslation(proto));
+
+    // Attach a capturing sink below the Impulse processor node (the only transform in this
+    // pipeline) so that whatever it forwards can be inspected.
+    CapturingProcessor sink = new CapturingProcessor();
+    Topology translated = translationContext.getTopology();
+    translated.addProcessor("capture", sink, expectedImpulseProcessorName(proto));
+
+    // Advance wall-clock time twice while the driver is open; it is closed before asserting.
+    try (TopologyTestDriver testDriver = new TopologyTestDriver(translated, streamsConfig())) {
+      for (int tick = 0; tick < 2; tick++) {
+        testDriver.advanceWallClockTime(Duration.ofSeconds(1));
+      }
+    }
+
+    // Exactly two payloads are expected: the empty data element, then the terminal watermark.
+    assertThat(sink.received.size(), is(2));
+    assertThat(sink.received.get(0).isData(), is(true));
+    assertThat(sink.received.get(0).getData().getValue().length, is(0));
+    assertThat(sink.received.get(1).isWatermark(), is(true));
+    assertThat(
+        sink.received.get(1).getWatermarkMillis(),
+        is(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+  }
+
+ /**
+ * Finds the transform id that {@link Impulse} got assigned by the SDK so the test can attach a
+ * capturing processor to the matching Kafka Streams processor node (the translator names the
+ * processor after the transform id).
+ */
+ private static String expectedImpulseProcessorName(RunnerApi.Pipeline pipelineProto) {
+ for (java.util.Map.Entry The translator builds a topology with a real source + processor pair. The tests sit a {@link
+ * CapturingProcessor} downstream so emitted {@link KStreamsPayload} elements can be inspected
+ * directly without going through a Kafka sink topic (the runner does not produce one because no
+ * downstream PCollections exist yet).
+ */
+public class ImpulseTranslatorTest {
+
+ @Test
+ public void impulseEmitsDataElementFollowedByTerminalWatermark() {
+ KafkaStreamsTranslationContext context = KafkaStreamsPipelineTranslatorTest.newContext();
+ new KafkaStreamsPipelineTranslator()
+ .translate(context, KafkaStreamsPipelineTranslatorTest.singleImpulsePipeline());
+
+ CapturingProcessor capture = new CapturingProcessor();
+ Topology topology = context.getTopology();
+ topology.addProcessor("capture", capture, "impulse");
+
+ try (TopologyTestDriver driver = new TopologyTestDriver(topology, baseProps())) {
+ driver.advanceWallClockTime(Duration.ofSeconds(1));
+ driver.advanceWallClockTime(Duration.ofSeconds(1));
+ }
+
+ assertThat(capture.received.size(), is(2));
+
+ KStreamsPayload The skeleton translator does not yet handle any transforms; these tests pin the current
- * "fail-fast with a clear URN" contract so that follow-up sub-issues can replace the assertions as
- * real translators are added.
- */
+/** Tests for {@link KafkaStreamsPipelineTranslator}. */
public class KafkaStreamsPipelineTranslatorTest {
private static final String JOB_ID = "kafka-streams-test-job";
+ private static final String OUTPUT_PCOLLECTION_ID = "impulse.out";
@Test
public void translateRejectsUnknownTransformWithUrnInMessage() {
@@ -47,15 +44,16 @@ public void translateRejectsUnknownTransformWithUrnInMessage() {
RunnerApi.Pipeline pipeline =
RunnerApi.Pipeline.newBuilder()
+ .addRootTransformIds("gbk")
.setComponents(
RunnerApi.Components.newBuilder()
.putTransforms(
- "impulse",
+ "gbk",
RunnerApi.PTransform.newBuilder()
- .setUniqueName("Impulse")
+ .setUniqueName("GroupByKey")
.setSpec(
RunnerApi.FunctionSpec.newBuilder()
- .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN))
+ .setUrn(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN))
.build()))
.build();
@@ -65,33 +63,31 @@ public void translateRejectsUnknownTransformWithUrnInMessage() {
() -> translator.translate(context, translator.prepareForTranslation(pipeline)));
assertThat(ex.getMessage(), containsString("No translator registered for URN"));
- assertThat(ex.getMessage(), containsString(PTransformTranslation.IMPULSE_TRANSFORM_URN));
+ assertThat(ex.getMessage(), containsString(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN));
+ assertThat(ex.getMessage(), containsString("gbk"));
assertThat(ex.getMessage(), containsString(JOB_ID));
}
@Test
- public void translateRejectsEmptyPipeline() {
+  public void translateImpulsePipelineAddsSourceAndProcessorNodes() {
     KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
     KafkaStreamsTranslationContext context = newContext();
-    RunnerApi.Pipeline pipeline =
-        RunnerApi.Pipeline.newBuilder()
-            .setComponents(RunnerApi.Components.newBuilder().build())
-            .build();
+    RunnerApi.Pipeline pipeline = singleImpulsePipeline();
+    translator.translate(context, translator.prepareForTranslation(pipeline));
-    UnsupportedOperationException ex =
-        assertThrows(
-            UnsupportedOperationException.class,
-            () -> translator.translate(context, translator.prepareForTranslation(pipeline)));
+    TopologyDescription description = context.getTopology().describe();
+    String describeText = description.toString();
-    assertThat(ex.getMessage(), containsString("No translator registered"));
+    // Match on the node-kind prefix printed by TopologyDescription: a bare "impulse" is already a
+    // substring of "impulse-source" and would pass even if the processor node were missing.
+    assertThat(describeText, containsString("Source: impulse-source"));
+    assertThat(describeText, containsString("Processor: impulse"));
+    assertThat(context.getProcessorNameForPCollection(OUTPUT_PCOLLECTION_ID), is("impulse"));
   }
@Test
public void createTranslationContextExposesJobInfoAndOptions() {
KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
- KafkaStreamsPipelineOptions options =
- PipelineOptionsFactory.create().as(KafkaStreamsPipelineOptions.class);
+ KafkaStreamsPipelineOptions options = testOptions();
JobInfo jobInfo =
JobInfo.create(
JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options));
@@ -102,12 +98,37 @@ public void createTranslationContextExposesJobInfoAndOptions() {
assertThat(context.getPipelineOptions().getBootstrapServers(), containsString("localhost"));
}
- private static KafkaStreamsTranslationContext newContext() {
- KafkaStreamsPipelineOptions options =
- PipelineOptionsFactory.create().as(KafkaStreamsPipelineOptions.class);
+  /** Builds a pipeline proto whose only (root) transform is an Impulse with one output. */
+  static RunnerApi.Pipeline singleImpulsePipeline() {
+    RunnerApi.FunctionSpec.Builder impulseSpec =
+        RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN);
+    RunnerApi.PTransform impulse =
+        RunnerApi.PTransform.newBuilder()
+            .setUniqueName("Impulse")
+            .putOutputs("out", OUTPUT_PCOLLECTION_ID)
+            .setSpec(impulseSpec)
+            .build();
+    RunnerApi.PCollection output =
+        RunnerApi.PCollection.newBuilder().setUniqueName(OUTPUT_PCOLLECTION_ID).build();
+    RunnerApi.Components components =
+        RunnerApi.Components.newBuilder()
+            .putTransforms("impulse", impulse)
+            .putPcollections(OUTPUT_PCOLLECTION_ID, output)
+            .build();
+    return RunnerApi.Pipeline.newBuilder()
+        .addRootTransformIds("impulse")
+        .setComponents(components)
+        .build();
+  }
+
+ static KafkaStreamsTranslationContext newContext() {
+ KafkaStreamsPipelineOptions options = testOptions();
JobInfo jobInfo =
JobInfo.create(
JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options));
return KafkaStreamsTranslationContext.create(jobInfo, options);
}
+
+  /** Creates Kafka Streams runner options with an explicit {@code applicationId} set. */
+  static KafkaStreamsPipelineOptions testOptions() {
+    String[] args = {"--applicationId=ks-translator-test"};
+    return PipelineOptionsFactory.fromArgs(args).as(KafkaStreamsPipelineOptions.class);
+  }
}
+ *
+ *
+ *
+ *
+ *
+ *