Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/asyncapi/command_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (c *command) getMessageExamples(consumer *ckgo.Consumer, topicName, content
return nil, err
}

err = deserializationProvider.InitDeserializer(srEndpoint, srClusterId, "value", serdes.SchemaRegistryAuth{Token: token}, nil)
err = deserializationProvider.InitDeserializer(srEndpoint, srClusterId, "", "value", serdes.SchemaRegistryAuth{Token: token}, nil)
if err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions internal/kafka/command_topic_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,17 +286,24 @@ func (c *command) consumeCloud(cmd *cobra.Command, args []string) error {
subject = schemaRegistryContext
}

log.CliLogger.Tracef("consumeCloud: kafkaClusterId=%q topic=%q", cluster.ID, topic)

srAuth := serdes.SchemaRegistryAuth{ApiKey: srApiKey, ApiSecret: srApiSecret, Token: token}
valueSubject := resolveAssociatedValueSubject(valueFormat, srEndpoint, srClusterId, cluster.ID, topic, srAuth)

groupHandler := &GroupHandler{
SrClient: srClient,
SrApiKey: srApiKey,
SrApiSecret: srApiSecret,
SrClusterId: srClusterId,
SrClusterEndpoint: srEndpoint,
KafkaClusterId: cluster.ID,
Token: token,
KeyFormat: keyFormat,
ValueFormat: valueFormat,
Out: cmd.OutOrStdout(),
Subject: subject,
ValueSubject: valueSubject,
Topic: topic,
Properties: ConsumerProperties{
Delimiter: delimiter,
Expand Down
92 changes: 45 additions & 47 deletions internal/kafka/command_topic_produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
cmd.Flags().String("delimiter", ":", "The delimiter separating each key and value.")
cmd.Flags().StringSlice("config", nil, `A comma-separated list of configuration overrides ("key=value") for the producer client. For a full list, see https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html`)
pcmd.AddProducerConfigFileFlag(cmd)
cmd.Flags().String("schema-registry-endpoint", "", "Endpoint for Schema Registry cluster.")

Check failure on line 65 in internal/kafka/command_topic_produce.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal "schema-registry-endpoint" 3 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3373&issues=f149c019-41d5-4b95-a803-e681550354b3&open=f149c019-41d5-4b95-a803-e681550354b3
cmd.Flags().StringSlice("headers", nil, `A comma-separated list of headers formatted as "key:value".`)
cmd.Flags().Bool("schema-id-header", false, "Serialize schema ID in the header instead of the message prefix.")

Expand Down Expand Up @@ -147,12 +147,14 @@
return err
}

keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key")
log.CliLogger.Tracef("produceCloud: kafkaClusterId=%q topic=%q", cluster.ID, topic)

keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key", cluster.ID)
if err != nil {
return err
}

valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "value")
valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "value", cluster.ID)
if err != nil {
return err
}
Expand Down Expand Up @@ -207,12 +209,12 @@
func (c *command) produceOnPrem(cmd *cobra.Command, args []string) error {
topic := args[0]

keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "key")
keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "key", "")
if err != nil {
return err
}

valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "value")
valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "value", "")
if err != nil {
return err
}
Expand Down Expand Up @@ -461,14 +463,12 @@
return "", "", errors.New(missingOrMalformedKeyErrorMsg)
}

func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) (serdes.SerializationProvider, []byte, error) {
func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode, kafkaClusterId string) (serdes.SerializationProvider, []byte, error) {

Check failure on line 466 in internal/kafka/command_topic_produce.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 45 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3373&issues=c9847f0f-9150-4c7d-8862-99711486b797&open=c9847f0f-9150-4c7d-8862-99711486b797
schemaDir, err := createTempDir()
if err != nil {
return nil, nil, err
}

subject := topicNameStrategy(topic, mode)

// Deprecated
var schemaId optional.Int32
if mode == "value" && cmd.Flags().Changed("schema-id") {
Expand All @@ -491,6 +491,41 @@
schemaId = optional.NewInt32(int32(id))
}

srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint")
if err != nil {
return nil, nil, err
}
srApiKey, err := cmd.Flags().GetString("schema-registry-api-key")
if err != nil {
return nil, nil, err
}
srApiSecret, err := cmd.Flags().GetString("schema-registry-api-secret")
if err != nil {
return nil, nil, err
}
var token string
if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out
token, err = auth.GetDataplaneToken(c.Context)
if err != nil {
return nil, nil, err
}
}
var srClusterId string
if (schemaId.IsSet() || schema != "") && srEndpoint == "" {
srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint(cmd)
if err != nil {
return nil, nil, err
}
}
srAuth := serdes.SchemaRegistryAuth{
ApiKey: srApiKey,
ApiSecret: srApiSecret,
Token: token,
}

// Resolve subject via SR associations, fall back to TopicNameStrategy on miss.
subject := resolveProduceSubject(srEndpoint, srClusterId, kafkaClusterId, topic, mode, srAuth)

