diff --git a/internal/asyncapi/command_export.go b/internal/asyncapi/command_export.go index 1b31206afe..20e2904de7 100644 --- a/internal/asyncapi/command_export.go +++ b/internal/asyncapi/command_export.go @@ -307,7 +307,7 @@ func (c *command) getMessageExamples(consumer *ckgo.Consumer, topicName, content return nil, err } - err = deserializationProvider.InitDeserializer(srEndpoint, srClusterId, "value", serdes.SchemaRegistryAuth{Token: token}, nil) + err = deserializationProvider.InitDeserializer(srEndpoint, srClusterId, "", "value", serdes.SchemaRegistryAuth{Token: token}, nil) if err != nil { return nil, err } diff --git a/internal/kafka/command_topic_consume.go b/internal/kafka/command_topic_consume.go index b80faa9b59..fb20093336 100644 --- a/internal/kafka/command_topic_consume.go +++ b/internal/kafka/command_topic_consume.go @@ -286,17 +286,24 @@ func (c *command) consumeCloud(cmd *cobra.Command, args []string) error { subject = schemaRegistryContext } + log.CliLogger.Tracef("consumeCloud: kafkaClusterId=%q topic=%q", cluster.ID, topic) + + srAuth := serdes.SchemaRegistryAuth{ApiKey: srApiKey, ApiSecret: srApiSecret, Token: token} + valueSubject := resolveAssociatedValueSubject(valueFormat, srEndpoint, srClusterId, cluster.ID, topic, srAuth) + groupHandler := &GroupHandler{ SrClient: srClient, SrApiKey: srApiKey, SrApiSecret: srApiSecret, SrClusterId: srClusterId, SrClusterEndpoint: srEndpoint, + KafkaClusterId: cluster.ID, Token: token, KeyFormat: keyFormat, ValueFormat: valueFormat, Out: cmd.OutOrStdout(), Subject: subject, + ValueSubject: valueSubject, Topic: topic, Properties: ConsumerProperties{ Delimiter: delimiter, diff --git a/internal/kafka/command_topic_produce.go b/internal/kafka/command_topic_produce.go index ec9c15ea98..3e60e61391 100644 --- a/internal/kafka/command_topic_produce.go +++ b/internal/kafka/command_topic_produce.go @@ -147,12 +147,14 @@ func (c *command) produceCloud(cmd *cobra.Command, args []string) error { return err } - keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key") + log.CliLogger.Tracef("produceCloud: kafkaClusterId=%q topic=%q", cluster.ID, topic) + + keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key", cluster.ID) if err != nil { return err } - valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "value") + valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "value", cluster.ID) if err != nil { return err } @@ -207,12 +209,12 @@ func (c *command) produceCloud(cmd *cobra.Command, args []string) error { func (c *command) produceOnPrem(cmd *cobra.Command, args []string) error { topic := args[0] - keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "key") + keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "key", "") if err != nil { return err } - valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "value") + valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "value", "") if err != nil { return err } @@ -461,14 +463,12 @@ func getKeyAndValue(schemaBased bool, data, delimiter string) (string, string, e return "", "", errors.New(missingOrMalformedKeyErrorMsg) } -func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) (serdes.SerializationProvider, []byte, error) { +func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode, kafkaClusterId string) (serdes.SerializationProvider, []byte, error) { schemaDir, err := createTempDir() if err != nil { return nil, nil, err } - subject := topicNameStrategy(topic, mode) - // Deprecated var schemaId optional.Int32 if mode == "value" && cmd.Flags().Changed("schema-id") { @@ -491,6 +491,41 @@ func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) ( schemaId = optional.NewInt32(int32(id)) } + srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint") + if err != nil { + return nil, nil, err + } + srApiKey, err := cmd.Flags().GetString("schema-registry-api-key") + if err != nil { + return nil, nil, err + } + srApiSecret, err := cmd.Flags().GetString("schema-registry-api-secret") + if err != nil { + return nil, nil, err + } + var token string + if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out + token, err = auth.GetDataplaneToken(c.Context) + if err != nil { + return nil, nil, err + } + } + var srClusterId string + if (schemaId.IsSet() || schema != "") && srEndpoint == "" { + srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint(cmd) + if err != nil { + return nil, nil, err + } + } + srAuth := serdes.SchemaRegistryAuth{ + ApiKey: srApiKey, + ApiSecret: srApiSecret, + Token: token, + } + + // Resolve subject via SR associations, fall back to TopicNameStrategy on miss. + subject := resolveProduceSubject(srEndpoint, srClusterId, kafkaClusterId, topic, mode, srAuth) + var format string referencePathMap := map[string]string{} metaInfo := []byte{} @@ -559,49 +594,12 @@ func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) ( } } - // Fetch the SR client endpoint during schema registration - srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint") - if err != nil { - return nil, nil, err - } - - var srClusterId string - if (schemaId.IsSet() || schema != "") && srEndpoint == "" { - srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint(cmd) - if err != nil { - return nil, nil, err - } - } - - // Initialize the serializer with the same SR endpoint during registration - // The associated schema ID is also required to initialize the serializer - srApiKey, err := cmd.Flags().GetString("schema-registry-api-key") - if err != nil { - return nil, nil, err - } - srApiSecret, err := cmd.Flags().GetString("schema-registry-api-secret") - if err != nil { - return nil, nil, err - } var parsedSchemaId = -1 if len(metaInfo) >= 5 { parsedSchemaId = int(binary.BigEndian.Uint32(metaInfo[1:5])) } - var token string - if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out - token, err = auth.GetDataplaneToken(c.Context) - if err != nil { - return nil, nil, err - } - } - srAuth := serdes.SchemaRegistryAuth{ - ApiKey: srApiKey, - ApiSecret: srApiSecret, - Token: token, - } - err = serializationProvider.InitSerializer(srEndpoint, srClusterId, mode, parsedSchemaId, srAuth) - if err != nil { + if err := serializationProvider.InitSerializer(srEndpoint, srClusterId, kafkaClusterId, mode, parsedSchemaId, srAuth); err != nil { return nil, nil, err } @@ -613,7 +611,7 @@ func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) ( return serializationProvider, metaInfo, nil } -func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode string) (serdes.SerializationProvider, []byte, error) { +func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode, kafkaClusterId string) (serdes.SerializationProvider, []byte, error) { schemaDir, err := createTempDir() if err != nil { return nil, nil, err @@ -732,7 +730,7 @@ func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode str ClientKeyPath: clientKeyPath, Token: token, } - err = serializationProvider.InitSerializer(srEndpoint, "", mode, parsedSchemaId, srAuth) + err = serializationProvider.InitSerializer(srEndpoint, "", kafkaClusterId, mode, parsedSchemaId, srAuth) if err != nil { return nil, nil, err } diff --git a/internal/kafka/confluent_kafka.go b/internal/kafka/confluent_kafka.go index 2ebe6cfd6a..6e04e0993d 100644 --- a/internal/kafka/confluent_kafka.go +++ b/internal/kafka/confluent_kafka.go @@ -58,6 +58,7 @@ type GroupHandler struct { SrApiSecret string SrClusterId string SrClusterEndpoint string + KafkaClusterId string Token string CertificateAuthorityPath string ClientCertPath string @@ -66,6 +67,7 @@ type GroupHandler struct { ValueFormat string Out io.Writer Subject string + ValueSubject string // Protobuf value subject resolved from a topic association; empty => fall back to Subject Topic string Properties ConsumerProperties } @@ -241,7 +243,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { return err } - err = keyDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, "key", srAuth, nil) + err = keyDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, h.KafkaClusterId, "key", srAuth, nil) if err != nil { return err } @@ -268,12 +270,16 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { return err } - err = valueDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, "value", srAuth, nil) + err = valueDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, h.KafkaClusterId, "value", srAuth, nil) if err != nil { return err } - if err := valueDeserializer.LoadSchema(h.Subject, h.Properties.SchemaPath, serde.ValueSerde, message); err != nil { + valueSubject := h.ValueSubject + if valueSubject == "" { + valueSubject = h.Subject + } + if err := valueDeserializer.LoadSchema(valueSubject, h.Properties.SchemaPath, serde.ValueSerde, message); err != nil { return err } diff --git a/internal/kafka/confluent_kafka_test.go b/internal/kafka/confluent_kafka_test.go index c350a9e0b5..34bb2ab933 100644 --- a/internal/kafka/confluent_kafka_test.go +++ b/internal/kafka/confluent_kafka_test.go @@ -1,6 +1,7 @@ package kafka import ( + "bytes" "testing" "time" @@ -11,6 +12,39 @@ import ( "github.com/confluentinc/cli/v4/pkg/serdes" ) +func TestConsumeMessage_ValueSubjectFallback(t *testing.T) { + tests := []struct { + name string + valueSubject string + }{ + {"empty ValueSubject falls back to Subject", ""}, + {"explicit ValueSubject is used", "topic1-value-context"}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + message := &ckgo.Message{ + Key: []byte("my-key"), + Value: []byte("my-value"), + TopicPartition: ckgo.TopicPartition{Offset: 1, Partition: 0}, + } + var buf bytes.Buffer + handler := &GroupHandler{ + KeyFormat: "string", + ValueFormat: "string", + Subject: "topic1-value", + ValueSubject: test.valueSubject, + KafkaClusterId: "lkc-123", + Topic: "topic1", + Out: &buf, + Properties: ConsumerProperties{PrintKey: true, Delimiter: "\t"}, + } + require.NoError(t, ConsumeMessage(message, handler)) + require.Contains(t, buf.String(), "my-key") + require.Contains(t, buf.String(), "my-value") + }) + } +} + func TestGetMessageString(t *testing.T) { message := &ckgo.Message{ Value: []byte("message"), diff --git a/internal/kafka/utils.go b/internal/kafka/utils.go index 8df2dcdd23..4f0677fec5 100644 --- a/internal/kafka/utils.go +++ b/internal/kafka/utils.go @@ -6,12 +6,15 @@ import ( cmkv2 "github.com/confluentinc/ccloud-sdk-go-v2/cmk/v2" cckafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" cpkafkarestv3 "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3" "github.com/confluentinc/cli/v4/pkg/ccloudv2" "github.com/confluentinc/cli/v4/pkg/ccstructs" "github.com/confluentinc/cli/v4/pkg/kafkarest" "github.com/confluentinc/cli/v4/pkg/kafkausagelimits" + "github.com/confluentinc/cli/v4/pkg/log" + "github.com/confluentinc/cli/v4/pkg/serdes" ) func toCreateTopicConfigs(topicConfigsMap map[string]string) []cckafkarestv3.ConfigData { @@ -210,6 +213,86 @@ func topicNameStrategy(topic, mode string) string { return fmt.Sprintf("%s-%s", topic, mode) } +func newSchemaRegistryClient(srClientUrl, srClusterId string, srAuth serdes.SchemaRegistryAuth) (schemaregistry.Client, error) { + var cfg *schemaregistry.Config + switch { + case srAuth.ApiKey != "" && srAuth.ApiSecret != "": + cfg = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret) + case srAuth.Token != "": + cfg = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") + default: + cfg = schemaregistry.NewConfig(srClientUrl) + log.CliLogger.Info("initializing schema registry client with no authentication") + } + cfg.SslCaLocation = srAuth.CertificateAuthorityPath + cfg.SslCertificateLocation = srAuth.ClientCertPath + cfg.SslKeyLocation = srAuth.ClientKeyPath + return schemaregistry.NewClient(cfg) +} + +// returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id +// as resource namespace. Falls backt o default TopicNameStrategy (-) if unmatched. +func lookupAssociatedSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) (string, bool, error) { + associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1) + if err != nil { + return "", false, err + } + if len(associations) == 0 { + return "", false, nil + } + return associations[0].Subject, true, nil +} + +func resolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string { + fallback := topic + "-" + mode + if kafkaClusterId == "" || client == nil { + return fallback + } + subject, found, err := lookupAssociatedSubject(client, kafkaClusterId, topic, mode) + if err != nil { + log.CliLogger.Tracef("subject resolution: associations lookup failed (topic=%q mode=%q clusterId=%q): %v; using %q", topic, mode, kafkaClusterId, err, fallback) + return fallback + } + if !found { + log.CliLogger.Tracef("subject resolution: no association for topic=%q mode=%q clusterId=%q; using %q", topic, mode, kafkaClusterId, fallback) + return fallback + } + log.CliLogger.Tracef("subject resolution: resolved associated subject %q (topic=%q mode=%q clusterId=%q)", subject, topic, mode, kafkaClusterId) + return subject +} + +func resolveProduceSubject(srEndpoint, srClusterId, kafkaClusterId, topic, mode string, srAuth serdes.SchemaRegistryAuth) string { + if kafkaClusterId != "" && srEndpoint != "" { + if client, err := newSchemaRegistryClient(srEndpoint, srClusterId, srAuth); err == nil { + return resolveSubject(client, kafkaClusterId, topic, mode) + } + } + return topicNameStrategy(topic, mode) +} + +// The Protobuf deserializer fetches its schema by subject, so on an associated topic it needs the +// associated (context-qualified) value subject. Otherwise, valueSubject stays empty to preserve default behaviour +// and the deserializer falls back to the topic-derived subject +func resolveAssociatedValueSubject(valueFormat, srEndpoint, srClusterId, kafkaClusterId, topic string, srAuth serdes.SchemaRegistryAuth) string { + if !serdes.IsProtobufSchema(valueFormat) || kafkaClusterId == "" || srEndpoint == "" { + return "" + } + client, err := newSchemaRegistryClient(srEndpoint, srClusterId, srAuth) + if err != nil { + return "" + } + return associatedValueSubject(client, kafkaClusterId, topic) +} + +func associatedValueSubject(client schemaregistry.Client, kafkaClusterId, topic string) string { + subject, found, err := lookupAssociatedSubject(client, kafkaClusterId, topic, "value") + if err != nil || !found { + return "" + } + log.CliLogger.Tracef("consumeCloud: resolved associated value subject %q", subject) + return subject +} + func getLimitsForSku(cluster *cmkv2.CmkV2Cluster, usageLimits *kafkausagelimits.UsageLimits) *kafkausagelimits.Limits { if isDedicated(cluster) { return usageLimits.GetCkuLimit(cluster.Status.GetCku()) diff --git a/internal/kafka/utils_test.go b/internal/kafka/utils_test.go new file mode 100644 index 0000000000..b8797040e1 --- /dev/null +++ b/internal/kafka/utils_test.go @@ -0,0 +1,159 @@ +package kafka + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" + + "github.com/confluentinc/cli/v4/pkg/serdes" +) + +const mockSRUrl = "mock://" + +func newMockSRClient(t *testing.T) schemaregistry.Client { + t.Helper() + client, err := schemaregistry.NewClient(schemaregistry.NewConfig(mockSRUrl)) + require.NoError(t, err) + return client +} + +func seedAssociation(t *testing.T, client schemaregistry.Client, kafkaClusterId, mode, subject string) { + t.Helper() + const topic = "topic1" + // The mock requires the subject to have a registered schema before it + // will accept an association referencing it. + _, err := client.Register(subject, schemaregistry.SchemaInfo{ + Schema: `{"type":"record","name":"R","fields":[{"name":"f","type":"int"}]}`, + SchemaType: "AVRO", + }, false) + require.NoError(t, err) + _, err = client.CreateOrUpdateAssociation(schemaregistry.AssociationCreateOrUpdateRequest{ + ResourceName: topic, + ResourceNamespace: kafkaClusterId, + ResourceID: topic + ":" + kafkaClusterId, + ResourceType: "topic", + Associations: []schemaregistry.AssociationCreateOrUpdateInfo{{ + Subject: subject, + AssociationType: mode, + }}, + }) + require.NoError(t, err) +} + +func TestNewSchemaRegistryClient(t *testing.T) { + tests := []struct { + name string + auth serdes.SchemaRegistryAuth + }{ + {"basic authentication", serdes.SchemaRegistryAuth{ApiKey: "key", ApiSecret: "secret"}}, + {"bearer authentication", serdes.SchemaRegistryAuth{Token: "token"}}, + {"no authentication", serdes.SchemaRegistryAuth{}}, + {"with TLS paths", serdes.SchemaRegistryAuth{CertificateAuthorityPath: "ca", ClientCertPath: "cert", ClientKeyPath: "key"}}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + client, err := newSchemaRegistryClient(mockSRUrl, "lsrc-123", test.auth) + require.NoError(t, err) + require.NotNil(t, client) + }) + } +} + +func TestResolveProduceSubject(t *testing.T) { + auth := serdes.SchemaRegistryAuth{} + + t.Run("empty kafkaClusterId falls back to TopicNameStrategy", func(t *testing.T) { + require.Equal(t, "topic1-value", resolveProduceSubject(mockSRUrl, "lsrc-1", "", "topic1", "value", auth)) + }) + + t.Run("empty srEndpoint falls back to TopicNameStrategy", func(t *testing.T) { + require.Equal(t, "topic1-value", resolveProduceSubject("", "lsrc-1", "lkc-123", "topic1", "value", auth)) + }) + + t.Run("no association falls back to TopicNameStrategy", func(t *testing.T) { + require.Equal(t, "topic1-value", resolveProduceSubject(mockSRUrl, "lsrc-1", "lkc-123", "topic1", "value", auth)) + }) + + t.Run("client build failure falls back to TopicNameStrategy", func(t *testing.T) { + require.Equal(t, "topic1-value", resolveProduceSubject("://bad", "lsrc-1", "lkc-123", "topic1", "value", auth)) + }) +} + +func TestAssociatedValueSubject(t *testing.T) { + t.Run("no association returns empty", func(t *testing.T) { + client := newMockSRClient(t) + require.Empty(t, associatedValueSubject(client, "lkc-123", "topic1")) + }) + + t.Run("matching value association returns its subject", func(t *testing.T) { + client := newMockSRClient(t) + seedAssociation(t, client, "lkc-123", "value", "custom-value-subject") + require.Equal(t, "custom-value-subject", associatedValueSubject(client, "lkc-123", "topic1")) + }) +} + +func TestResolveAssociatedValueSubject(t *testing.T) { + auth := serdes.SchemaRegistryAuth{} + + t.Run("non-protobuf format returns empty", func(t *testing.T) { + require.Empty(t, resolveAssociatedValueSubject("avro", mockSRUrl, "lsrc-1", "lkc-123", "topic1", auth)) + }) + + t.Run("empty kafkaClusterId returns empty", func(t *testing.T) { + require.Empty(t, resolveAssociatedValueSubject("protobuf", mockSRUrl, "lsrc-1", "", "topic1", auth)) + }) + + t.Run("empty srEndpoint returns empty", func(t *testing.T) { + require.Empty(t, resolveAssociatedValueSubject("protobuf", "", "lsrc-1", "lkc-123", "topic1", auth)) + }) + + t.Run("protobuf with no association returns empty", func(t *testing.T) { + require.Empty(t, resolveAssociatedValueSubject("protobuf", mockSRUrl, "lsrc-1", "lkc-123", "topic1", auth)) + }) + + t.Run("client build failure returns empty", func(t *testing.T) { + require.Empty(t, resolveAssociatedValueSubject("protobuf", "://bad", "lsrc-1", "lkc-123", "topic1", auth)) + }) +} + +func TestResolveSubject(t *testing.T) { + t.Run("nil client falls back to TopicNameStrategy", func(t *testing.T) { + require.Equal(t, "topic1-value", resolveSubject(nil, "lkc-123", "topic1", "value")) + }) + + t.Run("empty kafkaClusterId falls back to TopicNameStrategy", func(t *testing.T) { + client := newMockSRClient(t) + require.Equal(t, "topic1-value", resolveSubject(client, "", "topic1", "value")) + }) + + t.Run("no association falls back to TopicNameStrategy", func(t *testing.T) { + client := newMockSRClient(t) + require.Equal(t, "topic1-value", resolveSubject(client, "lkc-123", "topic1", "value")) + }) + + t.Run("matching association returns its subject", func(t *testing.T) { + client := newMockSRClient(t) + seedAssociation(t, client, "lkc-123", "value", "custom-value-subject") + require.Equal(t, "custom-value-subject", resolveSubject(client, "lkc-123", "topic1", "value")) + }) + + t.Run("association for other mode falls back", func(t *testing.T) { + client := newMockSRClient(t) + seedAssociation(t, client, "lkc-123", "key", "custom-key-subject") + require.Equal(t, "topic1-value", resolveSubject(client, "lkc-123", "topic1", "value")) + }) + + t.Run("association under different cluster id falls back", func(t *testing.T) { + client := newMockSRClient(t) + seedAssociation(t, client, "lkc-other", "value", "should-not-be-used") + require.Equal(t, "topic1-value", resolveSubject(client, "lkc-123", "topic1", "value")) + }) + + t.Run("association lookup error falls back to TopicNameStrategy", func(t *testing.T) { + client := newMockSRClient(t) + // An empty topic makes the associations lookup return an error, exercising the error path. + require.Equal(t, "-value", resolveSubject(client, "lkc-123", "", "value")) + }) +} diff --git a/pkg/serdes/avro_deserialization_provider.go b/pkg/serdes/avro_deserialization_provider.go index 12b87ae022..e5e278f9eb 100644 --- a/pkg/serdes/avro_deserialization_provider.go +++ b/pkg/serdes/avro_deserialization_provider.go @@ -15,7 +15,7 @@ type AvroDeserializationProvider struct { deser *avrov3.Deserializer } -func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { +func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { // Note: Now Serializer/Deserializer are tightly coupled with Schema Registry // If existingClient is not nil, we should share this client between ser and deser. // As the shared client is referred as mock client to store the same set of schemas in cache @@ -34,6 +34,8 @@ func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, } } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) + var serdeType serde.Type switch mode { case "key": diff --git a/pkg/serdes/avro_serialization_provider.go b/pkg/serdes/avro_serialization_provider.go index 0407adfeb6..96c1c90613 100644 --- a/pkg/serdes/avro_serialization_provider.go +++ b/pkg/serdes/avro_serialization_provider.go @@ -26,7 +26,7 @@ type AvroSerializationProvider struct { mode string } -func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { +func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) if err != nil { return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err) @@ -65,6 +65,8 @@ func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, mod serdeConfig.UseLatestVersion = false } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) + var serdeType serde.Type if mode == "key" { serdeType = serde.KeySerde @@ -98,8 +100,13 @@ func (a *AvroSerializationProvider) GetSchemaName() string { } func (a *AvroSerializationProvider) Serialize(topic, message string) ([]kafka.Header, []byte, error) { - // Step#1: Fetch the schemaInfo based on subject and schema ID - schemaObj, err := a.GetSchemaRegistryClient().GetBySubjectAndID(topic+"-"+a.mode, a.schemaId) + // Step#1: Ask the configured ckgo strategy for the subject (AssociatedNameStrategy + // on cloud, TopicNameStrategy on on-prem), then fetch the schema by subject + id. + subject, err := a.ser.SubjectNameStrategy(topic, a.ser.SerdeType, schemaregistry.SchemaInfo{}) + if err != nil { + return nil, nil, fmt.Errorf("failed to resolve subject: %w", err) + } + schemaObj, err := a.GetSchemaRegistryClient().GetBySubjectAndID(subject, a.schemaId) if err != nil { return nil, nil, fmt.Errorf("failed to serialize message: %w", err) } diff --git a/pkg/serdes/double_deserialization_provider.go b/pkg/serdes/double_deserialization_provider.go index d24f2d3e67..b1a3993bfd 100644 --- a/pkg/serdes/double_deserialization_provider.go +++ b/pkg/serdes/double_deserialization_provider.go @@ -12,7 +12,7 @@ import ( type DoubleDeserializationProvider struct{} -func (DoubleDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { +func (DoubleDeserializationProvider) InitDeserializer(_, _, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { return nil } diff --git a/pkg/serdes/double_serialization_provider.go b/pkg/serdes/double_serialization_provider.go index e17c17da25..897d676519 100644 --- a/pkg/serdes/double_serialization_provider.go +++ b/pkg/serdes/double_serialization_provider.go @@ -12,7 +12,7 @@ import ( type DoubleSerializationProvider struct{} -func (DoubleSerializationProvider) InitSerializer(_, _, _ string, _ int, _ SchemaRegistryAuth) error { +func (DoubleSerializationProvider) InitSerializer(_, _, _, _ string, _ int, _ SchemaRegistryAuth) error { return nil } diff --git a/pkg/serdes/integer_deserialization_provider.go b/pkg/serdes/integer_deserialization_provider.go index 564b3ae724..2a26d2b550 100644 --- a/pkg/serdes/integer_deserialization_provider.go +++ b/pkg/serdes/integer_deserialization_provider.go @@ -11,7 +11,7 @@ import ( type IntegerDeserializationProvider struct{} -func (IntegerDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { +func (IntegerDeserializationProvider) InitDeserializer(_, _, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { return nil } diff --git a/pkg/serdes/integer_serialization_provider.go b/pkg/serdes/integer_serialization_provider.go index 12b8de571b..0770292d29 100644 --- a/pkg/serdes/integer_serialization_provider.go +++ b/pkg/serdes/integer_serialization_provider.go @@ -11,7 +11,7 @@ import ( type IntegerSerializationProvider struct{} -func (IntegerSerializationProvider) InitSerializer(_, _, _ string, _ int, _ SchemaRegistryAuth) error { +func (IntegerSerializationProvider) InitSerializer(_, _, _, _ string, _ int, _ SchemaRegistryAuth) error { return nil } diff --git a/pkg/serdes/json_deserialization_provider.go b/pkg/serdes/json_deserialization_provider.go index b8ebe2d33c..88c117b4c5 100644 --- a/pkg/serdes/json_deserialization_provider.go +++ b/pkg/serdes/json_deserialization_provider.go @@ -15,7 +15,7 @@ type JsonDeserializationProvider struct { deser *jsonschema.Deserializer } -func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { +func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { // Note: Now Serializer/Deserializer are tightly coupled with Schema Registry // If existingClient is not nil, we should share this client between ser and deser. // As the shared client is referred as mock client to store the same set of schemas in cache @@ -36,6 +36,8 @@ func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, } } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) + var serdeType serde.Type switch mode { case "key": diff --git a/pkg/serdes/json_serialization_provider.go b/pkg/serdes/json_serialization_provider.go index 600fa6d810..fa86b67f35 100644 --- a/pkg/serdes/json_serialization_provider.go +++ b/pkg/serdes/json_serialization_provider.go @@ -23,7 +23,7 @@ type JsonSerializationProvider struct { ser *jsonschema.Serializer } -func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { +func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) if err != nil { return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err) @@ -63,6 +63,8 @@ func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, mod serdeConfig.UseLatestVersion = false } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) + var serdeType serde.Type if mode == "key" { serdeType = serde.KeySerde diff --git a/pkg/serdes/protobuf_deserialization_provider.go b/pkg/serdes/protobuf_deserialization_provider.go index 695f176465..61560f43de 100644 --- a/pkg/serdes/protobuf_deserialization_provider.go +++ b/pkg/serdes/protobuf_deserialization_provider.go @@ -25,7 +25,7 @@ type ProtobufDeserializationProvider struct { message gproto.Message } -func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { +func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { // Note: Now Serializer/Deserializer are tightly coupled with Schema Registry // If existingClient is not nil, we should share this client between ser and deser. // As the shared client is referred as mock client to store the same set of schemas in cache @@ -44,6 +44,8 @@ func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srCluste } } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) + var serdeType serde.Type switch mode { case "key": diff --git a/pkg/serdes/protobuf_serialization_provider.go b/pkg/serdes/protobuf_serialization_provider.go index 2f8e2ce45b..8272e9e09b 100644 --- a/pkg/serdes/protobuf_serialization_provider.go +++ b/pkg/serdes/protobuf_serialization_provider.go @@ -43,7 +43,7 @@ type ProtobufSerializationProvider struct { message gproto.Message } -func (p *ProtobufSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { +func (p *ProtobufSerializationProvider) InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) if err != nil { return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err) @@ -82,6 +82,8 @@ func (p *ProtobufSerializationProvider) InitSerializer(srClientUrl, srClusterId, serdeConfig.UseLatestVersion = false } + serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId) + var serdeType serde.Type if mode == "key" { serdeType = serde.KeySerde diff --git a/pkg/serdes/serdes.go b/pkg/serdes/serdes.go index be1e0a4b3b..d2b7c4532b 100644 --- a/pkg/serdes/serdes.go +++ b/pkg/serdes/serdes.go @@ -68,7 +68,7 @@ type SchemaRegistryAuth struct { } type SerializationProvider interface { - InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error + InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error LoadSchema(string, map[string]string) error Serialize(string, string) ([]kafka.Header, []byte, error) GetSchemaName() string @@ -77,7 +77,7 @@ type SerializationProvider interface { } type DeserializationProvider interface { - InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error + InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error LoadSchema(string, string, serde.Type, *kafka.Message) error Deserialize(string, []kafka.Header, []byte) (string, error) GetSchemaRegistryClient() schemaregistry.Client @@ -140,6 +140,17 @@ func IsProtobufSchema(valueFormat string) bool { return valueFormat == protobufSchemaName } +// returns the SerDes subject name strategy + its config for a given Kafka cluster id. +// on-prem callers which pass "" stay on TopicNameStrategy. +func subjectStrategy(kafkaClusterId string) (serde.SubjectNameStrategyType, map[string]string) { + if kafkaClusterId != "" { + log.CliLogger.Tracef("subjectStrategy: AssociatedNameStrategy (kafkaClusterId=%q)", kafkaClusterId) + return serde.AssociatedNameStrategyType, map[string]string{serde.KafkaClusterIDConfig: kafkaClusterId} + } + log.CliLogger.Tracef("subjectStrategy: TopicNameStrategy (no kafkaClusterId)") + return serde.TopicNameStrategyType, nil +} + func initSchemaRegistryClient(srClientUrl, srClusterId string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) (schemaregistry.Client, error) { if existingClient != nil { return existingClient, nil diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index 049f72c725..5e7502260e 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -46,7 +46,7 @@ func TestInitSchemaRegistryClient(t *testing.T) { // Basic Auth provider, err := GetDeserializationProvider(avroSchemaName) req.Nil(err) - err = provider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{ + err = provider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{ ApiKey: "key", ApiSecret: "secret", }, nil) @@ -60,7 +60,7 @@ func TestInitSchemaRegistryClient(t *testing.T) { serde.GlobalRuleRegistry().Clear() provider, err = GetDeserializationProvider(jsonSchemaName) req.Nil(err) - err = provider.InitDeserializer(mockClientUrl, "lsrc-abc123", "value", SchemaRegistryAuth{Token: "token"}, nil) + err = provider.InitDeserializer(mockClientUrl, "lsrc-abc123", "", "value", SchemaRegistryAuth{Token: "token"}, nil) req.Nil(err) config = provider.GetSchemaRegistryClient().Config() req.Equal(config.SchemaRegistryURL, mockClientUrl) @@ -72,7 +72,7 @@ func TestInitSchemaRegistryClient(t *testing.T) { serde.GlobalRuleRegistry().Clear() provider, err = GetDeserializationProvider(protobufSchemaName) req.Nil(err) - err = provider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{ + err = provider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{ CertificateAuthorityPath: "ca.cert", ClientCertPath: "client.crt", ClientKeyPath: "client.key", @@ -116,6 +116,18 @@ func TestGetDeserializationProvider(t *testing.T) { require.Error(t, err) } +func TestPrimitiveSerdesInit(t *testing.T) { + for _, format := range []string{stringSchemaName, integerSchemaName, doubleSchemaName} { + serializationProvider, err := GetSerializationProvider(format) + require.NoError(t, err) + require.NoError(t, serializationProvider.InitSerializer("", "", "", "value", -1, SchemaRegistryAuth{})) + + deserializationProvider, err := GetDeserializationProvider(format) + require.NoError(t, err) + require.NoError(t, deserializationProvider.InitDeserializer("", "", "", "value", SchemaRegistryAuth{}, nil)) + } +} + func TestStringSerdes(t *testing.T) { req := require.New(t) @@ -146,7 +158,7 @@ func TestAvroSerdesValid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -168,7 +180,7 @@ func TestAvroSerdesValid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -189,7 +201,7 @@ func TestAvroSerdesValidWithHeaders(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) @@ -210,7 +222,7 @@ func TestAvroSerdesValidWithHeaders(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", headers, data) @@ -228,7 +240,7 @@ func TestAvroSerdesInvalid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -246,7 +258,7 @@ func TestAvroSerdesInvalid(t *testing.T) { deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) req.Nil(err) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) brokenString := `{"f1"` @@ -276,7 +288,7 @@ func TestAvroSerdesNestedValid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -298,7 +310,7 @@ func TestAvroSerdesNestedValid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, expectedBytes) @@ -320,7 +332,7 @@ func TestAvroSerdesValidWithRuleSet(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -358,7 +370,7 @@ func TestAvroSerdesValidWithRuleSet(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -380,7 +392,7 @@ func TestAvroSerdesValidWithCSPERuleSet(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(avroSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -418,7 +430,7 @@ func TestAvroSerdesValidWithCSPERuleSet(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -439,7 +451,7 @@ func TestJsonSerdesValid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -461,7 +473,7 @@ func TestJsonSerdesValid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", schemaPath, serde.ValueSerde, nil) @@ -483,7 +495,7 @@ func TestJsonSerdesValidWithHeaders(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) @@ -504,7 +516,7 @@ func TestJsonSerdesValidWithHeaders(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", headers, expectedBytes) @@ -536,7 +548,7 @@ func TestJsonSerdesReference(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -566,7 +578,7 @@ func TestJsonSerdesReference(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", schemaPath, serde.ValueSerde, nil) @@ -585,7 +597,7 @@ func TestJsonSerdesInvalid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -601,7 +613,7 @@ func TestJsonSerdesInvalid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) brokenString := `{"f1":` @@ -638,7 +650,7 @@ func TestJsonSerdesNestedValid(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -660,7 +672,7 @@ func TestJsonSerdesNestedValid(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, expectedBytes) @@ -681,7 +693,7 @@ func TestJsonSerdesValidWithRuleSet(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -719,7 +731,7 @@ func TestJsonSerdesValidWithRuleSet(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -741,7 +753,7 @@ func TestJsonSerdesValidWithCSPERuleSet(t *testing.T) { // Initialize the mock serializer and use latest schemaId serializationProvider, _ := GetSerializationProvider(jsonSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err := serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -779,7 +791,7 @@ func TestJsonSerdesValidWithCSPERuleSet(t *testing.T) { // Initialize the mock deserializer deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) @@ -808,7 +820,7 @@ func TestProtobufSerdesValid(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -826,7 +838,7 @@ func TestProtobufSerdesValid(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -855,7 +867,7 @@ func TestProtobufSerdesValidWithHeaders(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) @@ -874,7 +886,7 @@ func TestProtobufSerdesValidWithHeaders(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{ Value: data, @@ -925,7 +937,7 @@ message Person { expectedString := `{"name":"abc","address":{"city":"LA"},"result":2}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{"address.proto": referencePath}) req.Nil(err) @@ -957,7 +969,7 @@ message Person { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -993,7 +1005,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -1012,7 +1024,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -1064,7 +1076,7 @@ func TestProtobufSerdesNestedValid(t *testing.T) { expectedString := `{"name":"abc","id":2,"add":{"zip":"123","street":"def"},"phones":{"number":"234"}}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -1082,7 +1094,7 @@ func TestProtobufSerdesNestedValid(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -1119,7 +1131,7 @@ func TestProtobufSerdesValidWithRuleSet(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -1156,7 +1168,7 @@ func TestProtobufSerdesValidWithRuleSet(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -1190,7 +1202,7 @@ func TestProtobufSerdesValidWithCSPERuleSet(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -1227,7 +1239,7 @@ func TestProtobufSerdesValidWithCSPERuleSet(t *testing.T) { req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "", "value", SchemaRegistryAuth{}, client) req.Nil(err) err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) @@ -1259,3 +1271,155 @@ func readSchemaReferences(references string) ([]schemaregistry.Reference, error) return refs, nil } + +// TestAvroSerdesUsesAssociatedSubject verifies the cloud path end-to-end: when +// InitSerializer receives a non-empty kafkaClusterId, ckgo's strategy resolves +// the subject via the associations API, and Serialize fetches the schema under +// the associated subject (not under -). +func TestAvroSerdesUsesAssociatedSubject(t *testing.T) { + req := require.New(t) + + schemaString := `{"type":"record","name":"myRecord","fields":[{"name":"f1","type":"int"}]}` + schemaPath := filepath.Join(tempDir, "avro-schema-associated.txt") + req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) + + const ( + kafkaClusterId = "lkc-assoc-test" + topic = "associated-topic" + associated = "custom-associated-value" + ) + + serializationProvider, _ := GetSerializationProvider(avroSchemaName) + err := serializationProvider.InitSerializer(mockClientUrl, "", kafkaClusterId, "value", -1, SchemaRegistryAuth{}) + req.Nil(err) + err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) + req.Nil(err) + + client := serializationProvider.GetSchemaRegistryClient() + _, err = client.Register(associated, schemaregistry.SchemaInfo{Schema: schemaString, SchemaType: "AVRO"}, false) + req.Nil(err) + _, err = client.CreateOrUpdateAssociation(schemaregistry.AssociationCreateOrUpdateRequest{ + ResourceName: topic, + ResourceNamespace: kafkaClusterId, + ResourceID: topic + ":" + kafkaClusterId, + ResourceType: "topic", + Associations: []schemaregistry.AssociationCreateOrUpdateInfo{{ + Subject: associated, + AssociationType: "value", + }}, + }) + req.Nil(err) + + _, _, err = serializationProvider.Serialize(topic, `{"f1":123}`) + req.Nil(err, "serialize should resolve to the associated subject") + + // Confirm no schema lives under TopicNameStrategy: if Serialize had used the literal -, + // the test wouldn't be exercising the associated path. + _, err = client.GetLatestSchemaMetadata(topic + "-value") + req.Error(err) +} + +func TestJsonSerdesUsesAssociatedSubject(t *testing.T) { + req := require.New(t) + + schemaString := `{"type":"object","properties":{"f1":{"type":"string"}},"required":["f1"]}` + schemaPath := filepath.Join(tempDir, "json-schema-associated.json") + req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) + + const ( + kafkaClusterId = "lkc-assoc-json" + topic = "associated-topic-json" + associated = "custom-associated-json-value" + ) + + serializationProvider, _ := GetSerializationProvider(jsonSchemaName) + err := serializationProvider.InitSerializer(mockClientUrl, "", kafkaClusterId, "value", -1, SchemaRegistryAuth{}) + req.Nil(err) + err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) + req.Nil(err) + + client := serializationProvider.GetSchemaRegistryClient() + _, err = client.Register(associated, schemaregistry.SchemaInfo{Schema: schemaString, SchemaType: "JSON"}, false) + req.Nil(err) + _, err = client.CreateOrUpdateAssociation(schemaregistry.AssociationCreateOrUpdateRequest{ + ResourceName: topic, + ResourceNamespace: kafkaClusterId, + ResourceID: topic + ":" + kafkaClusterId, + ResourceType: "topic", + Associations: []schemaregistry.AssociationCreateOrUpdateInfo{{ + Subject: associated, + AssociationType: "value", + }}, + }) + req.Nil(err) + + _, _, err = serializationProvider.Serialize(topic, `{"f1":"asd"}`) + req.Nil(err, "serialize should resolve to the associated subject") + + _, err = client.GetLatestSchemaMetadata(topic + "-value") + req.Error(err) +} + +func TestProtobufSerdesUsesAssociatedSubject(t *testing.T) { + req := require.New(t) + + tempDir, err := os.MkdirTemp(tempDir, "protobuf-associated") + req.NoError(err) + defer os.RemoveAll(tempDir) + + schemaString := ` + syntax = "proto3"; + message Person { + string name = 1; + int32 page = 2; + }` + schemaPath := filepath.Join(tempDir, "person-schema.proto") + req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) + + const ( + kafkaClusterId = "lkc-assoc-proto" + topic = "associated-topic-proto" + associated = "custom-associated-proto-value" + ) + + serializationProvider, _ := GetSerializationProvider(protobufSchemaName) + err = serializationProvider.InitSerializer(mockClientUrl, "", kafkaClusterId, "value", -1, SchemaRegistryAuth{}) + req.Nil(err) + err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) + req.Nil(err) + + client := serializationProvider.GetSchemaRegistryClient() + _, err = client.Register(associated, schemaregistry.SchemaInfo{Schema: schemaString, SchemaType: "PROTOBUF"}, false) + req.Nil(err) + _, err = client.CreateOrUpdateAssociation(schemaregistry.AssociationCreateOrUpdateRequest{ + ResourceName: topic, + ResourceNamespace: kafkaClusterId, + ResourceID: topic + ":" + kafkaClusterId, + ResourceType: "topic", + Associations: []schemaregistry.AssociationCreateOrUpdateInfo{{ + Subject: associated, + AssociationType: "value", + }}, + }) + req.Nil(err) + + _, _, err = serializationProvider.Serialize(topic, `{"name":"abc","page":1}`) + req.Nil(err, "serialize should resolve to the associated subject") + + _, err = client.GetLatestSchemaMetadata(topic + "-value") + req.Error(err) +} + +func TestSubjectStrategy(t *testing.T) { + t.Run("non-empty cluster id selects AssociatedNameStrategy", func(t *testing.T) { + typ, cfg := subjectStrategy("lkc-test") + require.Equal(t, serde.AssociatedNameStrategyType, typ) + require.Equal(t, map[string]string{serde.KafkaClusterIDConfig: "lkc-test"}, cfg) + }) + + t.Run("empty cluster id selects TopicNameStrategy", func(t *testing.T) { + typ, cfg := subjectStrategy("") + require.Equal(t, serde.TopicNameStrategyType, typ) + require.Nil(t, cfg) + }) +} diff --git a/pkg/serdes/string_deserialization_provider.go b/pkg/serdes/string_deserialization_provider.go index 0d1891a424..b040183a1e 100644 --- a/pkg/serdes/string_deserialization_provider.go +++ b/pkg/serdes/string_deserialization_provider.go @@ -8,7 +8,7 @@ import ( type StringDeserializationProvider struct{} -func (s *StringDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { +func (s *StringDeserializationProvider) InitDeserializer(_, _, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { return nil } diff --git a/pkg/serdes/string_serialization_provider.go b/pkg/serdes/string_serialization_provider.go index a88000ed6a..8352937be4 100644 --- a/pkg/serdes/string_serialization_provider.go +++ b/pkg/serdes/string_serialization_provider.go @@ -8,7 +8,7 @@ import ( type StringSerializationProvider struct{} -func (s *StringSerializationProvider) InitSerializer(_, _, _ string, _ int, _ SchemaRegistryAuth) error { +func (s *StringSerializationProvider) InitSerializer(_, _, _, _ string, _ int, _ SchemaRegistryAuth) error { return nil }