Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion runners/kafka-streams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dependencies {

implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
implementation project(":runners:core-java")
permitUnusedDeclared project(":runners:core-java")
implementation project(":runners:java-fn-execution")
Expand All @@ -58,10 +59,10 @@ dependencies {
implementation "org.apache.kafka:kafka-clients:$kafka_version"
implementation "org.apache.kafka:kafka-streams:$kafka_version"
permitUnusedDeclared "org.apache.kafka:kafka-clients:$kafka_version"
permitUnusedDeclared "org.apache.kafka:kafka-streams:$kafka_version"

testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.hamcrest
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation "org.apache.kafka:kafka-streams-test-utils:$kafka_version"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,61 @@
*/
package org.apache.beam.runners.kafka.streams;

import java.util.Properties;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsPipelineTranslator;
import org.apache.beam.runners.kafka.streams.translation.KafkaStreamsTranslationContext;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Executes a portable pipeline by translating it to Kafka Streams. */
/** Executes a portable pipeline by translating it to a Kafka Streams {@link Topology}. */
public class KafkaStreamsPipelineRunner implements PortablePipelineRunner {

private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsPipelineRunner.class);

private final KafkaStreamsPipelineOptions pipelineOptions;

public KafkaStreamsPipelineRunner(KafkaStreamsPipelineOptions pipelineOptions) {
this.pipelineOptions = pipelineOptions;
}

@Override
public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception {
public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
// Surface a clear error if a required option (e.g. applicationId) is missing instead of
// letting Properties.put fail with a raw NullPointerException further down.
PipelineOptionsValidator.validate(KafkaStreamsPipelineOptions.class, pipelineOptions);

KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator();
KafkaStreamsTranslationContext context =
translator.createTranslationContext(jobInfo, pipelineOptions);
RunnerApi.Pipeline prepared = translator.prepareForTranslation(pipeline);
translator.translate(context, prepared);
throw new IllegalStateException("Translation unexpectedly completed without an executor");

Topology topology = context.getTopology();
LOG.info(
"Translated pipeline {} into Kafka Streams topology:\n{}",
jobInfo.jobId(),
topology.describe());

KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig(jobInfo));
kafkaStreams.start();
return new KafkaStreamsPortablePipelineResult(kafkaStreams);
}

private Properties streamsConfig(JobInfo jobInfo) {
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, pipelineOptions.getBootstrapServers());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, pipelineOptions.getApplicationId());
props.put(StreamsConfig.STATE_DIR_CONFIG, pipelineOptions.getStateDir());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.CLIENT_ID_CONFIG, jobInfo.jobId());
return props;
}
Comment thread
junaiddshaukat marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.kafka.streams;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.kafka.streams.KafkaStreams;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Result of executing a portable pipeline as a {@link KafkaStreams} application.
*
* <p>Translates the underlying {@link KafkaStreams.State} into Beam's {@link
* org.apache.beam.sdk.PipelineResult.State} and forwards {@link #cancel()} / {@link
* #waitUntilFinish()} to the {@code KafkaStreams} instance.
*/
class KafkaStreamsPortablePipelineResult implements PortablePipelineResult {

private static final Logger LOG =
LoggerFactory.getLogger(KafkaStreamsPortablePipelineResult.class);

private final KafkaStreams kafkaStreams;
private final CountDownLatch terminated = new CountDownLatch(1);
private volatile boolean cancelled = false;

KafkaStreamsPortablePipelineResult(KafkaStreams kafkaStreams) {
this.kafkaStreams = kafkaStreams;
kafkaStreams.setStateListener(
(newState, oldState) -> {
if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) {
terminated.countDown();
}
});
// Guard against the race where the KafkaStreams instance transitions to a terminal state
// (e.g. immediate startup failure) before the state listener is registered above. Without
// this check, the latch would never be counted down and waitUntilFinish() would block forever.
KafkaStreams.State current = kafkaStreams.state();
if (current == KafkaStreams.State.NOT_RUNNING || current == KafkaStreams.State.ERROR) {
terminated.countDown();
}
}
Comment thread
junaiddshaukat marked this conversation as resolved.

@Override
public State getState() {
if (cancelled) {
return State.CANCELLED;
}
return mapState(kafkaStreams.state());
}

@Override
public State cancel() throws IOException {
cancelled = true;
kafkaStreams.close();
terminated.countDown();
return getState();
}
Comment thread
junaiddshaukat marked this conversation as resolved.

@Override
public State waitUntilFinish(Duration duration) {
try {
boolean reachedTerminal = terminated.await(duration.getMillis(), TimeUnit.MILLISECONDS);
if (!reachedTerminal) {
return getState();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return State.UNKNOWN;
}
return getState();
}

@Override
public State waitUntilFinish() {
try {
terminated.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return State.UNKNOWN;
}
return getState();
}

@Override
public MetricResults metrics() {
throw new UnsupportedOperationException(
"Metrics are not yet implemented in the Kafka Streams runner.");
}

@Override
public JobApi.MetricResults portableMetrics() throws UnsupportedOperationException {
LOG.debug("portableMetrics() not yet implemented in the Kafka Streams runner");
return JobApi.MetricResults.newBuilder().build();
}

private static State mapState(KafkaStreams.State state) {
switch (state) {
case CREATED:
case REBALANCING:
return State.RUNNING;
case RUNNING:
return State.RUNNING;
case PENDING_SHUTDOWN:
return State.CANCELLED;
case PENDING_ERROR:
case ERROR:
return State.FAILED;
case NOT_RUNNING:
return State.DONE;
default:
return State.UNKNOWN;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.kafka.streams.translation;

import java.time.Duration;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Kafka Streams {@link Processor} implementing Beam's {@code Impulse} transform.
*
* <p>For each task instance, emits exactly two {@link KStreamsPayload}s downstream:
*
* <ol>
* <li>A {@link KStreamsPayload#data data} payload wrapping a {@link WindowedValue} of an empty
* {@code byte[]} in the {@link org.apache.beam.sdk.transforms.windowing.GlobalWindow}, with
* event-time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
* <li>A {@link KStreamsPayload#watermark watermark} payload at {@link
* BoundedWindow#TIMESTAMP_MAX_VALUE} that tells downstream transforms the source is done.
* </ol>
*
* <p>A persistent state store records whether the data element has already been emitted so that
* task restarts do not duplicate the data. The terminal watermark, on the other hand, is re-emitted
* on every restart so downstream watermark holds release correctly after recovery (per Jan's review
* on PR #38689).
*
* <p>The trigger comes from a wall-clock punctuator scheduled on {@link #init} — this lets the
* processor fire even when the dedicated bootstrap source topic is empty, which is the expected
* production state.
*
* <p>Kafka Streams disallows negative record timestamps, so the forwarded {@link Record} carries
* the Unix epoch ({@code 0L}). The Beam event-time lives inside the {@link KStreamsPayload}
* variant: inside the {@link WindowedValue} for data, or as the explicit watermark millis.
*/
class ImpulseProcessor implements Processor<byte[], byte[], byte[], KStreamsPayload<byte[]>> {

private static final Logger LOG = LoggerFactory.getLogger(ImpulseProcessor.class);

/** Sole entry in the state store; the value tracks whether this processor has already emitted. */
static final String FIRED_KEY = "fired";

/** How soon after {@link #init} the punctuator first fires. */
private static final Duration PUNCTUATION_DELAY = Duration.ofMillis(50);

private final String stateStoreName;
private final String transformId;

private @Nullable ProcessorContext<byte[], KStreamsPayload<byte[]>> context;
private @Nullable KeyValueStore<String, Boolean> firedStore;
private @Nullable Cancellable scheduledPunctuator;

ImpulseProcessor(String stateStoreName, String transformId) {
this.stateStoreName = stateStoreName;
this.transformId = transformId;
}

@Override
public void init(ProcessorContext<byte[], KStreamsPayload<byte[]>> context) {
this.context = context;
this.firedStore = context.getStateStore(stateStoreName);
this.scheduledPunctuator =
context.schedule(PUNCTUATION_DELAY, PunctuationType.WALL_CLOCK_TIME, ts -> maybeFire());
}

@Override
public void process(Record<byte[], byte[]> record) {
// Records that happen to land on the bootstrap topic are not actual data; they just provide an
// extra opportunity to fire the impulse on restart. The state store still gates the emit.
maybeFire();
}

private void maybeFire() {
ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx = context;
KeyValueStore<String, Boolean> store = firedStore;
if (ctx == null || store == null) {
return;
}
if (Boolean.TRUE.equals(store.get(FIRED_KEY))) {
// Data was already emitted in a previous task lifetime, but downstream watermark holds may
// still need to be released after the restart — re-emit the terminal watermark and stop the
// punctuator.
forwardWatermarkMax(ctx);
cancelPunctuator();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We must also emit watermark with timestamp BoundedWindow.TIMESTAMP_MAX_VALUE. The reason for this is that if pipeline restarts we need to be able to recompute watermarks starting from sources (and associated watermark holds in downstream transforms).

return;
}
WindowedValue<byte[]> impulse = WindowedValues.valueInGlobalWindow(new byte[0]);
// The output PCollection is not keyed (PCollection<byte[]>); use an empty byte[] as a
// placeholder key so downstream processors that adopt the byte[]-key convention see a
// consistent shape.
ctx.forward(
new Record<byte[], KStreamsPayload<byte[]>>(
new byte[0], KStreamsPayload.data(impulse), 0L));
forwardWatermarkMax(ctx);
store.put(FIRED_KEY, Boolean.TRUE);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we need to emit the final watermark as well.

cancelPunctuator();
LOG.debug("Impulse {} emitted single element and terminal watermark", transformId);
}

/** Forwards a terminal {@code TIMESTAMP_MAX_VALUE} watermark payload to downstream processors. */
private static void forwardWatermarkMax(ProcessorContext<byte[], KStreamsPayload<byte[]>> ctx) {
long maxMillis = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
ctx.forward(
new Record<byte[], KStreamsPayload<byte[]>>(
new byte[0], KStreamsPayload.watermark(maxMillis), 0L));
}

/** Cancels the wall-clock punctuator after the impulse has fired to stop periodic wakeups. */
private void cancelPunctuator() {
Cancellable handle = scheduledPunctuator;
if (handle != null) {
handle.cancel();
scheduledPunctuator = null;
}
}
Comment thread
junaiddshaukat marked this conversation as resolved.
}
Loading
Loading