var format string
referencePathMap := map[string]string{}
metaInfo := []byte{}
Expand Down Expand Up @@ -559,49 +594,12 @@
}
}

// Fetch the SR client endpoint during schema registration
srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint")
if err != nil {
return nil, nil, err
}

var srClusterId string
if (schemaId.IsSet() || schema != "") && srEndpoint == "" {
srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint(cmd)
if err != nil {
return nil, nil, err
}
}

// Initialize the serializer with the same SR endpoint during registration
// The associated schema ID is also required to initialize the serializer
srApiKey, err := cmd.Flags().GetString("schema-registry-api-key")
if err != nil {
return nil, nil, err
}
srApiSecret, err := cmd.Flags().GetString("schema-registry-api-secret")
if err != nil {
return nil, nil, err
}
var parsedSchemaId = -1
if len(metaInfo) >= 5 {
parsedSchemaId = int(binary.BigEndian.Uint32(metaInfo[1:5]))
}

var token string
if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out
token, err = auth.GetDataplaneToken(c.Context)
if err != nil {
return nil, nil, err
}
}
srAuth := serdes.SchemaRegistryAuth{
ApiKey: srApiKey,
ApiSecret: srApiSecret,
Token: token,
}
err = serializationProvider.InitSerializer(srEndpoint, srClusterId, mode, parsedSchemaId, srAuth)
if err != nil {
if err := serializationProvider.InitSerializer(srEndpoint, srClusterId, kafkaClusterId, mode, parsedSchemaId, srAuth); err != nil {
return nil, nil, err
}

Expand All @@ -613,7 +611,7 @@
return serializationProvider, metaInfo, nil
}

func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode string) (serdes.SerializationProvider, []byte, error) {
func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode, kafkaClusterId string) (serdes.SerializationProvider, []byte, error) {

Check failure on line 614 in internal/kafka/command_topic_produce.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 34 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3373&issues=cbd860c0-adb3-4bed-9865-c16f70479eb8&open=cbd860c0-adb3-4bed-9865-c16f70479eb8
schemaDir, err := createTempDir()
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -732,7 +730,7 @@
ClientKeyPath: clientKeyPath,
Token: token,
}
err = serializationProvider.InitSerializer(srEndpoint, "", mode, parsedSchemaId, srAuth)
err = serializationProvider.InitSerializer(srEndpoint, "", kafkaClusterId, mode, parsedSchemaId, srAuth)
if err != nil {
return nil, nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions internal/kafka/confluent_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
SrApiSecret string
SrClusterId string
SrClusterEndpoint string
KafkaClusterId string
Token string
CertificateAuthorityPath string
ClientCertPath string
Expand All @@ -66,6 +67,7 @@
ValueFormat string
Out io.Writer
Subject string
ValueSubject string // Protobuf value subject resolved from a topic association; empty => fall back to Subject
Topic string
Properties ConsumerProperties
}
Expand Down Expand Up @@ -225,7 +227,7 @@
}
}

func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error {

Check failure on line 230 in internal/kafka/confluent_kafka.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 24 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3373&issues=9ce9e30e-d78c-47ed-99b9-8412237b7cf0&open=9ce9e30e-d78c-47ed-99b9-8412237b7cf0
srAuth := serdes.SchemaRegistryAuth{
ApiKey: h.SrApiKey,
ApiSecret: h.SrApiSecret,
Expand All @@ -241,7 +243,7 @@
return err
}

err = keyDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, "key", srAuth, nil)
err = keyDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, h.KafkaClusterId, "key", srAuth, nil)
if err != nil {
return err
}
Expand All @@ -268,12 +270,16 @@
return err
}

err = valueDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, "value", srAuth, nil)
err = valueDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, h.KafkaClusterId, "value", srAuth, nil)
if err != nil {
return err
}

if err := valueDeserializer.LoadSchema(h.Subject, h.Properties.SchemaPath, serde.ValueSerde, message); err != nil {
valueSubject := h.ValueSubject
if valueSubject == "" {
valueSubject = h.Subject
}
if err := valueDeserializer.LoadSchema(valueSubject, h.Properties.SchemaPath, serde.ValueSerde, message); err != nil {
return err
}

Expand Down
34 changes: 34 additions & 0 deletions internal/kafka/confluent_kafka_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"bytes"
"testing"
"time"

Expand All @@ -11,6 +12,39 @@ import (
"github.com/confluentinc/cli/v4/pkg/serdes"
)

func TestConsumeMessage_ValueSubjectFallback(t *testing.T) {
tests := []struct {
name string
valueSubject string
}{
{"empty ValueSubject falls back to Subject", ""},
{"explicit ValueSubject is used", "topic1-value-context"},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
message := &ckgo.Message{
Key: []byte("my-key"),
Value: []byte("my-value"),
TopicPartition: ckgo.TopicPartition{Offset: 1, Partition: 0},
}
var buf bytes.Buffer
handler := &GroupHandler{
KeyFormat: "string",
ValueFormat: "string",
Subject: "topic1-value",
ValueSubject: test.valueSubject,
KafkaClusterId: "lkc-123",
Topic: "topic1",
Out: &buf,
Properties: ConsumerProperties{PrintKey: true, Delimiter: "\t"},
}
require.NoError(t, ConsumeMessage(message, handler))
require.Contains(t, buf.String(), "my-key")
require.Contains(t, buf.String(), "my-value")
})
}
}

