Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ public void testReadRowsBuilderAndGetters() {
Map<String, String> hadoopConfig = new HashMap<>();
hadoopConfig.put("fs.defaultFS", "file:///");

ReadRows readRows = DeltaIO.readRows()
.from(tablePath)
.withVersion(version)
.withTimestamp(timestamp)
.withConfig(hadoopConfig);
ReadRows readRows =
DeltaIO.readRows()
.from(tablePath)
.withVersion(version)
.withTimestamp(timestamp)
.withConfig(hadoopConfig);

Assert.assertEquals(tablePath, readRows.getTablePath());
Assert.assertEquals(Long.valueOf(version), readRows.getVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
Expand All @@ -51,22 +53,31 @@
public class KafkaCommitOffset<K, V>
extends PTransform<
PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>>, PCollection<Void>> {

private final KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors;
private final boolean use259implementation;

KafkaCommitOffset(
KafkaIO.ReadSourceDescriptors<K, V> readSourceDescriptors, boolean use259implementation) {

this.readSourceDescriptors = readSourceDescriptors;
this.use259implementation = use259implementation;
}

static class CommitOffsetDoFn extends DoFn<KV<KafkaSourceDescriptor, Long>, Void> {

private static final Logger LOG = LoggerFactory.getLogger(CommitOffsetDoFn.class);

private final Counter commitFailures =
Metrics.counter(CommitOffsetDoFn.class, "commit-failures");

private final Map<String, Object> consumerConfig;

private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
consumerFactoryFn;

CommitOffsetDoFn(KafkaIO.ReadSourceDescriptors<?, ?> readSourceDescriptors) {

consumerConfig = readSourceDescriptors.getConsumerConfig();
consumerFactoryFn = readSourceDescriptors.getConsumerFactoryFn();
}
Expand All @@ -76,15 +87,23 @@ static class CommitOffsetDoFn extends DoFn<KV<KafkaSourceDescriptor, Long>, Void
@RequiresStableInput
@ProcessElement
public void processElement(@Element KV<KafkaSourceDescriptor, Long> element) {

Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, element.getKey());

try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {

try {

consumer.commitSync(
Collections.singletonMap(
element.getKey().getTopicPartition(),
new OffsetAndMetadata(element.getValue() + 1)));

} catch (Exception e) {

commitFailures.inc();

// TODO: consider retrying.
LOG.warn("Getting exception when committing offset: {}", e.getMessage());
}
Expand All @@ -93,29 +112,37 @@ public void processElement(@Element KV<KafkaSourceDescriptor, Long> element) {

private Map<String, Object> overrideBootstrapServersConfig(
Map<String, Object> currentConfig, KafkaSourceDescriptor description) {

checkState(
currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
|| description.getBootStrapServers() != null);

Map<String, Object> config = new HashMap<>(currentConfig);

if (description.getBootStrapServers() != null
&& !description.getBootStrapServers().isEmpty()) {

config.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
String.join(",", description.getBootStrapServers()));
}

Comment on lines +115 to +129
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Removing unnecessary blank lines to maintain consistency with the project's coding style.

      checkState(
          currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
              || description.getBootStrapServers() != null);
      Map<String, Object> config = new HashMap<>(currentConfig);
      if (description.getBootStrapServers() != null
          && !description.getBootStrapServers().isEmpty()) {
        config.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            String.join(",", description.getBootStrapServers()));
      }

return config;
}
}

private static final class MaxOffsetFn<K, V>
extends DoFn<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>, KV<KafkaSourceDescriptor, Long>> {

private static class OffsetAndTimestamp {

OffsetAndTimestamp(long offset, Instant timestamp) {
this.offset = offset;
this.timestamp = timestamp;
}

void merge(long offset, Instant timestamp) {

if (this.offset < offset) {
this.offset = offset;
this.timestamp = timestamp;
Expand All @@ -130,6 +157,7 @@ void merge(long offset, Instant timestamp) {

@StartBundle
public void startBundle() {

if (maxObserved == null) {
maxObserved = new HashMap<>();
} else {
Expand All @@ -143,13 +171,16 @@ public void startBundle() {
public void processElement(
@Element KV<KafkaSourceDescriptor, KafkaRecord<K, V>> element,
@Timestamp Instant timestamp) {

maxObserved.compute(
element.getKey(),
(k, v) -> {
long offset = element.getValue().getOffset();

if (v == null) {
return new OffsetAndTimestamp(offset, timestamp);
}

v.merge(offset, timestamp);
return v;
});
Expand All @@ -158,26 +189,33 @@ public void processElement(
@FinishBundle
@SuppressWarnings("nullness") // startBundle guaranteed to initialize
public void finishBundle(FinishBundleContext context) {

maxObserved.forEach(
(k, v) -> context.output(KV.of(k, v.offset), v.timestamp, GlobalWindow.INSTANCE));
}
}

@Override
public PCollection<Void> expand(PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> input) {

try {

PCollection<KV<KafkaSourceDescriptor, Long>> offsets;

if (use259implementation) {

offsets =
input.apply(
MapElements.into(new TypeDescriptor<KV<KafkaSourceDescriptor, Long>>() {})
.via(element -> KV.of(element.getKey(), element.getValue().getOffset())));

} else {

// Reduce the amount of data to combine by calculating a max within the generally dense
// bundles of reading
// from a Kafka partition.
// bundles of reading from a Kafka partition.
offsets = input.apply(ParDo.of(new MaxOffsetFn<>()));
}

Comment on lines +200 to +218
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Cleaning up excessive whitespace in the expand method to improve readability.

    try {
      PCollection<KV<KafkaSourceDescriptor, Long>> offsets;
      if (use259implementation) {
        offsets =
            input.apply(
                MapElements.into(new TypeDescriptor<KV<KafkaSourceDescriptor, Long>>() {})
                    .via(element -> KV.of(element.getKey(), element.getValue().getOffset())));
      } else {
        // Reduce the amount of data to combine by calculating a max within the generally dense
        // bundles of reading
        // from a Kafka partition.
        offsets = input.apply(ParDo.of(new MaxOffsetFn<>()));
      }

return offsets
.setCoder(
KvCoder.of(
Expand All @@ -190,7 +228,9 @@ public PCollection<Void> expand(PCollection<KV<KafkaSourceDescriptor, KafkaRecor
.apply(Max.longsPerKey())
.apply(ParDo.of(new CommitOffsetDoFn(readSourceDescriptors)))
.setCoder(VoidCoder.of());

} catch (NoSuchSchemaException e) {

throw new RuntimeException(e.getMessage());
}
}
Expand Down
Loading