Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.25.0
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9
github.com/confluentinc/ccloud-sdk-go-v2/endpoint v0.4.0
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.19.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0 h1:ISrVOX9qJ2Sxiu/fGBqqH
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0/go.mod h1:zHG/3DzsnoHC81B1AY9K/8bMX3mxbIp5/nHHdypa//w=
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9 h1:o1zKZlKbnN9uv+Y8TxwesBRryUl3lEU6lnfndEJigxQ=
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9/go.mod h1:TtTcSfm+/JvnfqEKglOZ32LIcsRbdtrQdI+TtcP7fiU=
github.com/confluentinc/ccloud-sdk-go-v2/endpoint v0.4.0 h1:Pd4oCibpSNfjyBNt9hOQv2EOHjKql1xc7hnU/qs7lvk=
github.com/confluentinc/ccloud-sdk-go-v2/endpoint v0.4.0/go.mod h1:xhkW77SQ3Dahj7/x05b8U5rhoI3sznz+oaZorAikCY4=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0 h1:gRRtad0RRit38+54vKg6DtUlTjPjsuKiVSs1fvyP0nk=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0/go.mod h1:JHg9yHyCBLL0Zm24skG4pGaSR49IfxPJaQg/HXzMJpw=
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0 h1:DVWL3Y4b5azgCADubtyp3EhGZuyJkleINaTy2V3iius=
Expand Down
135 changes: 35 additions & 100 deletions internal/flink/command_endpoint_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,17 @@ import (

"github.com/spf13/cobra"

networkingprivatelinkv1 "github.com/confluentinc/ccloud-sdk-go-v2/networking-privatelink/v1"
networkingv1 "github.com/confluentinc/ccloud-sdk-go-v2/networking/v1"

pcloud "github.com/confluentinc/cli/v4/pkg/cloud"
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/examples"
"github.com/confluentinc/cli/v4/pkg/output"
)

type CloudRegionKey struct {
cloud string
region string
}
const (
flinkEndpointService = "FLINK"
flinkRestEndpointType = "REST"
flinkEndpointUrlScheme = "https://"
)

