From a4c4644030d9628805894e8d21c41bc7ffe9005a Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 29 Nov 2021 21:22:15 +0700 Subject: [PATCH 1/2] Add Akka.Streams.Kafka.Testkit to support other specs --- Akka.Streams.Kafka.sln | 6 + .../Akka.Streams.Kafka.Testkit.csproj | 25 ++ .../ConsumerResultFactory.cs | 2 +- .../Dsl/ConsumerControlFactory.cs | 55 ++++ .../Dsl/KafkaSpec.cs | 177 +++++++++++++ src/Akka.Streams.Kafka.Testkit/Extensions.cs | 91 +++++++ .../Fixture/ContainerInfo.cs | 18 ++ .../Fixture/FixtureBase.cs | 132 ++++++++++ .../Fixture/GenericContainer.cs | 175 +++++++++++++ .../Fixture/KafkaContainer.cs | 132 ++++++++++ .../Fixture/KafkaFixture.cs | 18 ++ .../Fixture/KafkaFixtureBase.cs | 64 +++++ .../Fixture/ZookeeperContainer.cs | 78 ++++++ .../Internal/Checks.cs | 80 ++++++ .../Internal/KafkaTestKit.cs | 175 +++++++++++++ .../KafkaTestkitSettings.cs | 31 +++ src/Akka.Streams.Kafka.Testkit/Logger.cs | 87 +++++++ .../MemberAssignment.cs | 54 ++++ .../Resources/reference.conf | 33 +++ src/Akka.Streams.Kafka.Testkit/Utils.cs | 25 ++ .../Akka.Streams.Kafka.Tests.csproj | 1 + .../BugFix240SupervisionStrategy.cs | 57 ++-- src/Akka.Streams.Kafka.Tests/Fixture.cs | 11 + .../AtMostOnceSourceIntegrationTests.cs | 13 +- ...ommitWithMetadataSourceIntegrationTests.cs | 5 +- ...ttablePartitionedSourceIntegrationTests.cs | 7 +- .../CommittableSourceIntegrationTests.cs | 19 +- .../CommitterFlowIntegrationTests.cs | 9 +- .../ExternalPlainSourceIntegrationTests.cs | 13 +- .../FlowWithContextIntegrationTests.cs | 11 +- ...ionedManualOffsetSourceIntegrationTests.cs | 19 +- .../PlainPartitionedSourceIntegrationTests.cs | 41 +-- .../Integration/PlainSinkIntegrationTests.cs | 21 +- .../PlainSourceIntegrationTests.cs | 35 +-- ...SourceWithOffsetContextIntegrationTests.cs | 5 +- .../TransactionalIntegrationTests.cs | 11 +- .../Internal/ConsumerSpec.cs | 87 +------ src/Akka.Streams.Kafka.Tests/KafkaFixture.cs | 246 ------------------ .../KafkaIntegrationTests.cs | 124 ++++++--- .../RepeatAttribute.cs | 34 +++ .../Messages/ProducerRecord.cs | 10 + .../Properties/FriendsOf.cs | 1 + .../Settings/ProducerSettings.cs | 82 ++++-- src/Akka.Streams.Kafka/reference.conf | 8 + 44 files changed, 1822 insertions(+), 506 deletions(-) create mode 100644 src/Akka.Streams.Kafka.Testkit/Akka.Streams.Kafka.Testkit.csproj rename src/{Akka.Streams.Kafka.Tests/TestKit/Internal => Akka.Streams.Kafka.Testkit}/ConsumerResultFactory.cs (96%) create mode 100644 src/Akka.Streams.Kafka.Testkit/Dsl/ConsumerControlFactory.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Dsl/KafkaSpec.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Extensions.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Fixture/ContainerInfo.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Fixture/FixtureBase.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Fixture/GenericContainer.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Fixture/KafkaContainer.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Fixture/KafkaFixture.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Fixture/KafkaFixtureBase.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Fixture/ZookeeperContainer.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Internal/Checks.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Internal/KafkaTestKit.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/KafkaTestkitSettings.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Logger.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/MemberAssignment.cs create mode 100644 src/Akka.Streams.Kafka.Testkit/Resources/reference.conf create mode 100644 src/Akka.Streams.Kafka.Testkit/Utils.cs create mode 100644 src/Akka.Streams.Kafka.Tests/Fixture.cs delete mode 100644 src/Akka.Streams.Kafka.Tests/KafkaFixture.cs create mode 100644 src/Akka.Streams.Kafka.Tests/RepeatAttribute.cs diff --git a/Akka.Streams.Kafka.sln b/Akka.Streams.Kafka.sln index 369abdcd..36a51e6d 100644 --- a/Akka.Streams.Kafka.sln +++ b/Akka.Streams.Kafka.sln @@ -28,6 +28,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventHub.Consumer", "exampl EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Kafka.Benchmark", "src\Akka.Streams.Kafka.Benchmark\Akka.Streams.Kafka.Benchmark.csproj", "{1BF52F4E-93FA-40C4-8985-C21D779C7997}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Kafka.Testkit", "src\Akka.Streams.Kafka.Testkit\Akka.Streams.Kafka.Testkit.csproj", "{49F2E094-BB98-41D8-9B3D-CF6557DCC420}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -62,6 +64,10 @@ Global {1BF52F4E-93FA-40C4-8985-C21D779C7997}.Debug|Any CPU.Build.0 = Debug|Any CPU {1BF52F4E-93FA-40C4-8985-C21D779C7997}.Release|Any CPU.ActiveCfg = Release|Any CPU {1BF52F4E-93FA-40C4-8985-C21D779C7997}.Release|Any CPU.Build.0 = Release|Any CPU + {49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Debug|Any CPU.Build.0 = Debug|Any CPU + {49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Release|Any CPU.ActiveCfg = Release|Any CPU + {49F2E094-BB98-41D8-9B3D-CF6557DCC420}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Akka.Streams.Kafka.Testkit/Akka.Streams.Kafka.Testkit.csproj b/src/Akka.Streams.Kafka.Testkit/Akka.Streams.Kafka.Testkit.csproj new file mode 100644 index 00000000..27279a4f --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Akka.Streams.Kafka.Testkit.csproj @@ -0,0 +1,25 @@ + + + + + netstandard2.0 + 8.0 + + + + + + + + + + + + + + + + + + + diff --git a/src/Akka.Streams.Kafka.Tests/TestKit/Internal/ConsumerResultFactory.cs b/src/Akka.Streams.Kafka.Testkit/ConsumerResultFactory.cs similarity index 96% rename from src/Akka.Streams.Kafka.Tests/TestKit/Internal/ConsumerResultFactory.cs rename to src/Akka.Streams.Kafka.Testkit/ConsumerResultFactory.cs index ba31bdb7..9abfb916 100644 --- a/src/Akka.Streams.Kafka.Tests/TestKit/Internal/ConsumerResultFactory.cs +++ b/src/Akka.Streams.Kafka.Testkit/ConsumerResultFactory.cs @@ -3,7 +3,7 @@ using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Stages.Consumers; -namespace Akka.Streams.Kafka.Tests.TestKit.Internal +namespace Akka.Streams.Kafka.Testkit { public static class ConsumerResultFactory { diff --git a/src/Akka.Streams.Kafka.Testkit/Dsl/ConsumerControlFactory.cs b/src/Akka.Streams.Kafka.Testkit/Dsl/ConsumerControlFactory.cs new file mode 100644 index 00000000..ce9d61d4 --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Dsl/ConsumerControlFactory.cs @@ -0,0 +1,55 @@ +using System.Threading.Tasks; +using Akka.Streams.Dsl; +using Akka.Streams.Kafka.Helpers; + +namespace Akka.Streams.Kafka.Testkit.Dsl +{ + public static class ConsumerControlFactory + { + public static Source AttachControl(Source source) + => source.ViaMaterialized(ControlFlow(), Keep.Right); + + public static Flow ControlFlow() + => Flow.Create() + .ViaMaterialized(KillSwitches.Single(), Keep.Right) + .MapMaterializedValue(Control); + + public static IControl Control(IKillSwitch killSwitch) + => new FakeControl(killSwitch); + + public class FakeControl : IControl + { + private readonly IKillSwitch _killSwitch; + private readonly TaskCompletionSource _shutdownPromise; + + public FakeControl(IKillSwitch killSwitch) + { + _killSwitch = killSwitch; + _shutdownPromise = new TaskCompletionSource(); + } + + public Task Stop() + { + _killSwitch.Shutdown(); + _shutdownPromise.SetResult(Done.Instance); + return _shutdownPromise.Task; + } + + public Task Shutdown() + { + _killSwitch.Shutdown(); + _shutdownPromise.SetResult(Done.Instance); + return _shutdownPromise.Task; + } + + public Task IsShutdown => _shutdownPromise.Task; + + public Task DrainAndShutdown(Task streamCompletion) + { + _killSwitch.Shutdown(); + _shutdownPromise.SetResult(Done.Instance); + return Task.FromResult(default(TResult)); + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Dsl/KafkaSpec.cs b/src/Akka.Streams.Kafka.Testkit/Dsl/KafkaSpec.cs new file mode 100644 index 00000000..e5ebdafc --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Dsl/KafkaSpec.cs @@ -0,0 +1,177 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor.Setup; +using Akka.Streams.Dsl; +using Akka.Streams.Kafka.Dsl; +using Akka.Streams.Kafka.Helpers; +using Akka.Streams.Kafka.Messages; +using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit.Internal; +using Akka.Streams.TestKit; +using Akka.Util; +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using Xunit; +using Xunit.Abstractions; +using Config = Akka.Configuration.Config; + +namespace Akka.Streams.Kafka.Testkit.Dsl +{ + public abstract class KafkaSpec : KafkaTestKit, IAsyncLifetime + { + protected KafkaSpec(string config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output) + { + } + + protected KafkaSpec(Config config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output) + { + } + + protected KafkaSpec(ActorSystemSetup config, string actorSystemName = null, ITestOutputHelper output = null) : base(config, actorSystemName, output) + { + } + + protected IProducer TestProducer { get; private set; } + + + public virtual Task InitializeAsync() + { + TestProducer = ProducerDefaults().CreateKafkaProducer(); + SetUpAdminClient(); + return Task.CompletedTask; + } + + public virtual Task DisposeAsync() + { + TestProducer?.Dispose(); + CleanUpAdminClient(); + Shutdown(); + return Task.CompletedTask; + } + + protected void Sleep(TimeSpan time, string msg) + { + Log.Debug($"Sleeping {time}: {msg}"); + Thread.Sleep(time); + } + + protected List AwaitMultiple(TimeSpan timeout, IEnumerable> tasks) + { + var completedTasks = new List>(); + using (var cts = new CancellationTokenSource(timeout)) + { + var waitingTasks = tasks.ToList(); + while (waitingTasks.Count > 0) + { + var anyTask = Task.WhenAny(waitingTasks); + try + { + anyTask.Wait(cts.Token); + } + catch (Exception e) + { + throw new Exception($"AwaitMultiple failed. Exception: {e.Message}", e); + } + + var completedTask = anyTask.Result; + waitingTasks.Remove(completedTask); + completedTasks.Add(completedTask); + } + } + + return completedTasks.Select(t => t.Result).ToList(); + } + + protected TimeSpan SleepAfterProduce => TimeSpan.FromSeconds(4); + + protected void AwaitProduce(IEnumerable> tasks) + { + AwaitMultiple(TimeSpan.FromSeconds(4), tasks); + Sleep(SleepAfterProduce, "to be sure producing has happened"); + } + + protected readonly Partition Partition0 = new Partition(0); + + // Not implemented + [Obsolete("Kafka DescribeCluster API isn't supported by the .NET driver")] + protected void WaitUntilCluster(Func predicate) + => Checks.WaitUntilCluster(Settings.ClusterTimeout, Settings.CheckInterval, AdminClient, predicate, Log); + + protected void WaitUntilConsumerGroup(string groupId, Func predicate) + => Checks.WaitUntilConsumerGroup( + groupId: groupId, + timeout: Settings.ConsumerGroupTimeout, + sleepInBetween: Settings.CheckInterval, + adminClient: AdminClient, + predicate: predicate, + log: Log); + + protected void WaitUntilConsumerSummary(string groupId, Func, bool> predicate) + => WaitUntilConsumerGroup(groupId, info => + { + return info.State == "Stable" && Try.From(() => predicate(info.Members)).OrElse(false).Success.Value; + }); + + protected ImmutableList CreateTopics(IEnumerable topics) + => CreateTopicsAsync(topics).Result; + + protected async Task> CreateTopicsAsync(IEnumerable topics) + { + var topicNames = topics.Select(CreateTopicName).ToImmutableList(); + var configs = new Dictionary(); + var newTopics = topicNames.Select(topic => + new TopicSpecification + { + Name = topic, + NumPartitions = 1, + ReplicationFactor = 1, + Configs = configs + }); + await AdminClient.CreateTopicsAsync( + topics: newTopics, + options: new CreateTopicsOptions {RequestTimeout = TimeSpan.FromSeconds(10)}); + return topicNames; + } + + protected void PeriodicalCheck(string description, int maxTries, TimeSpan sleepInBetween, Func data, Func predicate) + => Checks.PeriodicalCheck(description, new TimeSpan(sleepInBetween.Ticks * maxTries), sleepInBetween, data, predicate, Log); + + /// + /// Produce messages to topic using specified range and return a Future so the caller can synchronize consumption. + /// + protected Task Produce(string topic, IEnumerable range, int? partition = null) + => ProduceString(topic, range.Select(i => i.ToString()), partition); + + protected Task ProduceString(string topic, IEnumerable range, int? partition = null) + { + partition ??= Partition0; + return Source.From(range) + // NOTE: If no partition is specified but a key is present a partition will be chosen + // using a hash of the key. If neither key nor partition is present a partition + // will be assigned in a round-robin fashion. + .Select(n => new ProducerRecord(topic, partition, DefaultKey, n)) + .RunWith(KafkaProducer.PlainSink(ProducerDefaults().WithProducer(TestProducer)), Sys.Materializer()); + } + + protected Task ProduceTimestamped(string topic, IEnumerable<(int, long)> timestampedRange) + => Source.From(timestampedRange) + .Select( tuple => + { + var (n, ts) = tuple; + return new ProducerRecord(topic, Partition0, ts, DefaultKey, n.ToString()); + }) + .RunWith(KafkaProducer.PlainSink(ProducerDefaults().WithProducer(TestProducer)), Sys.Materializer()); + + protected (IControl, TestSubscriber.Probe) CreateProbe( + ConsumerSettings consumerSettings, + string[] topics) + => KafkaConsumer.PlainSource(consumerSettings, Subscriptions.Topics(topics)) + .Select(s => s.Message.Value) + .ToMaterialized(this.SinkProbe(), Keep.Both) + .Run(Sys.Materializer()); + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Extensions.cs b/src/Akka.Streams.Kafka.Testkit/Extensions.cs new file mode 100644 index 00000000..ade34412 --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Extensions.cs @@ -0,0 +1,91 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Streams.Implementation; +using Akka.Util.Internal; + +namespace Akka.Streams.Kafka.Testkit +{ + public static class Extensions + { + public static async Task WithTimeout(this Task task, TimeSpan timeout) + { + using (var cts = new CancellationTokenSource()) + { + var timeoutTask = Task.Delay(timeout, cts.Token); + var completed = await Task.WhenAny(task, timeoutTask); + if (completed == timeoutTask) + throw new OperationCanceledException("Operation timed out"); + else + cts.Cancel(); + } + } + + public static List> Grouped(this IEnumerable messages, int size) + { + var groups = new List>(); + var list = new List(); + var index = 0; + foreach (var message in messages) + { + list.Add(message); + if(index != 0 && index % size == 0) + { + groups.Add(list); + list = new List(); + } + + index++; + } + if(list.Count > 0) + groups.Add(list); + return groups; + } + + public static void AssertAllStagesStopped(this Akka.TestKit.Xunit2.TestKit spec, Action block, IMaterializer materializer) + { + AssertAllStagesStopped(spec, () => + { + block(); + return NotUsed.Instance; + }, materializer); + } + + public static T AssertAllStagesStopped(this Akka.TestKit.Xunit2.TestKit spec, Func block, IMaterializer materializer) + { + if (!(materializer is ActorMaterializerImpl impl)) + return block(); + + var probe = spec.CreateTestProbe(impl.System); + probe.Send(impl.Supervisor, StreamSupervisor.StopChildren.Instance); + probe.ExpectMsg(); + var result = block(); + + probe.Within(TimeSpan.FromSeconds(5), () => + { + IImmutableSet children = ImmutableHashSet.Empty; + try + { + probe.AwaitAssert(() => + { + impl.Supervisor.Tell(StreamSupervisor.GetChildren.Instance, probe.Ref); + children = probe.ExpectMsg().Refs; + if (children.Count != 0) + throw new Exception($"expected no StreamSupervisor children, but got {children.Aggregate("", (s, @ref) => s + @ref + ", ")}"); + }); + } + catch + { + children.ForEach(c=>c.Tell(StreamSupervisor.PrintDebugDump.Instance)); + throw; + } + }); + + return result; + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Fixture/ContainerInfo.cs b/src/Akka.Streams.Kafka.Testkit/Fixture/ContainerInfo.cs new file mode 100644 index 00000000..dc5a27ca --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Fixture/ContainerInfo.cs @@ -0,0 +1,18 @@ +namespace Akka.Streams.Kafka.Testkit.Fixture +{ + public sealed class ContainerInfo + { + public ContainerInfo(string imageName, string imageTag, string containerBaseName) + { + ImageName = imageName; + ImageTag = imageTag; + ContainerBaseName = containerBaseName; + + } + + public string ImageName { get; } + public string ImageTag { get; } + public string ContainerBaseName { get; } + public string DockerName => $"{ImageName}:{ImageTag}"; + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Fixture/FixtureBase.cs b/src/Akka.Streams.Kafka.Testkit/Fixture/FixtureBase.cs new file mode 100644 index 00000000..fea93a56 --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Fixture/FixtureBase.cs @@ -0,0 +1,132 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Runtime.InteropServices; +using System.Threading.Tasks; +using Akka.Event; +using Docker.DotNet; +using Docker.DotNet.Models; +using Xunit; + +namespace Akka.Streams.Kafka.Testkit.Fixture +{ + public abstract class FixtureBase: IAsyncLifetime + { + private readonly DockerClient _client; + public bool Initialized { get; private set; } + + public ImmutableList Containers { get; private set; } = ImmutableList.Empty; + public virtual string NetworkName { get; private set; } + public string NetworkId { get; private set; } + protected ILoggingAdapter Log { get; } + + protected FixtureBase(ILoggingAdapter log) + { + Log = log; + + DockerClientConfiguration config; + + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) + config = new DockerClientConfiguration(new Uri("unix://var/run/docker.sock")); + else if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + config = new DockerClientConfiguration(new Uri("npipe://./pipe/docker_engine")); + else + throw new NotSupportedException($"Unsupported OS [{RuntimeInformation.OSDescription}]"); + + _client = config.CreateClient(); + } + + public async Task InitializeAsync() + { + await Initialize(); + Initialized = true; + if (Containers.Count == 0) + throw new InvalidOperationException("No containers are registered in Initialize()"); + + // Wait until all containers are created, in parallel + await Task.WhenAll(Containers.Select(c => + { + if(!c.Initialized) + c.Initialize(_client, Log); + return c.CreateAsync(); + })); + + // Create network and wait until all containers are connected, in parallel + if (!string.IsNullOrEmpty(NetworkName)) + { + var response = await _client.Networks.CreateNetworkAsync( + new NetworksCreateParameters + { + Name = NetworkName, + EnableIPv6 = false + }); + NetworkId = response.ID; + if(!string.IsNullOrEmpty(response.Warning)) + Log.Warning(response.Warning); + + await Task.WhenAll(Containers.Select(c => _client.Networks.ConnectNetworkAsync( + id: NetworkId, + parameters: new NetworkConnectParameters { Container = c.ContainerName }))); + } + + // Start all containers, in parallel if they're async + var tasks = new List(); + foreach (var container in Containers) + { + if (container is GenericContainerAsync ca) + { + tasks.Add(ca.StartAsync()); + } + else + { + await Task.WhenAll(tasks); + tasks.Clear(); + container.Start(); + } + } + await Task.WhenAll(tasks); + } + + public async Task DisposeAsync() + { + if (_client != null) + { + await ResourceCleanupAsync(); + _client.Dispose(); + } + } + + protected abstract Task Initialize(); + protected void RegisterContainer(GenericContainer container) + { + if (Initialized) + throw new InvalidOperationException("RegisterContainer() can only be called inside Initialize()"); + Containers = Containers.Add(container); + } + + protected void CreateNetwork(string networkName) + { + if (Initialized) + throw new InvalidOperationException("CreateNetwork() can only be called inside Initialize()"); + NetworkName = networkName; + } + + /// + /// This performs cleanup of allocated containers/networks during tests + /// + /// + private async Task ResourceCleanupAsync() + { + if (_client == null) + return; + + await Task.WhenAll(Containers + .Where(c => c.IsCreated) + .Select(c => c.RemoveAsync())); + + await _client.Networks.DeleteNetworkAsync(NetworkId); + } + + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Fixture/GenericContainer.cs b/src/Akka.Streams.Kafka.Testkit/Fixture/GenericContainer.cs new file mode 100644 index 00000000..9a699bba --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Fixture/GenericContainer.cs @@ -0,0 +1,175 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Akka.Event; +using Docker.DotNet; +using Docker.DotNet.Models; + +namespace Akka.Streams.Kafka.Testkit.Fixture +{ + public abstract class GenericContainer + { + private static readonly ContainerStopParameters StopParam = new ContainerStopParameters + { + WaitBeforeKillSeconds = 1 + }; + + private static readonly ContainerRemoveParameters RemoveParam = new ContainerRemoveParameters + { + Force = true + }; + + private ILoggingAdapter _log; + protected DockerClient Client { get; private set; } + + public ContainerInfo Info { get; } + public string ContainerName { get; } + public string ContainerId { get; private set; } + public bool IsCreated { get; private set; } + public bool IsStarted { get; private set; } + public bool Initialized => Client != null; + + protected GenericContainer(ContainerInfo info) + { + Info = info; + ContainerName = $"{info.ContainerBaseName}-{Guid.NewGuid()}"; + } + + protected abstract Task SetupContainerParameters(CreateContainerParameters param); + protected abstract Task WaitUntilReady(CancellationToken token); + + internal void Initialize(DockerClient client, ILoggingAdapter log) + { + if (Initialized) + throw new Exception("Container already initialized"); + + Client = client; + _log = log; + } + + internal void Start() + { + StartAsync().Wait(); + } + + internal async Task StartAsync() + { + if (!Initialized) + throw new InvalidOperationException("Container has not been initialized."); + + _log.Info($"Starting container [{ContainerId}]"); + + var success = await Client.Containers.StartContainerAsync(ContainerName, new ContainerStartParameters()); + if (!success) + { + throw new Exception($"Failed to start container {ContainerName}"); + } + + IsStarted = true; + _log.Info($"Waiting for container [{ContainerId}] to be ready"); + using (var cts = new CancellationTokenSource()) + { + var timeoutTask = Task.Delay(TimeSpan.FromSeconds(10), cts.Token); + var waitTask = WaitUntilReady(cts.Token); + var completed = Task.WhenAny(timeoutTask, waitTask); + if (completed == timeoutTask) + throw new Exception($"Timed out waiting for container {ContainerId} to start. (10 seconds)"); + cts.Cancel(); + } + } + + internal async Task CreateAsync() + { + if (!Initialized) + throw new InvalidOperationException("Container has not been initialized."); + + _log.Info($"Creating container [{ContainerName}]"); + // Load images, if they not exist yet + await EnsureImageExistsAsync(); + var parameters = new CreateContainerParameters + { + Image = Info.DockerName, + Name = ContainerName, + Tty = true, + }; + await SetupContainerParameters(parameters); + var response = await Client.Containers.CreateContainerAsync(parameters); + ContainerId = response.ID; + foreach (var warning in response.Warnings) + { + _log?.Warning(warning); + } + + IsCreated = true; + } + + public async Task StopAsync() + { + if (!Initialized) + throw new InvalidOperationException("Container has not been initialized."); + + if (!IsCreated) return; + if (!IsStarted) return; + _log.Info($"Stopping container [{ContainerId}]"); + await Client.Containers.StopContainerAsync(ContainerId, StopParam); + IsStarted = false; + } + + public async Task RemoveAsync() + { + if (!Initialized) + throw new InvalidOperationException("Container has not been initialized."); + + if (!IsCreated) return; + + if (IsStarted) + await StopAsync(); + _log.Info($"Removing container [{ContainerId}]"); + await Client.Containers.RemoveContainerAsync(ContainerId, RemoveParam); + IsCreated = false; + } + + private async Task EnsureImageExistsAsync() + { + var existingImages = await Client.Images.ListImagesAsync( + new ImagesListParameters + { + Filters = new Dictionary> + { + { + "reference", + new Dictionary + { + {Info.DockerName, true} + } + } + } + }); + + if (existingImages.Count == 0) + { + await Client.Images.CreateImageAsync( + parameters: new ImagesCreateParameters + { + FromImage = Info.ImageName, Tag = Info.ImageTag + }, + authConfig: null, + progress: new Progress(message => + { + if(!string.IsNullOrEmpty(message.ErrorMessage)) + _log.Error(message.ErrorMessage); + else + _log.Info($"[{message.ID}][{message.Status}] {message.ProgressMessage}"); + })); + } + } + } + + public abstract class GenericContainerAsync : GenericContainer + { + protected GenericContainerAsync(ContainerInfo info) : base(info) + { + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaContainer.cs b/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaContainer.cs new file mode 100644 index 00000000..0b1f07ec --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaContainer.cs @@ -0,0 +1,132 @@ +using System.Collections.Generic; +using System.IO; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using Docker.DotNet.Models; + +namespace Akka.Streams.Kafka.Testkit.Fixture +{ + public class KafkaContainer : GenericContainerAsync + { + private static readonly ContainerInfo ContainerInfo = new ContainerInfo( + imageName: "confluentinc/cp-kafka", + imageTag: "5.3.0", + containerBaseName: "akka.net-kafka-test"); + + private readonly int _id; + private readonly int _port; + private readonly int _partitions; + private readonly int _replicationFactor; + private readonly string _zookeeperName; + private readonly int _zookeeperPort; + + public KafkaContainer(int id, int port, int partitions, int replicationFactor, string zookeeperName, int zookeeperPort) : base(ContainerInfo) + { + _id = id; + _port = port; + _partitions = partitions; + _zookeeperName = zookeeperName; + _zookeeperPort = zookeeperPort; + _replicationFactor = replicationFactor; + } + + protected override Task SetupContainerParameters(CreateContainerParameters param) + { + param.ExposedPorts = new Dictionary + { + {$"{_port}/tcp", new EmptyStruct()}, + }; + param.HostConfig = new HostConfig + { + PortBindings = new Dictionary> + { + { + $"{_port}/tcp", + new List + { + new PortBinding + { + HostPort = _port.ToString() + } + } + }, + { + "29092/tcp", + new List + { + new PortBinding + { + HostPort = "29092" + } + } + } + }, + ExtraHosts = new[] {"localhost:127.0.0.1"}, + }; + + var env = new List + { + $"KAFKA_ZOOKEEPER_CONNECT={_zookeeperName}:{_zookeeperPort}", // referencing zookeeper container directly in common docker network + + $"KAFKA_LISTENERS=BROKER://0.0.0.0:29092,PLAINTEXT://0.0.0.0:{_port}", + $"KAFKA_ADVERTISED_LISTENERS=BROKER://{ContainerName}:29092,PLAINTEXT://127.0.0.1:{_port}", + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT", + "KAFKA_INTER_BROKER_LISTENER_NAME=BROKER", + + $"KAFKA_BROKER_ID={_id}", + $"KAFKA_NUM_PARTITIONS={_partitions}", + $"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR={_replicationFactor}", + $"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS={_replicationFactor}", + "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true", + "KAFKA_DELETE_TOPIC_ENABLE=true", + "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0", + "KAFKA_OPTS=-Djava.net.preferIPv4Stack=True" + }; + + param.Env = env; + + return Task.CompletedTask; + } + + protected override async Task WaitUntilReady(CancellationToken token) + { + var regex = new Regex("\\[KafkaServer id=[0-9]*\\] started \\(kafka.server.KafkaServer\\)"); + using (var stream = await Client.Containers.GetContainerLogsAsync(ContainerId, new ContainerLogsParameters + { + ShowStdout = true, + ShowStderr = true, + }, token)) + { + using (var reader = new StreamReader(stream)) + { + var ready = false; + while (!ready) + { + var line = await reader.ReadLineAsync(); + if(!string.IsNullOrEmpty(line)) + ready = regex.IsMatch(line); + } + } + } + /* + var address = IPAddress.Parse("127.0.0.1"); + using (var socket = new TcpClient(AddressFamily.InterNetwork)) + { + var connected = false; + while (!connected && !token.IsCancellationRequested) + { + try + { + await socket.ConnectAsync(address, _port); + connected = socket.Connected; + } + catch (SocketException) { } + if (!connected) + await Task.Delay(100, token); + } + } + */ + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaFixture.cs b/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaFixture.cs new file mode 100644 index 00000000..77e94cdc --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaFixture.cs @@ -0,0 +1,18 @@ +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Streams.Kafka.Testkit.Fixture +{ + [CollectionDefinition(Name)] + public sealed class KafkaSpecsFixture : ICollectionFixture + { + public const string Name = "KafkaSpecs"; + } + + public class KafkaFixture : KafkaFixtureBase + { + public KafkaFixture(IMessageSink sink) : base(1, 3, 1, new MessageSinkLogger(sink)) + { + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaFixtureBase.cs b/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaFixtureBase.cs new file mode 100644 index 00000000..4b98dbd0 --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaFixtureBase.cs @@ -0,0 +1,64 @@ +using System; +using System.Collections.Immutable; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using Akka.Event; + +namespace Akka.Streams.Kafka.Testkit.Fixture +{ + public class KafkaFixtureBase: FixtureBase + { + private readonly int _brokerCount; + public int PartitionCount { get; } + public int ReplicationFactor { get; } + + public ImmutableList Brokers { get; protected set; } = ImmutableList.Empty; + + public KafkaFixtureBase(int brokerCount, int partitionCount, int replicationFactor, ILoggingAdapter log) : base(log) + { + _brokerCount = brokerCount; + PartitionCount = partitionCount; + ReplicationFactor = replicationFactor; + + KafkaPorts = new int[brokerCount]; + for (var i = 0; i < brokerCount; ++i) + { + KafkaPorts[i] = GetTemporarySocketPort(); + } + + ZookeeperPort = GetTemporarySocketPort(); + } + + public int[] KafkaPorts { get; } + public int ZookeeperPort { get; } + public string BootstrapServer => string.Join(",", KafkaPorts.Take(Math.Min(_brokerCount, 3)).Select(port => $"127.0.0.1:{port}")); + + protected override Task Initialize() + { + CreateNetwork($"akka-kafka-network-{Guid.NewGuid()}"); + + var zookeeper = new ZookeeperContainer(ZookeeperPort); + RegisterContainer(zookeeper); + + for (var i = 0; i < _brokerCount; ++i) + { + var kafka = new KafkaContainer(i+1, KafkaPorts[i], PartitionCount, ReplicationFactor, zookeeper.ContainerName, ZookeeperPort); + Brokers = Brokers.Add(kafka); + RegisterContainer(kafka); + } + + return Task.CompletedTask; + } + + private int GetTemporarySocketPort() + { + using (var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) + { + sock.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 0)); + return ((IPEndPoint) sock.LocalEndPoint).Port; + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Fixture/ZookeeperContainer.cs b/src/Akka.Streams.Kafka.Testkit/Fixture/ZookeeperContainer.cs new file mode 100644 index 00000000..58b77951 --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Fixture/ZookeeperContainer.cs @@ -0,0 +1,78 @@ +using System.Collections.Generic; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Docker.DotNet.Models; + +namespace Akka.Streams.Kafka.Testkit.Fixture +{ + public class ZookeeperContainer: GenericContainerAsync + { + private static readonly ContainerInfo ContainerInfo = new ContainerInfo( + imageName: "confluentinc/cp-zookeeper", + imageTag: "5.3.0", + containerBaseName: "akka.net-zookeeper-test"); + + private int _port; + + public ZookeeperContainer(int port) : base(ContainerInfo) + { + _port = port; + } + + protected override Task SetupContainerParameters(CreateContainerParameters param) + { + param.ExposedPorts = new Dictionary + { + {$"{_port}/tcp", new EmptyStruct()} + }; + param.HostConfig = new HostConfig + { + PortBindings = new Dictionary> + { + { + $"{_port}/tcp", + new List + { + new PortBinding + { + HostPort = _port.ToString() + } + } + } + }, + ExtraHosts = new[] {"localhost:127.0.0.1"}, + }; + param.Env = new[] + { + $"ZOOKEEPER_CLIENT_PORT={_port}", + "ZOOKEEPER_TICK_TIME=2000", + }; + + return Task.CompletedTask; + } + + protected override async Task WaitUntilReady(CancellationToken token) + { + var address = IPAddress.Parse("127.0.0.1"); + using (var socket = new TcpClient(AddressFamily.InterNetwork)) + { + var connected = false; + while (!connected && !token.IsCancellationRequested) + { + try + { + await socket.ConnectAsync(address, _port); + connected = socket.Connected; + } + catch (SocketException) { } + if (!connected) + await Task.Delay(100, token); + } + } + + await Task.Delay(1000, token); + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Internal/Checks.cs b/src/Akka.Streams.Kafka.Testkit/Internal/Checks.cs new file mode 100644 index 00000000..dfe868ba --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Internal/Checks.cs @@ -0,0 +1,80 @@ +using System; +using System.Threading; +using Akka.Event; +using Akka.Util; +using Confluent.Kafka; + +namespace Akka.Streams.Kafka.Testkit.Internal +{ + public static class Checks + { + [Obsolete("Kafka DescribeCluster API is not supported in the .NET driver")] + public static void WaitUntilCluster( + TimeSpan timeout, + TimeSpan sleepInBetween, + IAdminClient adminClient, + Func predicate, + ILoggingAdapter log) + { + throw new NotImplementedException("Kafka DescribeCluster API is not supported in the .NET driver"); + } + + public static void WaitUntilConsumerGroup( + string groupId, + TimeSpan timeout, + TimeSpan sleepInBetween, + IAdminClient adminClient, + Func predicate, + ILoggingAdapter log) + { + PeriodicalCheck( + description: "consumer group state", + timeout: timeout, + sleepInBetween: sleepInBetween, + data: () => adminClient.ListGroup(groupId, timeout), + predicate: predicate, + log: log); + } + + public static void PeriodicalCheck( + string description, + TimeSpan timeout, + TimeSpan sleepInBetween, + Func data, + Func predicate, + ILoggingAdapter log) + { + var maxTries = (int) (timeout.TotalMilliseconds / sleepInBetween.TotalMilliseconds); + var triesLeft = maxTries; + + while (true) + { + var result = Try + .From(() => predicate(data())) + .RecoverWith(ex => + { + log.Debug($"Ignoring [{ex.GetType()}: {ex.Message}] while waiting for desired state"); + return false; + }); + + if (result.Success == false) + { + if (triesLeft > 0) + { + Thread.Sleep(sleepInBetween); + triesLeft--; + continue; + } + throw new TimeoutException( + $"Timeout while waiting for desired {description}. Tried [{maxTries}] times, slept [{sleepInBetween}] in between."); + } + + if (!result.IsSuccess) + throw result.Failure.Value; + + if(result.Success == true) + break; + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Internal/KafkaTestKit.cs b/src/Akka.Streams.Kafka.Testkit/Internal/KafkaTestKit.cs new file mode 100644 index 00000000..57acbfb0 --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Internal/KafkaTestKit.cs @@ -0,0 +1,175 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Actor.Setup; +using Akka.Configuration; +using Akka.Streams.Kafka.Settings; +using Akka.Util.Internal; +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using Xunit.Abstractions; +using Config = Akka.Configuration.Config; + +namespace Akka.Streams.Kafka.Testkit.Internal +{ + public abstract class KafkaTestKit : Akka.TestKit.Xunit2.TestKit + { + public new static Config DefaultConfig { get; } = + ConfigurationFactory.FromResource("Akka.Streams.Kafka.Testkit.Resources.reference.conf"); + + private static readonly AtomicCounter TopicCounter = new AtomicCounter(); + + public const string DefaultKey = "key"; + + protected readonly ISerializer StringSerializer = Serializers.Utf8; + protected readonly IDeserializer StringDeserializer = Deserializers.Utf8; + + protected ProducerSettings ProducerDefaults() + => ProducerDefaults(StringSerializer, StringSerializer); + + protected ProducerSettings ProducerDefaults( + ISerializer keySerializer, + ISerializer valueSerializer) + => ProducerSettings.Create(Sys, keySerializer, valueSerializer) + .WithBootstrapServers(BootstrapServers); + + protected ConsumerSettings ConsumerDefaults() + => ConsumerDefaults(StringDeserializer, StringDeserializer); + + protected ConsumerSettings ConsumerDefaults( + IDeserializer keyDeserializer, + IDeserializer valueDeserializer) + => ConsumerSettings.Create(Sys, keyDeserializer, valueDeserializer) + .WithBootstrapServers(BootstrapServers) + .WithProperty("auto.offset.reset", "earliest"); + + private readonly Lazy _committerDefaultsInstance; + protected CommitterSettings CommitterDefaults => _committerDefaultsInstance.Value; + + private int NextNumber => TopicCounter.IncrementAndGet(); + + private string Uuid { get; } = Guid.NewGuid().ToString(); + + /// + /// Return a unique topic name + /// + /// + /// + protected string CreateTopicName(int suffix) => $"topic-{suffix}-{Uuid}"; + + /// + /// Return a unique group id with a given suffix + /// + /// + /// + protected string CreateGroupId(int suffix) => $"group-{suffix}-{Uuid}"; + + protected string CreateTransactionalId(int suffix) => $"transactionalId-{suffix}-{Uuid}"; + + protected KafkaTestkitSettings Settings { get; } + + private readonly Lazy> _adminDefaults; + private IAdminClient _adminClient; + protected IAdminClient AdminClient + { + get + { + if (_adminClient == null) + throw new Exception( + "admin client not created, be sure to call setupAdminClient() and cleanupAdminClient()"); + return _adminClient; + } + } + + protected void SetUpAdminClient() + { + if (_adminClient == null) + _adminClient = new AdminClientBuilder(_adminDefaults.Value).Build(); + } + + protected void CleanUpAdminClient() + { + _adminClient?.Dispose(); + _adminClient = null; + } + + protected Task CreateTopic() + => CreateTopic(0, 1, 1, new Dictionary(), new CreateTopicsOptions()); + + protected Task CreateTopic(int suffix) + => CreateTopic(suffix, 1, 1, new Dictionary(), new CreateTopicsOptions()); + + protected Task CreateTopic(int suffix, int partitions) + => CreateTopic(suffix, partitions, 1, new Dictionary(), new CreateTopicsOptions()); + protected Task CreateTopic(int suffix, int partitions, int replication) + => CreateTopic(suffix, partitions, replication, new Dictionary(), new CreateTopicsOptions()); + + protected Task CreateTopic(int suffix, int partitions, int replication, Dictionary config) + => CreateTopic(suffix, partitions, replication, config, new CreateTopicsOptions()); + + protected async Task CreateTopic(int suffix, int partitions, int replication, Dictionary config, CreateTopicsOptions options) + { + var topicName = CreateTopicName(suffix); + await _adminClient.CreateTopicsAsync(new[] {new TopicSpecification + { + Name = topicName, + NumPartitions = partitions, + ReplicationFactor = (short)replication, + Configs = config + }}, options); + + return topicName; + } + + protected void SleepMillis(int ms, string msg) + { + Log.Debug($"Sleeping {ms} ms: {msg}"); + Thread.Sleep(ms); + } + + protected void SleepSeconds(int s, string msg) + { + Log.Debug($"Sleeping {s} s: {msg}"); + Thread.Sleep(s * 1000); + } + + protected abstract string BootstrapServers { get; } + + + private static Config Config() + { + //var config = ConfigurationFactory.ParseString("akka.loglevel = DEBUG"); + return ConfigurationFactory.ParseString("akka{}") + .WithFallback(DefaultConfig) + .WithFallback(KafkaExtensions.DefaultSettings); + } + + protected KafkaTestKit(string config, string actorSystemName = null, ITestOutputHelper output = null) + : this(config != null ? ConfigurationFactory.ParseString(config) : null, + actorSystemName, + output) + { } + + protected KafkaTestKit(Config config, string actorSystemName = null, ITestOutputHelper output = null) + : this( + config != null ? ActorSystemSetup.Empty.WithSetup(BootstrapSetup.Create().WithConfig(config)) : null, + actorSystemName, + output) + { } + + protected KafkaTestKit(ActorSystemSetup setup, string actorSystemName = null, ITestOutputHelper output = null) + : base(setup ?? ActorSystemSetup.Empty, actorSystemName, output) + { + Sys.Settings.InjectTopLevelFallback(Config()); + Settings = new KafkaTestkitSettings(Sys); + _committerDefaultsInstance = new Lazy(() => CommitterSettings.Create(Sys)); + _adminDefaults = new Lazy>(() => new Dictionary + { + ["bootstrap.servers"] = BootstrapServers + }); + } + + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/KafkaTestkitSettings.cs b/src/Akka.Streams.Kafka.Testkit/KafkaTestkitSettings.cs new file mode 100644 index 00000000..d1efafc0 --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/KafkaTestkitSettings.cs @@ -0,0 +1,31 @@ +using System; +using Akka.Actor; +using Akka.Configuration; + +namespace Akka.Streams.Kafka.Testkit +{ + public class KafkaTestkitSettings + { + public const string ConfigPath = "akka.kafka.testkit"; + + public KafkaTestkitSettings(ActorSystem system) : this(system.Settings.Config.GetConfig(ConfigPath)) + { } + + public KafkaTestkitSettings(Config config): this( + config.GetTimeSpan("cluster-timeout"), + config.GetTimeSpan("consumer-group-timeout"), + config.GetTimeSpan("check-interval")) + { } + + public KafkaTestkitSettings(TimeSpan clusterTimeout, TimeSpan consumerGroupTimeout, TimeSpan checkInterval) + { + ClusterTimeout = clusterTimeout; + ConsumerGroupTimeout = consumerGroupTimeout; + CheckInterval = checkInterval; + } + + public TimeSpan ClusterTimeout { get; } + public TimeSpan ConsumerGroupTimeout { get; } + public TimeSpan CheckInterval { get; } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Logger.cs b/src/Akka.Streams.Kafka.Testkit/Logger.cs new file mode 100644 index 00000000..c3688e83 --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Logger.cs @@ -0,0 +1,87 @@ +using System; +using System.Diagnostics; +using Akka.Event; +using Xunit.Abstractions; +using Xunit.Sdk; + +namespace Akka.Streams.Kafka.Testkit +{ + public class MessageSinkLogger : ILoggingAdapter, IDisposable + { + private readonly Stopwatch _watch; + private readonly IMessageSink _sink; + + public MessageSinkLogger(IMessageSink sink) + { + _watch = Stopwatch.StartNew(); + _sink = sink; + } + + public void Debug(string message) + => Log(LogLevel.DebugLevel, message); + + public void Debug(string format, params object[] args) + => Log(LogLevel.DebugLevel, format, args); + + public void Debug(Exception cause, string format, params object[] args) + => Log(LogLevel.DebugLevel, cause, format, args); + + public void Info(string message) + => Log(LogLevel.InfoLevel, message); + + public void Info(string format, params object[] args) + => Log(LogLevel.InfoLevel, format, args); + + public void Info(Exception cause, string format, params object[] args) + => Log(LogLevel.InfoLevel, cause, format, args); + + public void Warning(string message) + => Log(LogLevel.WarningLevel, message); + + public void Warning(string format, params object[] args) + => Log(LogLevel.WarningLevel, format, args); + + public void Warning(Exception cause, string format, params object[] args) + => Log(LogLevel.WarningLevel, cause, format, args); + + public void Error(string format, params object[] args) + => Log(LogLevel.ErrorLevel, format, args); + + public void Error(Exception cause, string format, params object[] args) + => Log(LogLevel.ErrorLevel, cause, format, args); + + public void Error(string message) + => Log(LogLevel.ErrorLevel, message); + + public void Dispose() + { + _watch.Stop(); + } + + public void Log(LogLevel logLevel, string format, params object[] args) + { + _sink?.OnMessage(new DiagnosticMessage($"[XunitFixture][{ToDesc(logLevel)}][{_watch.Elapsed}] {string.Format(format, args)}")); + } + + public void Log(LogLevel logLevel, Exception cause, string format, params object[] args) + { + _sink?.OnMessage(new DiagnosticMessage($"[XunitFixture][{ToDesc(logLevel)}][{_watch.Elapsed}] {string.Format(format, args)}: {cause.Message}:\n{cause.StackTrace}")); + } + + private string ToDesc(LogLevel level) + => level switch + { + LogLevel.DebugLevel => "DBG", + LogLevel.InfoLevel => "INF", + LogLevel.WarningLevel => "WRN", + LogLevel.ErrorLevel => "ERR", + _ => throw new IndexOutOfRangeException($"Unknown LogLevel: [{level}]") + }; + + public bool IsEnabled(LogLevel logLevel) => _sink != null; + public bool IsDebugEnabled => IsEnabled(LogLevel.DebugLevel); + public bool IsInfoEnabled => IsEnabled(LogLevel.InfoLevel); + public bool IsWarningEnabled => IsEnabled(LogLevel.WarningLevel); + public bool IsErrorEnabled => IsEnabled(LogLevel.ErrorLevel); + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/MemberAssignment.cs b/src/Akka.Streams.Kafka.Testkit/MemberAssignment.cs new file mode 100644 index 00000000..64f0b60a --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/MemberAssignment.cs @@ -0,0 +1,54 @@ +using System.Collections.Generic; +using System.Collections.Immutable; +using System.IO; +using Confluent.Kafka; + +namespace Akka.Streams.Kafka.Testkit +{ + // TODO: Need testing, check if little endian/big endianness is correct + /// + /// Specialized deserializer for MemberAssignment. + /// C# AdminClient returns raw bytes that needs to be deserialized into a proper object + /// + public class MemberAssignment + { + public ImmutableHashSet TopicPartitions { get; } + + public MemberAssignment(ImmutableHashSet topicPartitions) + { + TopicPartitions = topicPartitions; + } + + private static ImmutableHashSet DeserializeTopicPartitions(BinaryReader reader) + { + var topic = reader.ReadString(); + var partitionLength = reader.ReadInt32BE(); + var partitions = new HashSet(); + for (var i = 0; i < partitionLength; ++i) + { + partitions.Add(new TopicPartition(topic, reader.ReadInt32BE())); + } + + return partitions.ToImmutableHashSet(); + } + + public static MemberAssignment Deserialize(byte[] raw) + { + if (raw == null || raw.Length == 0) + return null; + + var tps = new HashSet(); + using var stream = new MemoryStream(raw); + using var reader = new BinaryReader(stream); + + var version = reader.ReadInt16BE(); + var assignCount = reader.ReadInt32BE(); + for (var i = 0; i < assignCount; ++i) + { + tps.UnionWith(DeserializeTopicPartitions(reader)); + } + + return new MemberAssignment(tps.ToImmutableHashSet()); + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Testkit/Resources/reference.conf b/src/Akka.Streams.Kafka.Testkit/Resources/reference.conf new file mode 100644 index 00000000..ce175a1a --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Resources/reference.conf @@ -0,0 +1,33 @@ +# // #testkit-settings +akka.kafka.testkit { + + # amount of time to wait until the desired cluster state is reached + cluster-timeout = 10 seconds + + # amount of time to wait until the desired consumer group state is reached + consumer-group-timeout = 10 seconds + + # amount of time to wait in-between state checks + check-interval = 100 ms +} +# // #testkit-settings + +# // #testkit-testcontainers-settings +akka.kafka.testkit.testcontainers { + + # define this to select a different Kafka version by choosing the desired version of Confluent Platform + # available Docker images: https://hub.docker.com/r/confluentinc/cp-kafka/tags + # Kafka versions in Confluent Platform: https://docs.confluent.io/current/installation/versions-interoperability.html + confluent-platform-version = "6.0.0" + + # the number of Kafka brokers to include in a test cluster + num-brokers = 1 + + # set this to use a replication factor for internal Kafka topics such as Consumer Offsets and Transaction log. + # this replication factor must be less than or equal to `num-brokers` + internal-topics-replication-factor = 1 + + # set this to true to use launch a testcontainer for Confluent Schema Registry + use-schema-registry = false +} +# // #testkit-testcontainers-settings diff --git a/src/Akka.Streams.Kafka.Testkit/Utils.cs b/src/Akka.Streams.Kafka.Testkit/Utils.cs new file mode 100644 index 00000000..3c508b15 --- /dev/null +++ b/src/Akka.Streams.Kafka.Testkit/Utils.cs @@ -0,0 +1,25 @@ +using System; +using System.IO; +using System.Linq; + +namespace Akka.Streams.Kafka.Testkit +{ + public static class BinaryUtils + { + public static short ReadInt16BE(this BinaryReader reader) + { + var bytes = reader.ReadBytes(2); + if (BitConverter.IsLittleEndian) + bytes = bytes.Reverse().ToArray(); + return BitConverter.ToInt16(bytes, 0); + } + + public static short ReadInt32BE(this BinaryReader reader) + { + var bytes = reader.ReadBytes(4); + if (BitConverter.IsLittleEndian) + bytes = bytes.Reverse().ToArray(); + return BitConverter.ToInt16(bytes, 0); + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Tests/Akka.Streams.Kafka.Tests.csproj b/src/Akka.Streams.Kafka.Tests/Akka.Streams.Kafka.Tests.csproj index 7b8260e0..2bae301d 100644 --- a/src/Akka.Streams.Kafka.Tests/Akka.Streams.Kafka.Tests.csproj +++ b/src/Akka.Streams.Kafka.Tests/Akka.Streams.Kafka.Tests.csproj @@ -17,6 +17,7 @@ + diff --git a/src/Akka.Streams.Kafka.Tests/BugFix240SupervisionStrategy.cs b/src/Akka.Streams.Kafka.Tests/BugFix240SupervisionStrategy.cs index c9d53fbf..f25b1328 100644 --- a/src/Akka.Streams.Kafka.Tests/BugFix240SupervisionStrategy.cs +++ b/src/Akka.Streams.Kafka.Tests/BugFix240SupervisionStrategy.cs @@ -12,6 +12,7 @@ using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; using Akka.Streams.Kafka.Supervision; +using Akka.Streams.Kafka.Testkit.Fixture; using Akka.Streams.Supervision; using Confluent.Kafka; using FluentAssertions; @@ -31,8 +32,8 @@ public BugFix240SupervisionStrategy(ITestOutputHelper output, KafkaFixture fixtu [Fact] public async Task SupervisionStrategy_Decider_on_Producer_Upstream_should_work() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var topicPartition = new TopicPartition(topic, 0); var callCount = 0; @@ -76,8 +77,8 @@ await numbers [Fact] public async Task SupervisionStrategy_Decider_on_Consumer_Downstream_should_work() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var topicPartition = new TopicPartition(topic, 0); var callCount = 0; @@ -126,8 +127,8 @@ await Source.From(Enumerable.Range(1, 11)) [Fact] public async Task SupervisionStrategy_Restart_Decider_on_Consumer_should_be_gapless() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var topicPartition = new TopicPartition(topic, 0); var serializationCallCount = 0; var callCount = 0; @@ -146,7 +147,7 @@ Directive Decider(Exception cause) var serializer = new Serializer(BitConverter.GetBytes); var producerSettings = ProducerSettings .Create(Sys, null, serializer) - .WithBootstrapServers(Fixture.KafkaServer); + .WithBootstrapServers(Fixture.BootstrapServer); await Source.From(Enumerable.Range(1, 10)) .Select(elem => new ProducerRecord(topicPartition, elem)) @@ -155,7 +156,7 @@ await Source.From(Enumerable.Range(1, 10)) // Exception is injected once using the FailOnceDeserializer var deserializer = new FailOnceDeserializer(5, data => BitConverter.ToInt32(data.Span)); var consumerSettings = ConsumerSettings.Create(Sys, null, deserializer) - .WithBootstrapServers(Fixture.KafkaServer) + .WithBootstrapServers(Fixture.BootstrapServer) .WithStopTimeout(TimeSpan.FromSeconds(1)) .WithProperty("auto.offset.reset", "earliest") .WithGroupId(group); @@ -181,12 +182,12 @@ await Source.From(Enumerable.Range(1, 10)) [Fact] public async Task Committable_consumer_with_failed_downstream_stage_result_should_be_gapless() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var topicPartition = new TopicPartition(topic, 0); var consumerSettings = ConsumerSettings.Create(Sys, null, null) - .WithBootstrapServers(Fixture.KafkaServer) + .WithBootstrapServers(Fixture.BootstrapServer) .WithStopTimeout(TimeSpan.FromSeconds(1)) .WithProperty("auto.offset.reset", "earliest") .WithGroupId(group); @@ -255,8 +256,8 @@ await Source.From(Enumerable.Range(1, 11)) [Fact] public async Task SupervisionStrategy_Decider_on_complex_stream_should_work() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var topicPartition = new TopicPartition(topic, 0); var committedTopicPartition = new TopicPartition($"{topic}-done", 0); var callCount = 0; @@ -353,12 +354,12 @@ Directive Decider(Exception cause) } } - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var producerSettings = ProducerSettings .Create(Sys, null, new FailingSerializer()) - .WithBootstrapServers(Fixture.KafkaServer); + .WithBootstrapServers(Fixture.BootstrapServer); // Exception is injected into the sink by the FailingSerializer serializer, it throws an exceptions // when the message "5" is encountered. @@ -393,13 +394,13 @@ Directive Decider(Exception cause) [Fact] public async Task Overridden_default_decider_on_PlainSink_should_work() { - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var decider = new OverridenProducerDecider(); var producerSettings = ProducerSettings .Create(Sys, null, new FailingSerializer()) - .WithBootstrapServers(Fixture.KafkaServer); + .WithBootstrapServers(Fixture.BootstrapServer); // Exception is injected into the sink by the FailingSerializer serializer, it throws an exceptions // when the message "5" is encountered. @@ -411,7 +412,7 @@ public async Task Overridden_default_decider_on_PlainSink_should_work() .WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider.Decide)), Materializer); - await GuardWithTimeoutAsync(sourceTask, TimeSpan.FromSeconds(5)); + await GuardWithTimeoutAsync(sourceTask, TimeSpan.FromMinutes(1)); var settings = CreateConsumerSettings(group1).WithValueDeserializer(new StringDeserializer()); var probe = KafkaConsumer @@ -446,8 +447,8 @@ Directive Decider(Exception cause) } int elementsCount = 10; - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var sourceTask = ProduceStrings(new TopicPartition(topic1, 0), Enumerable.Range(1, elementsCount), ProducerSettings); @@ -473,8 +474,8 @@ Directive Decider(Exception cause) public async Task Overriden_default_decider_on_PlainSource_should_work() { int elementsCount = 10; - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var sourceTask = ProduceStrings(new TopicPartition(topic1, 0), Enumerable.Range(1, elementsCount), ProducerSettings); @@ -499,8 +500,8 @@ public async Task Overriden_default_decider_on_PlainSource_should_work() public async Task Default_Decider_on_PlainSource_should_stop_on_internal_error() { int elementsCount = 10; - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var sourceTask = ProduceStrings(new TopicPartition(topic1, 0), Enumerable.Range(1, elementsCount), ProducerSettings); @@ -530,8 +531,8 @@ public async Task Default_Decider_on_PlainSource_should_stop_on_internal_error() public async Task PlainSource_should_stop_on_errors() { int elementsCount = 10; - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); await ProduceStrings(new TopicPartition(topic1, 0), Enumerable.Range(1, elementsCount), ProducerSettings); diff --git a/src/Akka.Streams.Kafka.Tests/Fixture.cs b/src/Akka.Streams.Kafka.Tests/Fixture.cs new file mode 100644 index 00000000..07593609 --- /dev/null +++ b/src/Akka.Streams.Kafka.Tests/Fixture.cs @@ -0,0 +1,11 @@ +using Akka.Streams.Kafka.Testkit.Fixture; +using Xunit; + +namespace Akka.Streams.Kafka.Tests +{ + [CollectionDefinition(Name)] + public sealed class KafkaSpecsFixture : ICollectionFixture + { + public const string Name = "KafkaSpecs"; + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Tests/Integration/AtMostOnceSourceIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/AtMostOnceSourceIntegrationTests.cs index 273bee58..f1b524ab 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/AtMostOnceSourceIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/AtMostOnceSourceIntegrationTests.cs @@ -4,6 +4,7 @@ using Akka.Streams.Dsl; using Akka.Streams.Kafka.Dsl; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit.Fixture; using Akka.Streams.TestKit; using Confluent.Kafka; using FluentAssertions; @@ -22,13 +23,13 @@ public AtMostOnceSourceIntegrationTests(ITestOutputHelper output, KafkaFixture f [Fact] public async Task AtMostOnceSource_Should_stop_consuming_actor_when_used_with_Take() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); await ProduceStrings(new TopicPartition(topic, 0), Enumerable.Range(1, 10), ProducerSettings); var (control, result) = KafkaConsumer.AtMostOnceSource(CreateConsumerSettings(group), Subscriptions.Assignment(new TopicPartition(topic, 0))) - .Select(m => m.Value) + .Select(m => m.Message.Value) .Take(5) .ToMaterialized(Sink.Seq(), Keep.Both) .Run(Materializer); @@ -41,8 +42,8 @@ public async Task AtMostOnceSource_Should_stop_consuming_actor_when_used_with_Ta [Fact(Skip = "Issue https://github.com/akkadotnet/Akka.Streams.Kafka/issues/66")] public async Task AtMostOnceSource_Should_work() { - var topic = CreateTopic(1); - var settings = CreateConsumerSettings(CreateGroup(1)); + var topic = CreateTopicName(1); + var settings = CreateConsumerSettings(CreateGroupId(1)); var totalMessages = 10; var lastMessage = new TaskCompletionSource(); @@ -51,7 +52,7 @@ public async Task AtMostOnceSource_Should_work() var (task, probe) = KafkaConsumer.AtMostOnceSource(settings, Subscriptions.Topics(topic)) .SelectAsync(1, m => { - if (m.Value == totalMessages.ToString()) + if (m.Message.Value == totalMessages.ToString()) lastMessage.SetResult(Done.Instance); return Task.FromResult(Done.Instance); diff --git a/src/Akka.Streams.Kafka.Tests/Integration/CommitWithMetadataSourceIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/CommitWithMetadataSourceIntegrationTests.cs index fb8d3b45..0b2b637c 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/CommitWithMetadataSourceIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/CommitWithMetadataSourceIntegrationTests.cs @@ -5,6 +5,7 @@ using Akka.Streams.Kafka.Dsl; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit.Fixture; using Akka.Streams.TestKit; using Akka.Util.Internal; using Confluent.Kafka; @@ -30,8 +31,8 @@ public CommitWithMetadataSourceIntegrationTests( ITestOutputHelper output, Kafka [Fact] public async Task CommitWithMetadataSource_Commit_metadata_in_message_Should_work() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); string MetadataFromMessage(ConsumeResult message) => message.Offset.ToString(); diff --git a/src/Akka.Streams.Kafka.Tests/Integration/CommittablePartitionedSourceIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/CommittablePartitionedSourceIntegrationTests.cs index c025f048..c8499b96 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/CommittablePartitionedSourceIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/CommittablePartitionedSourceIntegrationTests.cs @@ -6,6 +6,7 @@ using Akka.Streams.Kafka.Dsl; using Akka.Streams.Kafka.Helpers; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit.Fixture; using Akka.Util; using Confluent.Kafka; using FluentAssertions; @@ -25,8 +26,8 @@ public CommittablePartitionedSourceIntegrationTests(ITestOutputHelper output, Ka public async Task CommittablePartitionedSource_Should_handle_exceptions_in_stream_without_commit_failures() { var partitionsCount = 3; - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var totalMessages = 100; var exceptionTriggered = new AtomicBoolean(false); var allTopicPartitions = Enumerable.Range(0, partitionsCount).Select(i => new TopicPartition(topic, i)).ToList(); @@ -43,7 +44,7 @@ public async Task CommittablePartitionedSource_Should_handle_exceptions_in_strea var (topicPartition, source) = tuple; createdSubSources.TryAdd(topicPartition); return source - .Log($"Subsource for partition #{topicPartition.Partition.Value}", m => m.Record.Value) + .Log($"Subsource for partition #{topicPartition.Partition.Value}", m => m.Record.Message.Value) .SelectAsync(3, async message => { // fail on first partition; otherwise delay slightly and emit diff --git a/src/Akka.Streams.Kafka.Tests/Integration/CommittableSourceIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/CommittableSourceIntegrationTests.cs index 139bbdcd..f2d3888e 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/CommittableSourceIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/CommittableSourceIntegrationTests.cs @@ -8,6 +8,7 @@ using Akka.Streams.Kafka.Dsl; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit.Fixture; using Akka.Streams.TestKit; using Confluent.Kafka; using Xunit; @@ -27,8 +28,8 @@ public CommittableSourceIntegrationTests(ITestOutputHelper output, KafkaFixture public async Task CommitableSource_consumes_messages_from_Producer_without_commits() { int elementsCount = 100; - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var topicPartition1 = new TopicPartition(topic1, 0); await GivenInitializedTopic(topicPartition1); @@ -42,7 +43,7 @@ await Source var probe = KafkaConsumer .CommittableSource(consumerSettings, Subscriptions.Assignment(topicPartition1)) - .Select(c => c.Record.Value) + .Select(c => c.Record.Message.Value) .RunWith(this.SinkProbe(), Materializer); probe.Request(elementsCount); @@ -55,10 +56,10 @@ await Source [Fact] public async Task CommitableSource_resume_from_commited_offset() { - var topic1 = CreateTopic(1); + var topic1 = CreateTopicName(1); var topicPartition1 = new TopicPartition(topic1, 0); - var group1 = CreateGroup(1); - var group2 = CreateGroup(2); + var group1 = CreateGroupId(1); + var group2 = CreateGroupId(2); await GivenInitializedTopic(topicPartition1); @@ -74,7 +75,7 @@ await Source .SelectAsync(10, async elem => { await elem.CommitableOffset.Commit(); - committedElements.Enqueue(elem.Record.Value); + committedElements.Enqueue(elem.Record.Message.Value); return Done.Instance; }) .ToMaterialized(this.SinkProbe(), Keep.Both) @@ -92,7 +93,7 @@ await Source AwaitCondition(() => task.IsShutdown.IsCompletedSuccessfully); var probe2 = KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Assignment(new TopicPartition(topic1, 0))) - .Select(_ => _.Record.Value) + .Select(_ => _.Record.Message.Value) .RunWith(this.SinkProbe(), Materializer); // Note that due to buffers and SelectAsync(10) the committed offset is more @@ -112,7 +113,7 @@ await Source // another consumer should see all var probe3 = KafkaConsumer.CommittableSource(consumerSettings.WithGroupId(group2), Subscriptions.Assignment(new TopicPartition(topic1, 0))) - .Select(_ => _.Record.Value) + .Select(_ => _.Record.Message.Value) .RunWith(this.SinkProbe(), Materializer); probe3.Request(100); diff --git a/src/Akka.Streams.Kafka.Tests/Integration/CommitterFlowIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/CommitterFlowIntegrationTests.cs index 8bf06814..13e5b701 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/CommitterFlowIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/CommitterFlowIntegrationTests.cs @@ -10,6 +10,7 @@ using Akka.Streams.Kafka.Settings; using Akka.Streams.TestKit; using Akka.Streams.Kafka.Helpers; +using Akka.Streams.Kafka.Testkit.Fixture; using Confluent.Kafka; using Xunit; using Xunit.Abstractions; @@ -29,9 +30,9 @@ public CommitterFlowIntegrationTests(ITestOutputHelper output, KafkaFixture fixt [InlineData(5)] public async Task CommitterFlow_commits_offsets_from_CommittableSource(int batchSize) { - var topic1 = CreateTopic(1); + var topic1 = CreateTopicName(1); var topicPartition1 = new TopicPartition(topic1, 0); - var group1 = CreateGroup(1); + var group1 = CreateGroupId(1); await GivenInitializedTopic(topicPartition1); @@ -47,7 +48,7 @@ await Source var (task, probe1) = KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Assignment(topicPartition1)) .SelectAsync(10, elem => { - committedElements.Enqueue(elem.Record.Value); + committedElements.Enqueue(elem.Record.Message.Value); return Task.FromResult(elem.CommitableOffset as ICommittable); }) .Via(Committer.Flow(committerSettings)) @@ -66,7 +67,7 @@ await Source AwaitCondition(() => task.IsShutdown.IsCompletedSuccessfully); var probe2 = KafkaConsumer.PlainSource(consumerSettings, Subscriptions.Assignment(new TopicPartition(topic1, 0))) - .Select(_ => _.Value) + .Select(_ => _.Message.Value) .RunWith(this.SinkProbe(), Materializer); probe2.Request(75); diff --git a/src/Akka.Streams.Kafka.Tests/Integration/ExternalPlainSourceIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/ExternalPlainSourceIntegrationTests.cs index 0a42f1fa..6b0152d0 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/ExternalPlainSourceIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/ExternalPlainSourceIntegrationTests.cs @@ -9,6 +9,7 @@ using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; using Akka.Streams.Kafka.Stages.Consumers.Actors; +using Akka.Streams.Kafka.Testkit.Fixture; using Akka.Streams.TestKit; using Akka.TestKit; using Akka.Util.Internal; @@ -30,8 +31,8 @@ public ExternalPlainSourceIntegrationTests(ITestOutputHelper output, KafkaFixtur public async Task ExternalPlainSource_with_external_consumer_Should_work() { var elementsCount = 10; - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); //Consumer is represented by actor var consumer = Sys.ActorOf(KafkaConsumerActorMetadata.GetProps(CreateConsumerSettings(group))); @@ -64,8 +65,8 @@ public async Task ExternalPlainSource_with_external_consumer_Should_work() [Fact] public async Task ExternalPlainSource_should_be_stopped_on_serialization_error_only_when_requested_messages() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); // Make consumer expect numeric messages var settings = CreateConsumerSettings(group).WithValueDeserializer(Deserializers.Int32); @@ -106,8 +107,8 @@ public async Task ExternalPlainSource_should_be_stopped_on_serialization_error_o [Fact] public async Task ExternalPlainSource_verify_consuming_actor_pause_resume_partitions_works_fine() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); // Create consumer actor var consumer = Sys.ActorOf(KafkaConsumerActorMetadata.GetProps(CreateConsumerSettings(group))); diff --git a/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs index ea624950..a2c0001c 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs @@ -9,6 +9,7 @@ using Akka.Streams.Kafka.Helpers; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit.Fixture; using Confluent.Kafka; using FluentAssertions; using Xunit; @@ -29,11 +30,11 @@ public async Task ProducerFlowWithContext_should_work_with_source_with_context() bool Duplicate(string value) => value == "1"; bool Ignore(string value) => value == "2"; - var consumerSettings = CreateConsumerSettings(CreateGroup(1)); - var topic1 = CreateTopic(1); - var topic2 = CreateTopic(2); - var topic3 = CreateTopic(3); - var topic4 = CreateTopic(4); + var consumerSettings = CreateConsumerSettings(CreateGroupId(1)); + var topic1 = CreateTopicName(1); + var topic2 = CreateTopicName(2); + var topic3 = CreateTopicName(3); + var topic4 = CreateTopicName(4); var producerSettings = BuildProducerSettings(); var committerSettings = CommitterSettings; var totalMessages = 10; diff --git a/src/Akka.Streams.Kafka.Tests/Integration/PlainPartitionedManualOffsetSourceIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/PlainPartitionedManualOffsetSourceIntegrationTests.cs index 9e0c149a..10af2482 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/PlainPartitionedManualOffsetSourceIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/PlainPartitionedManualOffsetSourceIntegrationTests.cs @@ -6,6 +6,7 @@ using Akka.Streams.Kafka.Dsl; using Akka.Streams.Kafka.Helpers; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit.Fixture; using Akka.Streams.TestKit; using Akka.Streams.Util; using Akka.Util; @@ -26,8 +27,8 @@ public PlainPartitionedManualOffsetSourceIntegrationTests(ITestOutputHelper outp [Fact] public async Task PlainPartitionedManualOffsetSource_Should_begin_consuming_from_beginning_of_the_topic() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var totalMessages = 100; var consumerSettings = CreateConsumerSettings(group); @@ -41,7 +42,7 @@ public async Task PlainPartitionedManualOffsetSource_Should_begin_consuming_from getOffsetsOnAssign: _ => Task.FromResult(ImmutableHashSet.Empty as IImmutableSet), onRevoke: _ => { } ).MergeMany(3, tuple => tuple.Item2.MapMaterializedValue(notUsed => new NoopControl())) - .Select(m => m.Value) + .Select(m => m.Message.Value) .RunWith(this.SinkProbe(), Materializer); probe.Request(totalMessages); @@ -52,8 +53,8 @@ public async Task PlainPartitionedManualOffsetSource_Should_begin_consuming_from [Fact] public async Task PlainPartitionedManualOffsetSource_Should_begin_consuming_with_offset() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var consumerSettings = CreateConsumerSettings(group); await ProduceStrings(topic, Enumerable.Range(0, 100), ProducerSettings); @@ -70,7 +71,7 @@ public async Task PlainPartitionedManualOffsetSource_Should_begin_consuming_with }, onRevoke: _ => { } ).MergeMany(3, tuple => tuple.Item2.MapMaterializedValue(notUsed => new NoopControl())) - .Select(m => m.Value) + .Select(m => m.Message.Value) .RunWith(this.SinkProbe(), Materializer); probe.Request(99); @@ -83,8 +84,8 @@ public async Task PlainPartitionedManualOffsetSource_Should_begin_consuming_with [Fact] public async Task PlainPartitionedManualOffsetSource_Should_call_the_OnRevoke_hook() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var consumerSettings = CreateConsumerSettings(group); var partitionsAssigned = false; @@ -104,7 +105,7 @@ public async Task PlainPartitionedManualOffsetSource_Should_call_the_OnRevoke_ho revoked = new Option>(revokedPartitions); }) .MergeMany(3, tuple => tuple.Item2.MapMaterializedValue(notUsed => new NoopControl())) - .Select(m => m.Value); + .Select(m => m.Message.Value); var (control1, firstConsumer) = source.ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); diff --git a/src/Akka.Streams.Kafka.Tests/Integration/PlainPartitionedSourceIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/PlainPartitionedSourceIntegrationTests.cs index 51f7ab9e..dd26a7c2 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/PlainPartitionedSourceIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/PlainPartitionedSourceIntegrationTests.cs @@ -12,6 +12,7 @@ using Akka.Streams.Kafka.Helpers; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit.Fixture; using Akka.Streams.Supervision; using Akka.Streams.TestKit; using Akka.Util.Internal; @@ -32,8 +33,8 @@ public PlainPartitionedSourceIntegrationTests(ITestOutputHelper output, KafkaFix [Fact] public async Task PlainPartitionedSource_should_work() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var totalMessages = 100; var receivedMessages = new AtomicCounter(0); @@ -79,8 +80,8 @@ public async Task PlainPartitionedSource_should_work() [Fact] public async Task PlainPartitionedSource_Should_split_messages_by_partitions() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var totalMessages = 100; var consumerSettings = CreateConsumerSettings(group); @@ -115,8 +116,8 @@ public async Task PlainPartitionedSource_Should_split_messages_by_partitions() [Fact] public async Task PlainPartitionedSource_should_stop_partition_sources_when_stopped() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var totalMessages = 100; await ProduceStrings(topic, Enumerable.Range(1, totalMessages), ProducerSettings); @@ -127,7 +128,7 @@ public async Task PlainPartitionedSource_should_stop_partition_sources_when_stop .Select(message => { Log.Debug($"Consumed partition {message.Partition.Value}"); - return message.Value; + return message.Message.Value; }) .ToMaterialized(this.SinkProbe(), Keep.Both) .Run(Materializer); @@ -144,9 +145,9 @@ public async Task PlainPartitionedSource_should_stop_partition_sources_when_stop } [Fact] - public async Task PlainPartitionedSource_should_be_signalled_the_stream_by_partitioned_sources() + public void PlainPartitionedSource_should_be_signalled_the_stream_by_partitioned_sources() { - var settings = CreateConsumerSettings(CreateGroup(1)) + var settings = CreateConsumerSettings(CreateGroupId(1)) .WithBootstrapServers("localhost:1111"); // Bad address var result = KafkaConsumer.PlainPartitionedSource(settings, Subscriptions.Topics("topic")) @@ -158,8 +159,8 @@ public async Task PlainPartitionedSource_should_be_signalled_the_stream_by_parti [Fact] public async Task PlainPartitionedSource_should_be_signalled_about_serialization_errors() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); await ProduceStrings(topic, new int[] { 0 }, ProducerSettings); // Produce "0" string @@ -193,8 +194,8 @@ public async Task PlainPartitionedSource_should_be_signalled_about_serialization [Fact] public async Task PlainPartitionedSource_should_not_leave_gaps_when_subsource_is_cancelled() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var totalMessages = 100; await ProduceStrings(topic, Enumerable.Range(1, totalMessages), ProducerSettings); @@ -206,10 +207,10 @@ public async Task PlainPartitionedSource_should_not_leave_gaps_when_subsource_is var (topicPartition, source) = tuple; return source .MapMaterializedValue(notUsed => new NoopControl()) - .Log(topicPartition.ToString(), m => $"Consumed offset {m.Offset} (value: {m.Value})") + .Log(topicPartition.ToString(), m => $"Consumed offset {m.Offset} (value: {m.Message.Value})") .Take(10); }) - .Select(m => int.Parse(m.Value)) + .Select(m => int.Parse(m.Message.Value)) .Log("Merged stream", m => m) .Scan(0, (c, _) => c + 1) .TakeWhile(m => m < totalMessages, inclusive: true) @@ -223,8 +224,8 @@ public async Task PlainPartitionedSource_should_not_leave_gaps_when_subsource_is [Fact] public async Task PlainPartitionedSource_should_not_leave_gaps_when_subsource_failed() { - var topic = CreateTopic(1); - var group = CreateGroup(1); + var topic = CreateTopicName(1); + var group = CreateGroupId(1); var totalMessages = 105; var producerSettings = BuildProducerSettings(); @@ -237,7 +238,7 @@ await Source var queue = new ConcurrentQueue(); var consumerSettings = ConsumerSettings.Create(Sys, null, null) - .WithBootstrapServers(Fixture.KafkaServer) + .WithBootstrapServers(Fixture.BootstrapServer) .WithStopTimeout(TimeSpan.FromSeconds(1)) .WithProperty("auto.offset.reset", "earliest") .WithGroupId(group); @@ -286,7 +287,7 @@ public void NativeKafkaIConsumerSeekShouldNotFail() var producerConfig = new ProducerConfig { - BootstrapServers = Fixture.KafkaServer + BootstrapServers = Fixture.BootstrapServer }; var producer = new ProducerBuilder(producerConfig).Build(); @@ -304,7 +305,7 @@ public void NativeKafkaIConsumerSeekShouldNotFail() var consumerConfig = new ConsumerConfig { - BootstrapServers = Fixture.KafkaServer, + BootstrapServers = Fixture.BootstrapServer, GroupId = group, AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false diff --git a/src/Akka.Streams.Kafka.Tests/Integration/PlainSinkIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/PlainSinkIntegrationTests.cs index d6986a44..dbe3bbf1 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/PlainSinkIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/PlainSinkIntegrationTests.cs @@ -8,6 +8,7 @@ using Akka.Streams.Kafka.Dsl; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit.Fixture; using Akka.Streams.TestKit; using Confluent.Kafka; using FluentAssertions; @@ -27,12 +28,10 @@ public PlainSinkIntegrationTests(ITestOutputHelper output, KafkaFixture fixture) [Fact] public async Task PlainSink_should_publish_100_elements_to_Kafka_producer() { - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var topicPartition1 = new TopicPartition(topic1, 0); - await GivenInitializedTopic(topicPartition1); - var consumerSettings = CreateConsumerSettings(group1); var consumer = consumerSettings.ConsumerFactory != null ? consumerSettings.ConsumerFactory(consumerSettings) @@ -73,7 +72,7 @@ bool CheckTimeout(TimeSpan timeout) [Fact] public async Task PlainSink_should_fail_stage_if_broker_unavailable() { - var topic1 = CreateTopic(1); + var topic1 = CreateTopicName(1); await GivenInitializedTopic(topic1); @@ -111,12 +110,12 @@ Directive Decider(Exception cause) } var elementsCount = 10; - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var producerSettings = ProducerSettings .Create(Sys, null, new FailingSerializer()) - .WithBootstrapServers(Fixture.KafkaServer); + .WithBootstrapServers(Fixture.BootstrapServer); var sink = KafkaProducer.PlainSink(producerSettings) .AddAttributes(ActorAttributes.CreateSupervisionStrategy(Decider)); @@ -131,10 +130,10 @@ Directive Decider(Exception cause) if (completeTask == timeoutTask) throw new Exception("Producer timed out"); - var settings = CreateConsumerSettings(group1).WithValueDeserializer(new StringDeserializer()); + var settings = CreateConsumerSettings(group1).WithValueDeserializer(new ProperStringDeserializer()); var probe = KafkaConsumer .PlainSource(settings, Subscriptions.Assignment(new TopicPartition(topic1, 0))) - .Select(c => c.Value) + .Select(c => c.Message.Value) .RunWith(this.SinkProbe(), Materializer); probe.Request(elementsCount); @@ -158,7 +157,7 @@ public byte[] Serialize(string data, SerializationContext context) } } - private class StringDeserializer: IDeserializer + private class ProperStringDeserializer: IDeserializer { public string Deserialize(ReadOnlySpan data, bool isNull, SerializationContext context) { diff --git a/src/Akka.Streams.Kafka.Tests/Integration/PlainSourceIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/PlainSourceIntegrationTests.cs index ff0d43fd..22ee327d 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/PlainSourceIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/PlainSourceIntegrationTests.cs @@ -11,6 +11,7 @@ using Akka.Streams.Kafka.Helpers; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit.Fixture; using Akka.Streams.Kafka.Tests.Logging; using Akka.Streams.Supervision; using Akka.Streams.TestKit; @@ -35,7 +36,7 @@ public PlainSourceIntegrationTests(ITestOutputHelper output, KafkaFixture fixtur { return KafkaConsumer .PlainSource(consumerSettings, sub) - .Select(c => c.Value) + .Select(c => c.Message.Value) .ToMaterialized(this.SinkProbe(), Keep.Both) .Run(Materializer); } @@ -44,8 +45,8 @@ public PlainSourceIntegrationTests(ITestOutputHelper output, KafkaFixture fixtur public async Task PlainSource_consumes_messages_from_KafkaProducer_with_topicPartition_assignment() { int elementsCount = 100; - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var topicPartition1 = new TopicPartition(topic1, 0); await GivenInitializedTopic(topicPartition1); @@ -68,8 +69,8 @@ public async Task PlainSource_consumes_messages_from_KafkaProducer_with_topicPar { int elementsCount = 100; int offset = 50; - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var topicPartition1 = new TopicPartition(topic1, 0); await GivenInitializedTopic(topicPartition1); @@ -91,8 +92,8 @@ public async Task PlainSource_consumes_messages_from_KafkaProducer_with_topicPar public async Task PlainSource_consumes_messages_from_KafkaProducer_with_subscribe_to_topic() { int elementsCount = 100; - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var topicPartition1 = new TopicPartition(topic1, 0); await GivenInitializedTopic(topicPartition1); @@ -114,8 +115,8 @@ public async Task PlainSource_consumes_messages_from_KafkaProducer_with_subscrib [Fact] public async Task PlainSource_should_fail_stage_if_broker_unavailable() { - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var topicPartition1 = new TopicPartition(topic1, 0); await GivenInitializedTopic(topicPartition1); @@ -133,8 +134,8 @@ public async Task PlainSource_should_fail_stage_if_broker_unavailable() public async Task PlainSource_should_stop_on_deserialization_errors() { int elementsCount = 10; - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); await ProduceStrings(new TopicPartition(topic1, 0), Enumerable.Range(1, elementsCount), ProducerSettings); @@ -143,7 +144,7 @@ public async Task PlainSource_should_stop_on_deserialization_errors() var probe = KafkaConsumer .PlainSource(settings, Subscriptions.Assignment(new TopicPartition(topic1, 0))) .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.StoppingDecider)) - .Select(c => c.Value) + .Select(c => c.Message.Value) .RunWith(this.SinkProbe(), Materializer); var error = probe.Request(elementsCount).ExpectEvent(TimeSpan.FromSeconds(10)); @@ -167,8 +168,8 @@ Directive Decider(Exception cause) } int elementsCount = 10; - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); await ProduceStrings(new TopicPartition(topic1, 0), Enumerable.Range(1, elementsCount), ProducerSettings); @@ -177,7 +178,7 @@ Directive Decider(Exception cause) var probe = KafkaConsumer .PlainSource(settings, Subscriptions.Assignment(new TopicPartition(topic1, 0))) .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Decider)) - .Select(c => c.Value) + .Select(c => c.Message.Value) .RunWith(this.SinkProbe(), Materializer); probe.Request(elementsCount); @@ -190,8 +191,8 @@ Directive Decider(Exception cause) public async Task Custom_partition_event_handling_Should_work() { int elementsCount = 100; - var topic1 = CreateTopic(1); - var group1 = CreateGroup(1); + var topic1 = CreateTopicName(1); + var group1 = CreateGroupId(1); var topicPartition1 = new TopicPartition(topic1, 0); await GivenInitializedTopic(topicPartition1); diff --git a/src/Akka.Streams.Kafka.Tests/Integration/SourceWithOffsetContextIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/SourceWithOffsetContextIntegrationTests.cs index 70410420..96020bc8 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/SourceWithOffsetContextIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/SourceWithOffsetContextIntegrationTests.cs @@ -6,6 +6,7 @@ using Akka.Streams.Kafka.Helpers; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit.Fixture; using Akka.Streams.TestKit; using FluentAssertions; using Xunit; @@ -23,8 +24,8 @@ public SourceWithOffsetContextIntegrationTests(ITestOutputHelper output, KafkaFi [Fact] public async Task SourceWithOffsetContext_at_least_once_consuming_should_work() { - var topic = CreateTopic(1); - var settings = CreateConsumerSettings(CreateGroup(1)); + var topic = CreateTopicName(1); + var settings = CreateConsumerSettings(CreateGroupId(1)); var elementCount = 10; var batchSize = 2; var messages = Enumerable.Range(1, elementCount).ToList(); diff --git a/src/Akka.Streams.Kafka.Tests/Integration/TransactionalIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/TransactionalIntegrationTests.cs index 1f45a247..94cc7141 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/TransactionalIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/TransactionalIntegrationTests.cs @@ -8,6 +8,7 @@ using Akka.Streams.Kafka.Helpers; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit.Fixture; using Confluent.Kafka; using FluentAssertions; using Xunit; @@ -25,9 +26,9 @@ public TransactionalIntegrationTests(ITestOutputHelper output, KafkaFixture fixt [Fact(Skip = "Missing producer transactions support, see https://github.com/akkadotnet/Akka.Streams.Kafka/issues/85")] public async Task Transactional_source_with_sink_Should_work() { - var settings = CreateConsumerSettings(CreateGroup(1)); - var sourceTopic = CreateTopic(1); - var targetTopic = CreateTopic(2); + var settings = CreateConsumerSettings(CreateGroupId(1)); + var sourceTopic = CreateTopicName(1); + var targetTopic = CreateTopicName(2); var transactionalId = Guid.NewGuid().ToString(); const int totalMessages = 10; @@ -36,7 +37,7 @@ public async Task Transactional_source_with_sink_Should_work() .Select(message => { return ProducerMessage.Single( - new ProducerRecord(targetTopic, message.Record.Key, message.Record.Value), + new ProducerRecord(targetTopic, message.Record.Message.Key, message.Record.Message.Value), passThrough: message.PartitionOffset); }) .ToMaterialized(KafkaProducer.TransactionalSink(ProducerSettings, transactionalId), Keep.Both) @@ -57,7 +58,7 @@ public async Task Transactional_source_with_sink_Should_work() private DrainingControl>> ConsumeStrings(string topic, int count) { - return KafkaConsumer.PlainSource(CreateConsumerSettings(CreateGroup(1)), Subscriptions.Topics(topic)) + return KafkaConsumer.PlainSource(CreateConsumerSettings(CreateGroupId(1)), Subscriptions.Topics(topic)) .Take(count) .ToMaterialized(Sink.Seq>(), Keep.Both) .MapMaterializedValue(DrainingControl>>.Create) diff --git a/src/Akka.Streams.Kafka.Tests/Internal/ConsumerSpec.cs b/src/Akka.Streams.Kafka.Tests/Internal/ConsumerSpec.cs index ea094912..b5bea925 100644 --- a/src/Akka.Streams.Kafka.Tests/Internal/ConsumerSpec.cs +++ b/src/Akka.Streams.Kafka.Tests/Internal/ConsumerSpec.cs @@ -8,11 +8,12 @@ using Akka.Configuration; using Akka.Streams.Dsl; using Akka.Streams.Implementation; +using Akka.Streams.Implementation.Fusing; using Akka.Streams.Kafka.Dsl; using Akka.Streams.Kafka.Helpers; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; -using Akka.Streams.Kafka.Tests.TestKit.Internal; +using Akka.Streams.Kafka.Testkit; using Akka.Streams.TestKit; using Akka.Util.Internal; using Confluent.Kafka; @@ -200,8 +201,9 @@ await CheckMessagesReceiving(Messages.Grouped(97) .Select(x => new List>()).ToList()); } - [Fact(DisplayName = - "CommittableSource should complete out and keep underlying client open when control.stop called")] + [Fact( + DisplayName = "CommittableSource should complete out and keep underlying client open when control.stop called", + Skip = "Not implemented yet")] public async Task ShouldKeepClientOpenOnStop() { this.AssertAllStagesStopped(() => @@ -210,83 +212,4 @@ public async Task ShouldKeepClientOpenOnStop() }, Sys.Materializer()); } } - - internal static class Extensions - { - public static async Task WithTimeout(this Task task, TimeSpan timeout) - { - using (var cts = new CancellationTokenSource()) - { - var timeoutTask = Task.Delay(timeout, cts.Token); - var completed = await Task.WhenAny(task, timeoutTask); - if (completed == timeoutTask) - throw new OperationCanceledException("Operation timed out"); - else - cts.Cancel(); - } - } - - public static List> Grouped(this IEnumerable messages, int size) - { - var groups = new List>(); - var list = new List(); - var index = 0; - foreach (var message in messages) - { - list.Add(message); - if(index != 0 && index % size == 0) - { - groups.Add(list); - list = new List(); - } - - index++; - } - if(list.Count > 0) - groups.Add(list); - return groups; - } - - public static void AssertAllStagesStopped(this Akka.TestKit.Xunit2.TestKit spec, Action block, IMaterializer materializer) - { - AssertAllStagesStopped(spec, () => - { - block(); - return NotUsed.Instance; - }, materializer); - } - - public static T AssertAllStagesStopped(this Akka.TestKit.Xunit2.TestKit spec, Func block, IMaterializer materializer) - { - if (!(materializer is ActorMaterializerImpl impl)) - return block(); - - var probe = spec.CreateTestProbe(impl.System); - probe.Send(impl.Supervisor, StreamSupervisor.StopChildren.Instance); - probe.ExpectMsg(); - var result = block(); - - probe.Within(TimeSpan.FromSeconds(5), () => - { - IImmutableSet children = ImmutableHashSet.Empty; - try - { - probe.AwaitAssert(() => - { - impl.Supervisor.Tell(StreamSupervisor.GetChildren.Instance, probe.Ref); - children = probe.ExpectMsg().Refs; - if (children.Count != 0) - throw new Exception($"expected no StreamSupervisor children, but got {children.Aggregate("", (s, @ref) => s + @ref + ", ")}"); - }); - } - catch - { - children.ForEach(c=>c.Tell(StreamSupervisor.PrintDebugDump.Instance)); - throw; - } - }); - - return result; - } - } } \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Tests/KafkaFixture.cs b/src/Akka.Streams.Kafka.Tests/KafkaFixture.cs deleted file mode 100644 index bd124e67..00000000 --- a/src/Akka.Streams.Kafka.Tests/KafkaFixture.cs +++ /dev/null @@ -1,246 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Data.Common; -using System.Linq; -using System.Runtime.InteropServices; -using System.Threading.Tasks; -using Akka.Util; -using Docker.DotNet; -using Docker.DotNet.Models; -using Xunit; - -namespace Akka.Streams.Kafka.Tests -{ - [CollectionDefinition(Name)] - public sealed class KafkaSpecsFixture : ICollectionFixture - { - public const string Name = "KafkaSpecs"; - } - - public class KafkaFixture : IAsyncLifetime - { - private const string KafkaImageName = "confluentinc/cp-kafka"; - private const string KafkaImageTag = "5.3.0"; - private const string ZookeeperImageName = "confluentinc/cp-zookeeper"; - private const string ZookeeperImageTag = "5.3.0"; - - private const string KafkaContainerNameBase = "akka.net-kafka-test"; - private const string ZookeeperContainerNameBase = "akka.net-zookeeper-test"; - private const string NetworkNameBase = "akka.net-network-test"; - private readonly string _kafkaContainerName = $"{KafkaContainerNameBase}-{Guid.NewGuid():N}"; - private readonly string _zookeeperContainerName = $"{ZookeeperContainerNameBase}-{Guid.NewGuid():N}"; - private readonly string _networkName = $"{NetworkNameBase}-{Guid.NewGuid():N}"; - - private readonly DockerClient _client; - - public KafkaFixture() - { - DockerClientConfiguration config; - if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) - config = new DockerClientConfiguration(new Uri("unix://var/run/docker.sock")); - else if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - config = new DockerClientConfiguration(new Uri("npipe://./pipe/docker_engine")); - else - throw new NotSupportedException($"Unsupported OS [{RuntimeInformation.OSDescription}]"); - - _client = config.CreateClient(); - - if (TestsConfiguration.UseExistingDockerContainer) - { - KafkaPort = 29092; - } - } - - public int KafkaPort { get; private set; } - public string KafkaServer => $"127.0.0.1:{KafkaPort}"; - - public const int KafkaReplicationFactor = 1; - public const int KafkaPartitions = 3; - - public async Task InitializeAsync() - { - if (TestsConfiguration.UseExistingDockerContainer) - { - // When using existing container, no actions should be performed on startup - return; - } - - // Load images, if they not exist yet - await EnsureImageExists(ZookeeperImageName, ZookeeperImageTag); - await EnsureImageExists(KafkaImageName, KafkaImageTag); - - // Generate random ports for zookeeper and kafka - var zookeeperPort = ThreadLocalRandom.Current.Next(32000, 33000); - KafkaPort = ThreadLocalRandom.Current.Next(28000, 29000); - - // Make resources cleanup before allocating new containers/networks - await ResourceCleanup(); - - // create the containers - await CreateContainer(ZookeeperImageName, ZookeeperImageTag, _zookeeperContainerName, zookeeperPort, new Dictionary() - { - ["ZOOKEEPER_CLIENT_PORT"] = zookeeperPort.ToString(), - ["ZOOKEEPER_TICK_TIME"] = "2000", - }); - await CreateContainer(KafkaImageName, KafkaImageTag, _kafkaContainerName, KafkaPort, new Dictionary() - { - ["KAFKA_BROKER_ID"] = "1", - ["KAFKA_NUM_PARTITIONS"] = KafkaPartitions.ToString(), - ["KAFKA_ZOOKEEPER_CONNECT"] = $"{_zookeeperContainerName}:{zookeeperPort}", // referencing zookeeper container directly in common docker network - ["KAFKA_LISTENERS"] = $"PLAINTEXT://:{KafkaPort}", - ["KAFKA_ADVERTISED_LISTENERS"] = $"PLAINTEXT://127.0.0.1:{KafkaPort}", - ["KAFKA_AUTO_CREATE_TOPICS_ENABLE"] = "true", - ["KAFKA_DELETE_TOPIC_ENABLE"] = "true", - ["KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR"] = KafkaReplicationFactor.ToString(), - ["KAFKA_OPTS"] = "-Djava.net.preferIPv4Stack=True" - }); - - // Setting up network for containers to communicate - var network = await _client.Networks.CreateNetworkAsync(new NetworksCreateParameters(new NetworkCreate()) - { - Name = _networkName - }); - await _client.Networks.ConnectNetworkAsync(network.ID, new NetworkConnectParameters() - { - Container = _kafkaContainerName - }); - await _client.Networks.ConnectNetworkAsync(network.ID, new NetworkConnectParameters() - { - Container = _zookeeperContainerName - }); - - // start the containers - await _client.Containers.StartContainerAsync(_zookeeperContainerName, new ContainerStartParameters()); - await _client.Containers.StartContainerAsync(_kafkaContainerName, new ContainerStartParameters()); - - // Provide a 10 second startup delay - await Task.Delay(TimeSpan.FromSeconds(10)); - } - - public async Task DisposeAsync() - { - if (_client != null) - { - // Shutdown running containers only when we were not using pre-existing container - if (!TestsConfiguration.UseExistingDockerContainer) - { - await ResourceCleanup(); - } - - _client.Dispose(); - } - } - - /// - /// This performs cleanup of allocated containers/networks during tests - /// - /// - private async Task ResourceCleanup() - { - if (_client == null) - return; - - // This task loads list of containers and stops/removes those which were started during tests - var containerCleanupTask = _client.Containers.ListContainersAsync(new ContainersListParameters()).ContinueWith( - async t => - { - if (t.IsFaulted) - return; - - var containersToStop = t.Result.Where(c => c.Names.Any(name => name.Contains(KafkaContainerNameBase)) || - c.Names.Any(name => name.Contains(ZookeeperContainerNameBase))); - - var stopTasks = containersToStop.Select(async container => - { - await _client.Containers.StopContainerAsync(container.ID, new ContainerStopParameters()); - await _client.Containers.RemoveContainerAsync(container.ID, new ContainerRemoveParameters { Force = true }); - }); - await Task.WhenAll(stopTasks); - }).Unwrap(); - - // This tasks loads docker networks and removes those which were started during tests - var networkCleanupTask = _client.Networks.ListNetworksAsync(new NetworksListParameters()).ContinueWith( - async t => - { - if (t.IsFaulted) - return; - - var networksToDelete = t.Result.Where(network => network.Name.Contains(NetworkNameBase)); - - var deleteTasks = networksToDelete.Select(network => _client.Networks.DeleteNetworkAsync(network.ID)); - - await Task.WhenAll(deleteTasks); - }); - - try - { - // Wait until cleanup is finished - await Task.WhenAll(containerCleanupTask, networkCleanupTask); - } - catch { /* If the cleanup failes, this is not the reason to fail tests */ } - } - - private async Task CreateContainer(string imageName, string imageTag, string containerName, int portToExpose, Dictionary env) - { - await _client.Containers.CreateContainerAsync(new CreateContainerParameters - { - Image = $"{imageName}:{imageTag}", - Name = containerName, - Tty = true, - - ExposedPorts = new Dictionary - { - {$"{portToExpose}/tcp", new EmptyStruct()} - }, - HostConfig = new HostConfig - { - PortBindings = new Dictionary> - { - { - $"{portToExpose}/tcp", - new List - { - new PortBinding - { - HostPort = $"{portToExpose}" - } - } - } - }, - ExtraHosts = new [] { "localhost:127.0.0.1" }, - }, - Env = env.Select(pair => $"{pair.Key}={pair.Value}").ToArray(), - }); - } - - private async Task EnsureImageExists(string imageName, string imageTag) - { - var existingImages = await _client.Images.ListImagesAsync( - new ImagesListParameters - { - Filters = new Dictionary> - { - { - "reference", - new Dictionary - { - {$"{imageName}:{imageTag}", true} - } - } - } - }); - - if (existingImages.Count == 0) - { - await _client.Images.CreateImageAsync( - new ImagesCreateParameters { FromImage = imageName, Tag = imageTag }, null, - new Progress(message => - { - Console.WriteLine(!string.IsNullOrEmpty(message.ErrorMessage) - ? message.ErrorMessage - : $"{message.ID} {message.Status} {message.ProgressMessage}"); - })); - } - } - } -} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs index 76267b89..bb056b8f 100644 --- a/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs @@ -8,7 +8,11 @@ using Akka.Streams.Kafka.Helpers; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; +using Akka.Streams.Kafka.Testkit; +using Akka.Streams.Kafka.Testkit.Fixture; +using Akka.Streams.Kafka.Testkit.Internal; using Akka.Streams.TestKit; +using Akka.Util; using Confluent.Kafka; using Confluent.Kafka.Admin; using FluentAssertions; @@ -19,30 +23,92 @@ namespace Akka.Streams.Kafka.Tests { [Collection(KafkaSpecsFixture.Name)] - public abstract class KafkaIntegrationTests : Akka.TestKit.Xunit2.TestKit + public abstract class KafkaIntegrationTests : Akka.TestKit.Xunit2.TestKit, IAsyncLifetime { - public KafkaFixture Fixture { get; } + protected const long MessageLogInterval = 500L; + + protected const int Partition0 = 0; + + protected Flow LogSentMessages() + => Flow.Create().Select(i => + { + if(i % MessageLogInterval == 0) + Log.Info("Sent [{0}] messages so far", i); + return i; + }); + + protected Flow LogReceivedMessages() + => Flow.Create().Select(i => + { + if(i % MessageLogInterval == 0) + Log.Info("Received [{0}] messages so far", i); + return i; + }); + + protected Flow LogReceivedMessages(TopicPartition tp) + => Flow.Create().Select(i => + { + if(i % MessageLogInterval == 0) + Log.Info("{0}: Received [{1}] messages so far", tp, i); + return i; + }); + + protected async Task StopRandomBrokerAsync(int msgCount) + { + var broker = Fixture.Brokers[ThreadLocalRandom.Current.Next(Fixture.Brokers.Count)]; + Log.Warning( + "Stopping one Kafka container with network aliases [{0}], container id [{1}], after [{2}] messages", + broker.ContainerName, + broker.ContainerId, + msgCount); + await broker.StopAsync(); + } + + private IAdminClient _adminClient; + public KafkaFixtureBase Fixture { get; protected set; } + protected IMaterializer Materializer { get; } + protected KafkaTestkitSettings Settings { get; } - public KafkaIntegrationTests(string actorSystemName, ITestOutputHelper output, KafkaFixture fixture) + public KafkaIntegrationTests(string actorSystemName, ITestOutputHelper output, KafkaFixtureBase fixture = null) : base(Default(), actorSystemName, output) { Fixture = fixture; Materializer = Sys.Materializer(); - - Sys.Log.Info("Starting test: " + output.GetCurrentTestName()); + Sys.Settings.InjectTopLevelFallback(KafkaTestKit.DefaultConfig); + Settings = new KafkaTestkitSettings(Sys); + + Log.Info("Starting test: " + output.GetCurrentTestName()); } private string Uuid { get; } = Guid.NewGuid().ToString(); - protected string CreateTopic(int number) => $"topic-{number}-{Uuid}"; - protected string CreateGroup(int number) => $"group-{number}-{Uuid}"; + protected string CreateTopicName(int number) => $"topic-{number}-{Uuid}"; + protected string CreateGroupId(int number) => $"group-{number}-{Uuid}"; + public async Task InitializeAsync() + { + if (!(Fixture is KafkaFixture)) + await Fixture.InitializeAsync(); + + _adminClient = new AdminClientBuilder(new AdminClientConfig + { + BootstrapServers = Fixture.BootstrapServer + }).Build(); + } + + public async Task DisposeAsync() + { + if (!(Fixture is KafkaFixture)) + await Fixture.DisposeAsync(); + _adminClient?.Dispose(); + } + protected ProducerSettings ProducerSettings => BuildProducerSettings(); protected ProducerSettings BuildProducerSettings() { - return ProducerSettings.Create(Sys, null, null).WithBootstrapServers(Fixture.KafkaServer); + return ProducerSettings.Create(Sys, null, null).WithBootstrapServers(Fixture.BootstrapServer); } protected CommitterSettings CommitterSettings @@ -53,7 +119,7 @@ protected CommitterSettings CommitterSettings protected ConsumerSettings CreateConsumerSettings(string group) { return ConsumerSettings.Create(Sys, null, null) - .WithBootstrapServers(Fixture.KafkaServer) + .WithBootstrapServers(Fixture.BootstrapServer) .WithStopTimeout(TimeSpan.FromSeconds(1)) .WithProperty("auto.offset.reset", "earliest") .WithGroupId(group); @@ -62,7 +128,7 @@ protected ConsumerSettings CreateConsumerSettings(st protected ConsumerSettings CreateConsumerSettings(string group) { return ConsumerSettings.Create(Sys, null, null) - .WithBootstrapServers(Fixture.KafkaServer) + .WithBootstrapServers(Fixture.BootstrapServer) .WithStopTimeout(TimeSpan.FromSeconds(1)) .WithProperty("auto.offset.reset", "earliest") .WithGroupId(group); @@ -120,47 +186,35 @@ protected TResult AssertTaskCompletesWithin(TimeSpan timeout, Task) CreateExternalPlainSourceProbe(IActorRef consumer, IManualSubscription sub) { return KafkaConsumer .PlainExternalSource(consumer, sub, true) - .Select(c => c.Value) + .Select(c => c.Message.Value) .ToMaterialized(this.SinkProbe(), Keep.Both) .Run(Materializer); } + + private static Config Default() { var config = ConfigurationFactory.ParseString("akka.loglevel = DEBUG"); diff --git a/src/Akka.Streams.Kafka.Tests/RepeatAttribute.cs b/src/Akka.Streams.Kafka.Tests/RepeatAttribute.cs new file mode 100644 index 00000000..46e5ac93 --- /dev/null +++ b/src/Akka.Streams.Kafka.Tests/RepeatAttribute.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using Xunit; +using Xunit.Sdk; + +namespace Akka.Streams.Kafka.Tests +{ + public sealed class RepeatAttribute : DataAttribute + { + private readonly int _count; + + public RepeatAttribute(int count) + { + const int minimumCount = 1; + if (count < minimumCount) + { + throw new ArgumentOutOfRangeException( + paramName: nameof(count), + message: "Repeat count must be greater than 0."); + } + _count = count; + } + + public override IEnumerable GetData(MethodInfo testMethod) + { + foreach (var iterationNumber in Enumerable.Range(start: 1, count: _count)) + { + yield return new object[] { iterationNumber }; + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Messages/ProducerRecord.cs b/src/Akka.Streams.Kafka/Messages/ProducerRecord.cs index 37440342..e26e53e2 100644 --- a/src/Akka.Streams.Kafka/Messages/ProducerRecord.cs +++ b/src/Akka.Streams.Kafka/Messages/ProducerRecord.cs @@ -57,6 +57,16 @@ public ProducerRecord(string topic, int? partition, long? timestamp, Message {Key = key, Value = value}) + { + } + + public ProducerRecord(string topic, int? partition, K key, V value) + : this(topic, partition, null, new Message {Key = key, Value = value}) + { + } + /// /// ProducerRecord /// diff --git a/src/Akka.Streams.Kafka/Properties/FriendsOf.cs b/src/Akka.Streams.Kafka/Properties/FriendsOf.cs index c53baf03..41adb60a 100644 --- a/src/Akka.Streams.Kafka/Properties/FriendsOf.cs +++ b/src/Akka.Streams.Kafka/Properties/FriendsOf.cs @@ -1,3 +1,4 @@ using System.Runtime.CompilerServices; +[assembly: InternalsVisibleTo("Akka.Streams.Kafka.Testkit")] [assembly: InternalsVisibleTo("Akka.Streams.Kafka.Tests")] \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Settings/ProducerSettings.cs b/src/Akka.Streams.Kafka/Settings/ProducerSettings.cs index 93d3f1ab..d186e327 100644 --- a/src/Akka.Streams.Kafka/Settings/ProducerSettings.cs +++ b/src/Akka.Streams.Kafka/Settings/ProducerSettings.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Collections.Immutable; +using System.Threading.Tasks; using Akka.Actor; using Akka.Streams.Kafka.Internal; using Akka.Util.Internal; @@ -10,9 +11,29 @@ namespace Akka.Streams.Kafka.Settings { public sealed class ProducerSettings { - public ProducerSettings(ISerializer keySerializer, ISerializer valueSerializer, int parallelism, - string dispatcherId, TimeSpan flushTimeout, TimeSpan eosCommitInterval, - IImmutableDictionary properties) + [Obsolete("Use the ctor with enrichAsync and producerFactory instead")] + public ProducerSettings( + ISerializer keySerializer, + ISerializer valueSerializer, + int parallelism, + string dispatcherId, + TimeSpan flushTimeout, + TimeSpan eosCommitInterval, + IImmutableDictionary properties) + : this(keySerializer, valueSerializer, parallelism, dispatcherId, flushTimeout, flushTimeout, properties, null, null, true) + {} + + public ProducerSettings( + ISerializer keySerializer, + ISerializer valueSerializer, + int parallelism, + string dispatcherId, + TimeSpan flushTimeout, + TimeSpan eosCommitInterval, + IImmutableDictionary properties, + Func, Task>> enrichAsync, + Func, IProducer> producerFactory, + bool closeProducerOnStop) { KeySerializer = keySerializer; ValueSerializer = valueSerializer; @@ -21,8 +42,18 @@ public ProducerSettings(ISerializer keySerializer, ISerializer val FlushTimeout = flushTimeout; EosCommitInterval = eosCommitInterval; Properties = properties; + EnrichAsync = enrichAsync; + ProducerFactory = producerFactory; + CloseProducerOnStop = closeProducerOnStop; } + public Func, Task>> EnrichAsync { get; } + + [Obsolete("Use CreateKafkaProducer(), CreateKafkaProducerAsync(), or CreateKafkaProducerCompletionStage() to get a new Kafka IProducer")] + public Func, IProducer> ProducerFactory { get; } + + public bool CloseProducerOnStop { get; } + public ISerializer KeySerializer { get; } public ISerializer ValueSerializer { get; } public int Parallelism { get; } @@ -73,6 +104,18 @@ public ProducerSettings WithParallelism(int parallelism) => public ProducerSettings WithDispatcher(string dispatcherId) => Copy(dispatcherId: dispatcherId); + public ProducerSettings WithCloseProducerOnStop(bool closeProducerOnStop) => + Copy(closeProducerOnStop: closeProducerOnStop); + + public ProducerSettings WithEnrichAsync(Func, Task>> enrichAsync) => + Copy(enrichAsync: enrichAsync); + + public ProducerSettings WithProducerFactory(Func, IProducer> producerFactory) => + Copy(producerFactory: producerFactory); + + public ProducerSettings WithProducer(IProducer producer) => + Copy(producerFactory: _ => producer, closeProducerOnStop: false ); + private ProducerSettings Copy( ISerializer keySerializer = null, ISerializer valueSerializer = null, @@ -80,22 +123,28 @@ private ProducerSettings Copy( string dispatcherId = null, TimeSpan? flushTimeout = null, TimeSpan? eosCommitInterval = null, - IImmutableDictionary properties = null) => + IImmutableDictionary properties = null, + Func, Task>> enrichAsync = null, + Func, IProducer> producerFactory = null, + bool? closeProducerOnStop = null) => new ProducerSettings( - keySerializer: keySerializer ?? this.KeySerializer, - valueSerializer: valueSerializer ?? this.ValueSerializer, - parallelism: parallelism ?? this.Parallelism, - dispatcherId: dispatcherId ?? this.DispatcherId, - flushTimeout: flushTimeout ?? this.FlushTimeout, - eosCommitInterval: eosCommitInterval ?? this.EosCommitInterval, - properties: properties ?? this.Properties); + keySerializer: keySerializer ?? KeySerializer, + valueSerializer: valueSerializer ?? ValueSerializer, + parallelism: parallelism ?? Parallelism, + dispatcherId: dispatcherId ?? DispatcherId, + flushTimeout: flushTimeout ?? FlushTimeout, + eosCommitInterval: eosCommitInterval ?? EosCommitInterval, + properties: properties ?? Properties, + enrichAsync: enrichAsync ?? EnrichAsync, + producerFactory: producerFactory ?? ProducerFactory, + closeProducerOnStop: closeProducerOnStop ?? CloseProducerOnStop); + public const string ConfigPath = "akka.kafka.producer"; + public static ProducerSettings Create(ActorSystem system, ISerializer keySerializer, ISerializer valueSerializer) { if (system == null) throw new ArgumentNullException(nameof(system)); - - var config = system.Settings.Config.GetConfig("akka.kafka.producer"); - return Create(config, keySerializer, valueSerializer); + return Create(system.Settings.Config.GetConfig(ConfigPath), keySerializer, valueSerializer); } public static ProducerSettings Create(Akka.Configuration.Config config, ISerializer keySerializer, ISerializer valueSerializer) @@ -111,7 +160,10 @@ public static ProducerSettings Create(Akka.Configuration.Config co dispatcherId: config.GetString("use-dispatcher", "akka.kafka.default-dispatcher"), flushTimeout: config.GetTimeSpan("flush-timeout", TimeSpan.FromSeconds(2)), eosCommitInterval: config.GetTimeSpan("eos-commit-interval", TimeSpan.FromMilliseconds(100)), - properties: properties); + properties: properties, + enrichAsync: null, + producerFactory: null, + closeProducerOnStop: config.GetBoolean("close-on-producer-stop", false)); } public Confluent.Kafka.IProducer CreateKafkaProducer(Action, Error> producerErrorHandler = null) diff --git a/src/Akka.Streams.Kafka/reference.conf b/src/Akka.Streams.Kafka/reference.conf index c8d3a22e..fe83c3ce 100644 --- a/src/Akka.Streams.Kafka/reference.conf +++ b/src/Akka.Streams.Kafka/reference.conf @@ -8,12 +8,20 @@ akka.kafka.producer { # How long to wait for `Producer.Flush` flush-timeout = 10s + # Call `KafkaProducer.close` when the stream is shutdown. This is important to override to false + # when the producer instance is shared across multiple producer stages. + close-on-producer-stop = true + # Fully qualified config path which holds the dispatcher configuration # to be used by the producer stages. Some blocking may occur. # When this value is empty, the dispatcher configured for the stream # will be used. use-dispatcher = "akka.kafka.default-dispatcher" + # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow` + # for exactly-once-semantics processing. + eos-commit-interval = 100ms + # Properties defined by Confluent.Kafka.ProducerConfig # can be defined in this configuration section. kafka-clients { From 4b4bf39683bf7faaad8fba1cbb1b3848986e7568 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 30 Nov 2021 03:22:27 +0700 Subject: [PATCH 2/2] Use TCP socket to determine kafka container readiness --- src/Akka.Streams.Kafka.Testkit/Fixture/KafkaContainer.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaContainer.cs b/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaContainer.cs index 0b1f07ec..738c087b 100644 --- a/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaContainer.cs +++ b/src/Akka.Streams.Kafka.Testkit/Fixture/KafkaContainer.cs @@ -1,5 +1,7 @@ using System.Collections.Generic; using System.IO; +using System.Net; +using System.Net.Sockets; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; @@ -91,6 +93,7 @@ protected override Task SetupContainerParameters(CreateContainerParameters param protected override async Task WaitUntilReady(CancellationToken token) { + /* var regex = new Regex("\\[KafkaServer id=[0-9]*\\] started \\(kafka.server.KafkaServer\\)"); using (var stream = await Client.Containers.GetContainerLogsAsync(ContainerId, new ContainerLogsParameters { @@ -109,7 +112,7 @@ protected override async Task WaitUntilReady(CancellationToken token) } } } - /* + */ var address = IPAddress.Parse("127.0.0.1"); using (var socket = new TcpClient(AddressFamily.InterNetwork)) { @@ -126,7 +129,6 @@ protected override async Task WaitUntilReady(CancellationToken token) await Task.Delay(100, token); } } - */ } } } \ No newline at end of file