func TestGetMessageString(t *testing.T) {
message := &ckgo.Message{
Value: []byte("message"),
Expand Down
83 changes: 83 additions & 0 deletions internal/kafka/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (

cmkv2 "github.com/confluentinc/ccloud-sdk-go-v2/cmk/v2"
cckafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
cpkafkarestv3 "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3"

"github.com/confluentinc/cli/v4/pkg/ccloudv2"
"github.com/confluentinc/cli/v4/pkg/ccstructs"
"github.com/confluentinc/cli/v4/pkg/kafkarest"
"github.com/confluentinc/cli/v4/pkg/kafkausagelimits"
"github.com/confluentinc/cli/v4/pkg/log"
"github.com/confluentinc/cli/v4/pkg/serdes"
)

func toCreateTopicConfigs(topicConfigsMap map[string]string) []cckafkarestv3.ConfigData {
Expand Down Expand Up @@ -210,6 +213,86 @@ func topicNameStrategy(topic, mode string) string {
return fmt.Sprintf("%s-%s", topic, mode)
}

func newSchemaRegistryClient(srClientUrl, srClusterId string, srAuth serdes.SchemaRegistryAuth) (schemaregistry.Client, error) {
var cfg *schemaregistry.Config
switch {
case srAuth.ApiKey != "" && srAuth.ApiSecret != "":
cfg = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret)
case srAuth.Token != "":
cfg = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "")
default:
cfg = schemaregistry.NewConfig(srClientUrl)
log.CliLogger.Info("initializing schema registry client with no authentication")
}
cfg.SslCaLocation = srAuth.CertificateAuthorityPath
cfg.SslCertificateLocation = srAuth.ClientCertPath
cfg.SslKeyLocation = srAuth.ClientKeyPath
return schemaregistry.NewClient(cfg)
}

// returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id
// as resource namespace. Falls backt o default TopicNameStrategy (<topic>-<mode>) if unmatched.
func lookupAssociatedSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) (string, bool, error) {
associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1)
if err != nil {
return "", false, err
}
if len(associations) == 0 {
return "", false, nil
}
return associations[0].Subject, true, nil
}

func resolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string {
fallback := topic + "-" + mode
if kafkaClusterId == "" || client == nil {
return fallback
}
subject, found, err := lookupAssociatedSubject(client, kafkaClusterId, topic, mode)
if err != nil {
log.CliLogger.Tracef("subject resolution: associations lookup failed (topic=%q mode=%q clusterId=%q): %v; using %q", topic, mode, kafkaClusterId, err, fallback)
return fallback
}
if !found {
log.CliLogger.Tracef("subject resolution: no association for topic=%q mode=%q clusterId=%q; using %q", topic, mode, kafkaClusterId, fallback)
return fallback
}
log.CliLogger.Tracef("subject resolution: resolved associated subject %q (topic=%q mode=%q clusterId=%q)", subject, topic, mode, kafkaClusterId)
return subject
}

func resolveProduceSubject(srEndpoint, srClusterId, kafkaClusterId, topic, mode string, srAuth serdes.SchemaRegistryAuth) string {
if kafkaClusterId != "" && srEndpoint != "" {
if client, err := newSchemaRegistryClient(srEndpoint, srClusterId, srAuth); err == nil {
return resolveSubject(client, kafkaClusterId, topic, mode)
}
}
return topicNameStrategy(topic, mode)
}

// The Protobuf deserializer fetches its schema by subject, so on an associated topic it needs the
// associated (context-qualified) value subject. Otherwise, valueSubject stays empty to preserve default behaviour
// and the deserializer falls back to the topic-derived subject
func resolveAssociatedValueSubject(valueFormat, srEndpoint, srClusterId, kafkaClusterId, topic string, srAuth serdes.SchemaRegistryAuth) string {
if !serdes.IsProtobufSchema(valueFormat) || kafkaClusterId == "" || srEndpoint == "" {
return ""
}
client, err := newSchemaRegistryClient(srEndpoint, srClusterId, srAuth)
if err != nil {
return ""
}
return associatedValueSubject(client, kafkaClusterId, topic)
}

func associatedValueSubject(client schemaregistry.Client, kafkaClusterId, topic string) string {
subject, found, err := lookupAssociatedSubject(client, kafkaClusterId, topic, "value")
if err != nil || !found {
return ""
}
log.CliLogger.Tracef("consumeCloud: resolved associated value subject %q", subject)
return subject
}

func getLimitsForSku(cluster *cmkv2.CmkV2Cluster, usageLimits *kafkausagelimits.UsageLimits) *kafkausagelimits.Limits {
if isDedicated(cluster) {
return usageLimits.GetCkuLimit(cluster.Status.GetCku())
Expand Down
Loading