func (c *command) newEndpointListCommand() *cobra.Command {
cmd := &cobra.Command{
Expand All @@ -42,7 +39,6 @@ func (c *command) newEndpointListCommand() *cobra.Command {
}

func (c *command) endpointList(cmd *cobra.Command, _ []string) error {
// Get the current Flink cloud and region
cloud := c.Context.GetCurrentFlinkCloudProvider()
region := c.Context.GetCurrentFlinkRegion()
if cloud == "" || region == "" {
Expand All @@ -51,81 +47,40 @@ func (c *command) endpointList(cmd *cobra.Command, _ []string) error {
"Run `confluent flink region use --cloud <cloud> --region <region>` to set the Flink cloud provider and region first.",
)
}
cloud = strings.ToUpper(cloud)

environmentId, err := c.Context.EnvironmentId()
if err != nil {
return err
}

list := output.NewList(cmd)
flinkRegions, err := c.V2Client.ListFlinkRegions(cloud, region)
endpoints, err := c.V2Client.ListEndpoints(environmentId, cloud, region, flinkEndpointService, nil, "")
if err != nil {
return fmt.Errorf("unable to list Flink endpoint, failed to list Flink regions: %w", err)
return fmt.Errorf("unable to list Flink endpoints: %w", err)
}
results := make([]*flinkEndpointOut, 0, len(flinkRegions)*2)

// 1 - List all the public endpoints based optionally on cloud(upper case) and region(lower case)
for _, flinkRegion := range flinkRegions {
results = append(results, &flinkEndpointOut{
IsCurrent: flinkRegion.GetHttpEndpoint() == c.Context.GetCurrentFlinkEndpoint(),
Endpoint: flinkRegion.GetHttpEndpoint(),
Cloud: flinkRegion.GetCloud(),
Region: flinkRegion.GetRegionName(),
Type: publicFlinkEndpointType,
})
}

// 2 - List all the private endpoints based on the presence of "READY" PrivateLinkAttachments as filter
// Note the `cloud` and `region` parameters have to be `nil` instead of empty slice in case of no filter
platts, err := c.V2Client.ListNetworkPrivateLinkAttachments(environmentId, nil, nil, nil, []string{"READY"})
if err != nil {
return fmt.Errorf("unable to list Flink endpoint, failed to list private link attachments: %w", err)
}

filterKeyMap := buildCloudRegionKeyFilterMapFromPrivateLinkAttachments(platts)

for _, flinkRegion := range flinkRegions {
key := CloudRegionKey{
cloud: flinkRegion.GetCloud(),
region: flinkRegion.GetRegionName(),
}

if _, ok := filterKeyMap[key]; ok {
results = append(results, &flinkEndpointOut{
IsCurrent: flinkRegion.GetPrivateHttpEndpoint() == c.Context.GetCurrentFlinkEndpoint(),
Endpoint: flinkRegion.GetPrivateHttpEndpoint(),
Cloud: flinkRegion.GetCloud(),
Region: flinkRegion.GetRegionName(),
Type: privateFlinkEndpointType,
})
currentEndpoint := c.Context.GetCurrentFlinkEndpoint()
results := make([]*flinkEndpointOut, 0, len(endpoints))
for _, e := range endpoints {
// The Endpoints API also returns LANGUAGE_SERVICE endpoints (`flinkpls.*`) used by
// the Cloud Console SQL editor's language server. Those are not usable by CLI
// commands like `flink shell`, `flink statement`, or `flink endpoint use`.
if e.GetEndpointType() != flinkRestEndpointType {
continue
}
}

// 3 - List all the CCN endpoint with the list of "READY" network domains
// Note the cloud and region have to be empty slice instead of `nil` in case of no filter
// These endpoints are currently only available for AWS and Azure, so we filter accordingly
var networks []networkingv1.NetworkingV1Network
if cloud != pcloud.Gcp {
networks, err = c.V2Client.ListNetworks(environmentId, nil, []string{cloud}, []string{region}, nil, []string{"READY"}, nil)
if err != nil {
return fmt.Errorf("unable to list Flink endpoint, failed to list networks: %w", err)
endpointType := publicFlinkEndpointType
if e.GetIsPrivate() {
endpointType = privateFlinkEndpointType
}
}

for _, network := range networks {
suffix := network.Status.GetEndpointSuffix()
endpoint := fmt.Sprintf("https://flink%s", suffix)
url := flinkEndpointUrl(e.GetEndpoint())
results = append(results, &flinkEndpointOut{
IsCurrent: endpoint == c.Context.GetCurrentFlinkEndpoint(),
Endpoint: endpoint,
Cloud: network.Spec.GetCloud(),
Region: network.Spec.GetRegion(),
Type: privateFlinkEndpointType,
IsCurrent: url == currentEndpoint,
Endpoint: url,
Cloud: strings.ToUpper(e.GetCloud()),
Region: e.GetRegion(),
Type: endpointType,
})
}

// Sort the results order by cloud, region, type and endpoint
sort.Slice(results, func(i, j int) bool {
if results[i].Cloud != results[j].Cloud {
return results[i].Cloud < results[j].Cloud
Expand All @@ -139,41 +94,21 @@ func (c *command) endpointList(cmd *cobra.Command, _ []string) error {
return results[i].Endpoint < results[j].Endpoint
})

for _, result := range results {
list.Add(&flinkEndpointOut{
IsCurrent: result.IsCurrent,
Endpoint: result.Endpoint,
Cloud: result.Cloud,
Region: result.Region,
Type: result.Type,
})
list := output.NewList(cmd)
for _, r := range results {
list.Add(r)
}

// Disable the default sort to use the custom sort above
list.Sort(false)
return list.Print()
}

// buildCloudRegionKeyFilterMapFromPrivateLinkAttachments creates a map of unique cloud/region pairs from PrivateLinkAttachments.
// This function helps deduplicate scenarios where users have multiple private link attachments in the same cloud region.
// Each unique combination of cloud and region is represented as a CloudRegionKey in the returned map.
//
// Parameters:
// - platts: A slice of NetworkingV1PrivateLinkAttachment objects to process
//
// Returns:
// - A map with CloudRegionKey as keys and boolean 'true' as values for each unique cloud/region combination
func buildCloudRegionKeyFilterMapFromPrivateLinkAttachments(platts []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment) map[CloudRegionKey]bool {
result := make(map[CloudRegionKey]bool, len(platts))
for _, platt := range platts {
if platt.Spec.GetCloud() == "" || platt.Spec.GetRegion() == "" {
continue
}
compositeKey := CloudRegionKey{
cloud: platt.Spec.GetCloud(),
region: platt.Spec.GetRegion(),
}
result[compositeKey] = true
// flinkEndpointUrl renders the bare FQDN the Endpoints API returns into the URL
// form the legacy command produced (and that `flink shell` / `flink endpoint use`
// expect). If the API response already includes a URL scheme — as the test server
// gateway does (`http://127.0.0.1:1026`) — it is preserved.
func flinkEndpointUrl(endpoint string) string {
if strings.Contains(endpoint, "://") {
return endpoint
}
return result
return flinkEndpointUrlScheme + endpoint
}
42 changes: 42 additions & 0 deletions internal/flink/command_endpoint_list_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package flink

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestFlinkEndpointUrl(t *testing.T) {
tests := []struct {
name string
input string
expected string
}{
{
name: "bare FQDN gets https:// prefix",
input: "flink.us-east-1.aws.confluent.cloud",
expected: "https://flink.us-east-1.aws.confluent.cloud",
},
{
name: "multi-PLATT GLB access-point FQDN gets https:// prefix",
input: "flink-ap4l9zl0.ap-northeast-1.aws.accesspoint.glb.confluent.cloud",
expected: "https://flink-ap4l9zl0.ap-northeast-1.aws.accesspoint.glb.confluent.cloud",
},
{
name: "value with https:// scheme is returned unchanged",
input: "https://flink.eu-west-1.aws.confluent.cloud",
expected: "https://flink.eu-west-1.aws.confluent.cloud",
},
{
name: "value with http:// scheme is returned unchanged",
input: "http://127.0.0.1:1026",
expected: "http://127.0.0.1:1026",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.expected, flinkEndpointUrl(tt.input))
})
}
}
101 changes: 0 additions & 101 deletions internal/flink/command_endpoint_test.go

This file was deleted.

Loading