Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@

## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming Engine jobs. It can be disabled by passing `--experiments=disable_streaming_engine_state_tag_encoding_v2` or `--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag encoding version cannot change during a job update. Jobs using tag encoding v2 (enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam versions prior to 2.73.0, as only versions 2.73.0 and later support tag encoding v2. ([#38705](https://github.com/apache/beam/issues/38705)).

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
Expand Down Expand Up @@ -1310,6 +1311,11 @@ public DataflowPipelineJob run(Pipeline pipeline) {
// Experiment marking that the harness supports tag encoding v2
// Backend will enable tag encoding v2 only if the harness supports it.
experiments.add("streaming_engine_state_tag_encoding_v2_supported");
// Experiment requesting tag encoding v2 on new jobs starting with 2.75.0. During job
// updates old job's tag encoding version is carried over by the backend.
if (!StreamingOptions.updateCompatibilityVersionLessThan(options, "2.75.0")) {
experiments.add("enable_streaming_engine_state_tag_encoding_v2");
}
options.setExperiments(ImmutableList.copyOf(experiments));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2923,4 +2923,48 @@ public void processElement(
PAssert.that(output).containsInAnyOrder("value:UPDATE_BEFORE");
pipeline.run();
}

@Test
public void testStreamingStateTagEncodingV2PreCompatibility() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
options.as(StreamingOptions.class).setStreaming(true);
options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.74.0");
Pipeline p = Pipeline.create(options);

p.run();

List<String> experiments = options.getExperiments();
assertNotNull(experiments);
assertTrue(experiments.contains("streaming_engine_state_tag_encoding_v2_supported"));
assertFalse(experiments.contains("enable_streaming_engine_state_tag_encoding_v2"));
}

@Test
public void testStreamingStateTagEncodingV2PostCompatibility() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
options.as(StreamingOptions.class).setStreaming(true);
options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.75.0");
Pipeline p = Pipeline.create(options);

p.run();

List<String> experiments = options.getExperiments();
assertNotNull(experiments);
assertTrue(experiments.contains("streaming_engine_state_tag_encoding_v2_supported"));
assertTrue(experiments.contains("enable_streaming_engine_state_tag_encoding_v2"));
}

@Test
public void testStreamingStateTagEncodingV2NoCompatibility() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
options.as(StreamingOptions.class).setStreaming(true);
Pipeline p = Pipeline.create(options);

p.run();

List<String> experiments = options.getExperiments();
assertNotNull(experiments);
assertTrue(experiments.contains("streaming_engine_state_tag_encoding_v2_supported"));
assertTrue(experiments.contains("enable_streaming_engine_state_tag_encoding_v2"));
}
}
Loading