From b18ca44cf6af200dfae70a3fe7a1a3d849784888 Mon Sep 17 00:00:00 2001 From: Jason Roth Date: Thu, 14 May 2026 14:53:25 -0600 Subject: [PATCH] [FCP-4223] Use Endpoints API for flink endpoint list/use Replace the three-source CLI-side aggregation (ListFlinkRegions + ListNetworkPrivateLinkAttachments + ListNetworks + URL templating) with a single call to the dedicated Endpoints API via ccloud-sdk-go-v2/endpoint/v1, filtered by service=FLINK. This brings the CLI in line with the API the UI and Terraform already use, and surfaces endpoint shapes (notably multi-PLATT access-point URLs) that the legacy URL-template approach could not represent. To preserve the existing output contract: filter to endpoint_type=REST (drops LANGUAGE_SERVICE / flinkpls.* rows), restore the https:// scheme the legacy code produced, and uppercase the cloud column. Tests: new test-server mock for /endpoint/v1/endpoints, plus a new azure/italynorth region exercising a multi-PLATT GLB access-point URL. Co-Authored-By: Claude Opus 4.7 (1M context) --- go.mod | 1 + go.sum | 2 + internal/flink/command_endpoint_list.go | 135 +++++------------- internal/flink/command_endpoint_list_test.go | 42 ++++++ internal/flink/command_endpoint_test.go | 101 ------------- internal/flink/command_endpoint_use.go | 87 ++++------- pkg/ccloudv2/client.go | 3 + pkg/ccloudv2/endpoint.go | 79 ++++++++++ .../endpoint/list-azure-access-point.golden | 4 + .../endpoint/use-azure-access-point.golden | 1 + .../flink/region/use-azure-italynorth.golden | 2 + test/flink_test.go | 4 + test/test-server/ccloudv2_router.go | 1 + test/test-server/endpoint_handlers.go | 89 ++++++++++++ test/test-server/fcpm_handlers.go | 11 ++ 15 files changed, 299 insertions(+), 263 deletions(-) create mode 100644 internal/flink/command_endpoint_list_test.go delete mode 100644 internal/flink/command_endpoint_test.go create mode 100644 pkg/ccloudv2/endpoint.go create mode 100644 test/fixtures/output/flink/endpoint/list-azure-access-point.golden create mode 100644 test/fixtures/output/flink/endpoint/use-azure-access-point.golden create mode 100644 test/fixtures/output/flink/region/use-azure-italynorth.golden create mode 100644 test/test-server/endpoint_handlers.go diff --git a/go.mod b/go.mod index 0b7635c561..b851fec66d 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.25.0 github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0 github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9 + github.com/confluentinc/ccloud-sdk-go-v2/endpoint v0.4.0 github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0 github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0 github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.19.0 diff --git a/go.sum b/go.sum index 5261469a72..338d6f8fa3 100644 --- a/go.sum +++ b/go.sum @@ -216,6 +216,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0 h1:ISrVOX9qJ2Sxiu/fGBqqH github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0/go.mod h1:zHG/3DzsnoHC81B1AY9K/8bMX3mxbIp5/nHHdypa//w= github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9 h1:o1zKZlKbnN9uv+Y8TxwesBRryUl3lEU6lnfndEJigxQ= github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9/go.mod h1:TtTcSfm+/JvnfqEKglOZ32LIcsRbdtrQdI+TtcP7fiU= +github.com/confluentinc/ccloud-sdk-go-v2/endpoint v0.4.0 h1:Pd4oCibpSNfjyBNt9hOQv2EOHjKql1xc7hnU/qs7lvk= +github.com/confluentinc/ccloud-sdk-go-v2/endpoint v0.4.0/go.mod h1:xhkW77SQ3Dahj7/x05b8U5rhoI3sznz+oaZorAikCY4= github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0 h1:gRRtad0RRit38+54vKg6DtUlTjPjsuKiVSs1fvyP0nk= github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0/go.mod h1:JHg9yHyCBLL0Zm24skG4pGaSR49IfxPJaQg/HXzMJpw= github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0 h1:DVWL3Y4b5azgCADubtyp3EhGZuyJkleINaTy2V3iius= diff --git a/internal/flink/command_endpoint_list.go b/internal/flink/command_endpoint_list.go index 796d300a01..f430982789 100644 --- a/internal/flink/command_endpoint_list.go +++ b/internal/flink/command_endpoint_list.go @@ -7,20 +7,17 @@ import ( "github.com/spf13/cobra" - networkingprivatelinkv1 "github.com/confluentinc/ccloud-sdk-go-v2/networking-privatelink/v1" - networkingv1 "github.com/confluentinc/ccloud-sdk-go-v2/networking/v1" - - pcloud "github.com/confluentinc/cli/v4/pkg/cloud" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/examples" "github.com/confluentinc/cli/v4/pkg/output" ) -type CloudRegionKey struct { - cloud string - region string -} +const ( + flinkEndpointService = "FLINK" + flinkRestEndpointType = "REST" + flinkEndpointUrlScheme = "https://" +) func (c *command) newEndpointListCommand() *cobra.Command { cmd := &cobra.Command{ @@ -42,7 +39,6 @@ func (c *command) newEndpointListCommand() *cobra.Command { } func (c *command) endpointList(cmd *cobra.Command, _ []string) error { - // Get the current Flink cloud and region cloud := c.Context.GetCurrentFlinkCloudProvider() region := c.Context.GetCurrentFlinkRegion() if cloud == "" || region == "" { @@ -51,81 +47,40 @@ func (c *command) endpointList(cmd *cobra.Command, _ []string) error { "Run `confluent flink region use --cloud --region ` to set the Flink cloud provider and region first.", ) } - cloud = strings.ToUpper(cloud) environmentId, err := c.Context.EnvironmentId() if err != nil { return err } - list := output.NewList(cmd) - flinkRegions, err := c.V2Client.ListFlinkRegions(cloud, region) + endpoints, err := c.V2Client.ListEndpoints(environmentId, cloud, region, flinkEndpointService, nil, "") if err != nil { - return fmt.Errorf("unable to list Flink endpoint, failed to list Flink regions: %w", err) + return fmt.Errorf("unable to list Flink endpoints: %w", err) } - results := make([]*flinkEndpointOut, 0, len(flinkRegions)*2) - // 1 - List all the public endpoints based optionally on cloud(upper case) and region(lower case) - for _, flinkRegion := range flinkRegions { - results = append(results, &flinkEndpointOut{ - IsCurrent: flinkRegion.GetHttpEndpoint() == c.Context.GetCurrentFlinkEndpoint(), - Endpoint: flinkRegion.GetHttpEndpoint(), - Cloud: flinkRegion.GetCloud(), - Region: flinkRegion.GetRegionName(), - Type: publicFlinkEndpointType, - }) - } - - // 2 - List all the private endpoints based on the presence of "READY" PrivateLinkAttachments as filter - // Note the `cloud` and `region` parameters have to be `nil` instead of empty slice in case of no filter - platts, err := c.V2Client.ListNetworkPrivateLinkAttachments(environmentId, nil, nil, nil, []string{"READY"}) - if err != nil { - return fmt.Errorf("unable to list Flink endpoint, failed to list private link attachments: %w", err) - } - - filterKeyMap := buildCloudRegionKeyFilterMapFromPrivateLinkAttachments(platts) - - for _, flinkRegion := range flinkRegions { - key := CloudRegionKey{ - cloud: flinkRegion.GetCloud(), - region: flinkRegion.GetRegionName(), - } - - if _, ok := filterKeyMap[key]; ok { - results = append(results, &flinkEndpointOut{ - IsCurrent: flinkRegion.GetPrivateHttpEndpoint() == c.Context.GetCurrentFlinkEndpoint(), - Endpoint: flinkRegion.GetPrivateHttpEndpoint(), - Cloud: flinkRegion.GetCloud(), - Region: flinkRegion.GetRegionName(), - Type: privateFlinkEndpointType, - }) + currentEndpoint := c.Context.GetCurrentFlinkEndpoint() + results := make([]*flinkEndpointOut, 0, len(endpoints)) + for _, e := range endpoints { + // The Endpoints API also returns LANGUAGE_SERVICE endpoints (`flinkpls.*`) used by + // the Cloud Console SQL editor's language server. Those are not usable by CLI + // commands like `flink shell`, `flink statement`, or `flink endpoint use`. + if e.GetEndpointType() != flinkRestEndpointType { + continue } - } - - // 3 - List all the CCN endpoint with the list of "READY" network domains - // Note the cloud and region have to be empty slice instead of `nil` in case of no filter - // These endpoints are currently only available for AWS and Azure, so we filter accordingly - var networks []networkingv1.NetworkingV1Network - if cloud != pcloud.Gcp { - networks, err = c.V2Client.ListNetworks(environmentId, nil, []string{cloud}, []string{region}, nil, []string{"READY"}, nil) - if err != nil { - return fmt.Errorf("unable to list Flink endpoint, failed to list networks: %w", err) + endpointType := publicFlinkEndpointType + if e.GetIsPrivate() { + endpointType = privateFlinkEndpointType } - } - - for _, network := range networks { - suffix := network.Status.GetEndpointSuffix() - endpoint := fmt.Sprintf("https://flink%s", suffix) + url := flinkEndpointUrl(e.GetEndpoint()) results = append(results, &flinkEndpointOut{ - IsCurrent: endpoint == c.Context.GetCurrentFlinkEndpoint(), - Endpoint: endpoint, - Cloud: network.Spec.GetCloud(), - Region: network.Spec.GetRegion(), - Type: privateFlinkEndpointType, + IsCurrent: url == currentEndpoint, + Endpoint: url, + Cloud: strings.ToUpper(e.GetCloud()), + Region: e.GetRegion(), + Type: endpointType, }) } - // Sort the results order by cloud, region, type and endpoint sort.Slice(results, func(i, j int) bool { if results[i].Cloud != results[j].Cloud { return results[i].Cloud < results[j].Cloud @@ -139,41 +94,21 @@ func (c *command) endpointList(cmd *cobra.Command, _ []string) error { return results[i].Endpoint < results[j].Endpoint }) - for _, result := range results { - list.Add(&flinkEndpointOut{ - IsCurrent: result.IsCurrent, - Endpoint: result.Endpoint, - Cloud: result.Cloud, - Region: result.Region, - Type: result.Type, - }) + list := output.NewList(cmd) + for _, r := range results { + list.Add(r) } - - // Disable the default sort to use the custom sort above list.Sort(false) return list.Print() } -// buildCloudRegionKeyFilterMapFromPrivateLinkAttachments creates a map of unique cloud/region pairs from PrivateLinkAttachments. -// This function helps deduplicate scenarios where users have multiple private link attachments in the same cloud region. -// Each unique combination of cloud and region is represented as a CloudRegionKey in the returned map. -// -// Parameters: -// - platts: A slice of NetworkingV1PrivateLinkAttachment objects to process -// -// Returns: -// - A map with CloudRegionKey as keys and boolean 'true' as values for each unique cloud/region combination -func buildCloudRegionKeyFilterMapFromPrivateLinkAttachments(platts []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment) map[CloudRegionKey]bool { - result := make(map[CloudRegionKey]bool, len(platts)) - for _, platt := range platts { - if platt.Spec.GetCloud() == "" || platt.Spec.GetRegion() == "" { - continue - } - compositeKey := CloudRegionKey{ - cloud: platt.Spec.GetCloud(), - region: platt.Spec.GetRegion(), - } - result[compositeKey] = true +// flinkEndpointUrl renders the bare FQDN the Endpoints API returns into the URL +// form the legacy command produced (and that `flink shell` / `flink endpoint use` +// expect). If the API response already includes a URL scheme — as the test server +// gateway does (`http://127.0.0.1:1026`) — it is preserved. +func flinkEndpointUrl(endpoint string) string { + if strings.Contains(endpoint, "://") { + return endpoint } - return result + return flinkEndpointUrlScheme + endpoint } diff --git a/internal/flink/command_endpoint_list_test.go b/internal/flink/command_endpoint_list_test.go new file mode 100644 index 0000000000..29071191d9 --- /dev/null +++ b/internal/flink/command_endpoint_list_test.go @@ -0,0 +1,42 @@ +package flink + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFlinkEndpointUrl(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + { + name: "bare FQDN gets https:// prefix", + input: "flink.us-east-1.aws.confluent.cloud", + expected: "https://flink.us-east-1.aws.confluent.cloud", + }, + { + name: "multi-PLATT GLB access-point FQDN gets https:// prefix", + input: "flink-ap4l9zl0.ap-northeast-1.aws.accesspoint.glb.confluent.cloud", + expected: "https://flink-ap4l9zl0.ap-northeast-1.aws.accesspoint.glb.confluent.cloud", + }, + { + name: "value with https:// scheme is returned unchanged", + input: "https://flink.eu-west-1.aws.confluent.cloud", + expected: "https://flink.eu-west-1.aws.confluent.cloud", + }, + { + name: "value with http:// scheme is returned unchanged", + input: "http://127.0.0.1:1026", + expected: "http://127.0.0.1:1026", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expected, flinkEndpointUrl(tt.input)) + }) + } +} diff --git a/internal/flink/command_endpoint_test.go b/internal/flink/command_endpoint_test.go deleted file mode 100644 index fad4b4caba..0000000000 --- a/internal/flink/command_endpoint_test.go +++ /dev/null @@ -1,101 +0,0 @@ -package flink - -import ( - "reflect" - "testing" - - networkingprivatelinkv1 "github.com/confluentinc/ccloud-sdk-go-v2/networking-privatelink/v1" -) - -func TestBuildCloudRegionKeyFilterMapFromPrivateLinkAttachments(t *testing.T) { - // Helper function to create a private link attachment with specified cloud and region - createPlatt := func(cloud, region string) networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment { - spec := networkingprivatelinkv1.NewNetworkingV1PrivateLinkAttachmentSpec() - spec.SetCloud(cloud) - spec.SetRegion(region) - - platt := networkingprivatelinkv1.NewNetworkingV1PrivateLinkAttachment() - platt.SetSpec(*spec) - return *platt - } - - tests := []struct { - name string - platts []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment - expected map[CloudRegionKey]bool - }{ - { - name: "Empty slice", - platts: []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment{}, - expected: map[CloudRegionKey]bool{}, - }, - { - name: "Single attachment", - platts: []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment{ - createPlatt("AWS", "us-east-1"), - }, - expected: map[CloudRegionKey]bool{ - {cloud: "AWS", region: "us-east-1"}: true, - }, - }, - { - name: "Multiple unique attachments", - platts: []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment{ - createPlatt("AWS", "us-east-1"), - createPlatt("GCP", "us-central1"), - createPlatt("AZURE", "eastus"), - }, - expected: map[CloudRegionKey]bool{ - {cloud: "AWS", region: "us-east-1"}: true, - {cloud: "GCP", region: "us-central1"}: true, - {cloud: "AZURE", region: "eastus"}: true, - }, - }, - { - name: "Duplicate cloud/region combinations", - platts: []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment{ - createPlatt("AWS", "us-east-1"), - createPlatt("AWS", "us-east-1"), - createPlatt("AWS", "us-west-1"), - }, - expected: map[CloudRegionKey]bool{ - {cloud: "AWS", region: "us-east-1"}: true, - {cloud: "AWS", region: "us-west-1"}: true, - }, - }, - { - name: "Empty cloud or region values are skipped", - platts: []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment{ - createPlatt("", "us-east-1"), - createPlatt("AWS", ""), - createPlatt("", ""), - createPlatt("GCP", "us-central1"), - }, - expected: map[CloudRegionKey]bool{ - {cloud: "GCP", region: "us-central1"}: true, - }, - }, - { - name: "Mix of valid and invalid entries", - platts: []networkingprivatelinkv1.NetworkingV1PrivateLinkAttachment{ - createPlatt("AWS", "us-east-1"), - createPlatt("", "eu-west-1"), - createPlatt("AZURE", "eastus"), - createPlatt("GCP", ""), - }, - expected: map[CloudRegionKey]bool{ - {cloud: "AWS", region: "us-east-1"}: true, - {cloud: "AZURE", region: "eastus"}: true, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := buildCloudRegionKeyFilterMapFromPrivateLinkAttachments(tt.platts) - if !reflect.DeepEqual(got, tt.expected) { - t.Errorf("buildCloudRegionKeyFilterMapFromPrivateLinkAttachments() = %v, expected %v", got, tt.expected) - } - }) - } -} diff --git a/internal/flink/command_endpoint_use.go b/internal/flink/command_endpoint_use.go index 06ba2d5007..0f2cced294 100644 --- a/internal/flink/command_endpoint_use.go +++ b/internal/flink/command_endpoint_use.go @@ -2,16 +2,11 @@ package flink import ( "fmt" - "strings" "github.com/spf13/cobra" - networkingv1 "github.com/confluentinc/ccloud-sdk-go-v2/networking/v1" - - pcloud "github.com/confluentinc/cli/v4/pkg/cloud" "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/examples" - "github.com/confluentinc/cli/v4/pkg/log" "github.com/confluentinc/cli/v4/pkg/output" "github.com/confluentinc/cli/v4/pkg/resource" ) @@ -52,7 +47,11 @@ func (c *command) endpointUse(_ *cobra.Command, args []string) error { } endpoint := args[0] - if valid := validateUserProvidedFlinkEndpoint(endpoint, cloud, region, c); !valid { + valid, err := validateUserProvidedFlinkEndpoint(endpoint, cloud, region, c) + if err != nil { + return err + } + if !valid { suggestion := `Please run "confluent flink endpoint list" to see all available Flink endpoints, or "confluent flink region use" to switch to a different cloud or region.` return errors.NewErrorWithSuggestions(fmt.Sprintf("Flink endpoint %q is invalid for cloud = %q and region = %q", endpoint, cloud, region), suggestion) } @@ -68,72 +67,36 @@ func (c *command) endpointUse(_ *cobra.Command, args []string) error { return nil } -// validateUserProvidedFlinkEndpoint verifies if the provided Flink endpoint is valid for the given cloud and region. -// It performs validation against three endpoint types: -// 1. Public endpoints -// 2. Private endpoints associated with PrivateLink attachments -// 3. Private endpoints associated with Confluent Cloud Networks -// Returns true if the endpoint is valid, false otherwise. -func validateUserProvidedFlinkEndpoint(endpoint, cloud, region string, c *command) bool { +// validateUserProvidedFlinkEndpoint checks whether the given endpoint URL is one of +// the REST endpoints reported by the Endpoints API for the current environment + +// cloud + region. Returns (true, nil) on a match, (false, nil) if the endpoint is +// not in the returned set, or (false, err) if the environment lookup or API call +// fails — so the caller can surface a real error instead of a misleading +// "endpoint invalid" message. +func validateUserProvidedFlinkEndpoint(endpoint, cloud, region string, c *command) (bool, error) { if endpoint == "" { - log.CliLogger.Debug("Invalid input: given endpoint is empty") - return false + return false, nil } - cloud = strings.ToUpper(cloud) - // Check if the endpoint is PUBLIC - flinkRegions, err := c.V2Client.ListFlinkRegions(cloud, region) + environmentId, err := c.Context.EnvironmentId() if err != nil { - log.CliLogger.Debugf("Error listing Flink regions: %v", err) - return false - } - - for _, r := range flinkRegions { - if r.GetHttpEndpoint() == endpoint { - log.CliLogger.Debugf("Flink endpoint %q is a valid PUBLIC endpoint", endpoint) - return true - } + return false, err } - // Check if the endpoint is PRIVATE associated with PLATT - platts, err := c.V2Client.ListNetworkPrivateLinkAttachments(c.Context.GetCurrentEnvironment(), nil, nil, nil, []string{"READY"}) + endpoints, err := c.V2Client.ListEndpoints(environmentId, cloud, region, flinkEndpointService, nil, "") if err != nil { - log.CliLogger.Debugf("Error listing PrivateLink attachments: %v", err) - return false - } else { - filterKeyMap := buildCloudRegionKeyFilterMapFromPrivateLinkAttachments(platts) - - for _, r := range flinkRegions { - key := CloudRegionKey{ - cloud: r.GetCloud(), - region: r.GetRegionName(), - } - if _, ok := filterKeyMap[key]; ok && r.GetPrivateHttpEndpoint() == endpoint { - log.CliLogger.Debugf("Flink endpoint %q is a valid PRIVATE endpoint associated with a private link attachment", endpoint) - return true - } - } + return false, err } - // Check if the endpoint is PRIVATE associated with CCN - // These endpoints are currently only available for AWS and Azure, so we filter accordingly - var networks []networkingv1.NetworkingV1Network - if cloud != pcloud.Gcp { - networks, err = c.V2Client.ListNetworks(c.Context.GetCurrentEnvironment(), nil, []string{cloud}, []string{region}, nil, []string{"READY"}, nil) - if err != nil { - log.CliLogger.Debugf("Error listing networks: %v", err) - return false + for _, e := range endpoints { + // Skip LANGUAGE_SERVICE endpoints (`flinkpls.*`) — they are used by the Cloud + // Console SQL editor's language server, not by CLI dataplane commands. + if e.GetEndpointType() != flinkRestEndpointType { + continue } - } - - for _, network := range networks { - suffix := network.Status.GetEndpointSuffix() - validEndpoint := fmt.Sprintf("https://flink%s", suffix) - if endpoint == validEndpoint { - log.CliLogger.Debugf("Flink endpoint %q is a valid PRIVATE CCN endpoint", endpoint) - return true + if flinkEndpointUrl(e.GetEndpoint()) == endpoint { + return true, nil } } - - return false + return false, nil } diff --git a/pkg/ccloudv2/client.go b/pkg/ccloudv2/client.go index 8679db0b84..d5c6a062e0 100644 --- a/pkg/ccloudv2/client.go +++ b/pkg/ccloudv2/client.go @@ -14,6 +14,7 @@ import ( cmkv2 "github.com/confluentinc/ccloud-sdk-go-v2/cmk/v2" connectcustompluginv1 "github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin/v1" connectv1 "github.com/confluentinc/ccloud-sdk-go-v2/connect/v1" + endpointv1 "github.com/confluentinc/ccloud-sdk-go-v2/endpoint/v1" flinkartifactv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact/v1" flinkv2 "github.com/confluentinc/ccloud-sdk-go-v2/flink/v2" iamIpFilter "github.com/confluentinc/ccloud-sdk-go-v2/iam-ip-filtering/v2" @@ -59,6 +60,7 @@ type Client struct { ConnectArtifactClient *camv1.APIClient ConnectClient *connectv1.APIClient ConnectCustomPluginClient *connectcustompluginv1.APIClient + EndpointClient *endpointv1.APIClient FlinkArtifactClient *flinkartifactv1.APIClient FlinkClient *flinkv2.APIClient IamClient *iamv2.APIClient @@ -111,6 +113,7 @@ func NewClient(cfg *config.Config, unsafeTrace bool) *Client { ConnectArtifactClient: newConnectArtifactClient(httpClient, url, userAgent, unsafeTrace), ConnectClient: newConnectClient(httpClient, url, userAgent, unsafeTrace), ConnectCustomPluginClient: newConnectCustomPluginClient(httpClient, url, userAgent, unsafeTrace), + EndpointClient: newEndpointClient(httpClient, url, userAgent, unsafeTrace), FlinkArtifactClient: newFlinkArtifactClient(httpClient, url, userAgent, unsafeTrace), FlinkClient: newFlinkClient(httpClient, url, userAgent, unsafeTrace), IamClient: newIamClient(httpClient, url, userAgent, unsafeTrace), diff --git a/pkg/ccloudv2/endpoint.go b/pkg/ccloudv2/endpoint.go new file mode 100644 index 0000000000..2d9c26b6ae --- /dev/null +++ b/pkg/ccloudv2/endpoint.go @@ -0,0 +1,79 @@ +package ccloudv2 + +import ( + "context" + "net/http" + "strings" + + endpointv1 "github.com/confluentinc/ccloud-sdk-go-v2/endpoint/v1" + + "github.com/confluentinc/cli/v4/pkg/errors" +) + +func newEndpointClient(httpClient *http.Client, url, userAgent string, unsafeTrace bool) *endpointv1.APIClient { + cfg := endpointv1.NewConfiguration() + cfg.Debug = unsafeTrace + cfg.HTTPClient = httpClient + cfg.Servers = endpointv1.ServerConfigurations{{URL: url}} + cfg.UserAgent = userAgent + + return endpointv1.NewAPIClient(cfg) +} + +func (c *Client) endpointApiContext() context.Context { + return context.WithValue(context.Background(), endpointv1.ContextAccessToken, c.cfg.Context().GetAuthToken()) +} + +// ListEndpoints returns all endpoints matching the given filters, paginating as +// needed. The `cloud` and `service` filters are normalized to uppercase to match +// the API's case-sensitive filter semantics (e.g. AWS, FLINK), so callers can +// pass either case without surprise. +func (c *Client) ListEndpoints(environment, cloud, region, service string, isPrivate *bool, resource string) ([]endpointv1.EndpointV1Endpoint, error) { + cloud = strings.ToUpper(cloud) + service = strings.ToUpper(service) + + var list []endpointv1.EndpointV1Endpoint + + done := false + pageToken := "" + for !done { + page, httpResp, err := c.executeListEndpoints(environment, pageToken, cloud, region, service, isPrivate, resource) + if err != nil { + return nil, errors.CatchCCloudV2Error(err, httpResp) + } + list = append(list, page.GetData()...) + + pageToken, done, err = extractNextPageToken(page.GetMetadata().Next) + if err != nil { + return nil, err + } + } + return list, nil +} + +func (c *Client) executeListEndpoints(environment, pageToken, cloud, region, service string, isPrivate *bool, resource string) (endpointv1.EndpointV1EndpointList, *http.Response, error) { + req := c.EndpointClient.EndpointsEndpointV1Api.ListEndpointV1Endpoints(c.endpointApiContext()). + Environment(environment). + PageSize(ccloudV2ListPageSize) + + if service != "" { + req = req.Service(service) + } + if cloud != "" { + req = req.Cloud(cloud) + } + if region != "" { + req = req.Region(region) + } + if isPrivate != nil { + req = req.IsPrivate(*isPrivate) + } + if resource != "" { + req = req.Resource(resource) + } + if pageToken != "" { + req = req.PageToken(pageToken) + } + + return req.Execute() +} diff --git a/test/fixtures/output/flink/endpoint/list-azure-access-point.golden b/test/fixtures/output/flink/endpoint/list-azure-access-point.golden new file mode 100644 index 0000000000..0919b79b35 --- /dev/null +++ b/test/fixtures/output/flink/endpoint/list-azure-access-point.golden @@ -0,0 +1,4 @@ + Current | Endpoint | Cloud | Region | Type +----------+-------------------------------------------------------------------------+-------+------------+---------- + | https://flink-ap4jnpj9.italynorth.azure.accesspoint.glb.confluent.cloud | AZURE | italynorth | PRIVATE + | https://flink.italynorth.azure.confluent.cloud | AZURE | italynorth | PUBLIC diff --git a/test/fixtures/output/flink/endpoint/use-azure-access-point.golden b/test/fixtures/output/flink/endpoint/use-azure-access-point.golden new file mode 100644 index 0000000000..df2c3f7189 --- /dev/null +++ b/test/fixtures/output/flink/endpoint/use-azure-access-point.golden @@ -0,0 +1 @@ +Using Flink endpoint "https://flink-ap4jnpj9.italynorth.azure.accesspoint.glb.confluent.cloud". diff --git a/test/fixtures/output/flink/region/use-azure-italynorth.golden b/test/fixtures/output/flink/region/use-azure-italynorth.golden new file mode 100644 index 0000000000..d94aea03c6 --- /dev/null +++ b/test/fixtures/output/flink/region/use-azure-italynorth.golden @@ -0,0 +1,2 @@ +The current Flink endpoint has been unset due to the cloud or region change. +Using Flink region "Milan (italynorth)". diff --git a/test/flink_test.go b/test/flink_test.go index a4f13ca38d..c07ef11c7f 100644 --- a/test/flink_test.go +++ b/test/flink_test.go @@ -503,6 +503,8 @@ func (s *CLITestSuite) TestFlinkEndpointList() { {args: "flink endpoint list", fixture: "flink/endpoint/list-azure.golden"}, {args: "flink region use --cloud azure --region eastus2", fixture: "flink/region/use-azure-ccn.golden"}, {args: "flink endpoint list", fixture: "flink/endpoint/list-azure-with-ccn.golden"}, + {args: "flink region use --cloud azure --region italynorth", fixture: "flink/region/use-azure-italynorth.golden"}, + {args: "flink endpoint list", fixture: "flink/endpoint/list-azure-access-point.golden"}, } for _, test := range tests { @@ -529,6 +531,8 @@ func (s *CLITestSuite) TestFlinkEndpointUse() { {args: "flink region use --cloud azure --region eastus2", fixture: "flink/region/use-azure-ccn.golden"}, {args: "flink endpoint use https://flink-n-abcde2.eastus.azure.confluent.cloud", fixture: "flink/endpoint/use-azure-ccn.golden"}, {args: "flink endpoint use https://flink-n-abcde7.eastus.azure.confluent.cloud", fixture: "flink/endpoint/use-azure-ccn-peering.golden"}, + {args: "flink region use --cloud azure --region italynorth", fixture: "flink/region/use-azure-italynorth.golden"}, + {args: "flink endpoint use https://flink-ap4jnpj9.italynorth.azure.accesspoint.glb.confluent.cloud", fixture: "flink/endpoint/use-azure-access-point.golden"}, } for _, test := range tests { diff --git a/test/test-server/ccloudv2_router.go b/test/test-server/ccloudv2_router.go index 002eaf9c99..8a7b2a7375 100644 --- a/test/test-server/ccloudv2_router.go +++ b/test/test-server/ccloudv2_router.go @@ -55,6 +55,7 @@ var ccloudV2Routes = []route{ {"/connect/v1/presigned-upload-url", handleCustomPluginUploadUrl}, {"/connect/v1/custom-connector-runtimes", handleListCustomConnectorRuntimes}, {"/connect/v1/dummy-presigned-url", handleCustomPluginUploadFile}, + {"/endpoint/v1/endpoints", handleEndpointV1Endpoints}, {"/fcpm/v2/compute-pools", handleFcpmComputePools}, {"/fcpm/v2/compute-pool-config", handleFcpmComputePoolConfigs}, {"/fcpm/v2/compute-pools/{id}", handleFcpmComputePoolsId}, diff --git a/test/test-server/endpoint_handlers.go b/test/test-server/endpoint_handlers.go new file mode 100644 index 0000000000..539e5b8525 --- /dev/null +++ b/test/test-server/endpoint_handlers.go @@ -0,0 +1,89 @@ +package testserver + +import ( + "encoding/json" + "net/http" + "testing" + + "github.com/stretchr/testify/require" + + endpointv1 "github.com/confluentinc/ccloud-sdk-go-v2/endpoint/v1" +) + +// handleEndpointV1Endpoints mocks the new Endpoints API. +// For each supported (cloud, region) pair it returns the same set of FLINK endpoints +// the legacy three-source aggregation produced, so the list-*.golden fixtures +// remain unchanged. The handler also emits at least one LANGUAGE_SERVICE +// (`flinkpls.*`) endpoint to exercise the CLI's REST-only filter. +func handleEndpointV1Endpoints(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + // The real API returns cloud values lowercased; the CLI uppercases them for + // display. We match the real API here so the test exercises that conversion. + cloud := r.URL.Query().Get("cloud") + region := r.URL.Query().Get("region") + + var endpoints []endpointv1.EndpointV1Endpoint + switch { + case cloud == "AWS" && region == "eu-west-1": + endpoints = []endpointv1.EndpointV1Endpoint{ + newFlinkRestEndpoint("aws", region, TestFlinkGatewayUrl.String(), false), + newFlinkRestEndpoint("aws", region, TestFlinkGatewayUrlPrivate.String(), true), + newFlinkRestEndpoint("aws", region, "https://flink-n-abcde6.eu-west-1.aws.confluent.cloud", true), + // LANGUAGE_SERVICE endpoint must be filtered out by the CLI. + newFlinkLanguageServiceEndpoint("aws", region, "https://flinkpls.eu-west-1.aws.confluent.cloud", false), + } + case cloud == "AZURE" && region == "centralus": + endpoints = []endpointv1.EndpointV1Endpoint{ + newFlinkRestEndpoint("azure", region, TestFlinkGatewayUrl.String(), false), + } + case cloud == "AZURE" && region == "eastus2": + endpoints = []endpointv1.EndpointV1Endpoint{ + newFlinkRestEndpoint("azure", region, TestFlinkGatewayUrl.String(), false), + newFlinkRestEndpoint("azure", region, "https://flink-n-abcde2.eastus.azure.confluent.cloud", true), + newFlinkRestEndpoint("azure", region, "https://flink-n-abcde7.eastus.azure.confluent.cloud", true), + } + case cloud == "GCP" && region == "europe-west3-a": + endpoints = []endpointv1.EndpointV1Endpoint{ + newFlinkRestEndpoint("gcp", region, TestFlinkGatewayUrl.String(), false), + newFlinkRestEndpoint("gcp", region, TestFlinkGatewayUrlPrivate.String(), true), + } + case cloud == "AZURE" && region == "italynorth": + // Multi-PLATT (PrivateLink Gateway) shape: an access-point URL on the GLB + // domain that the legacy URL-template aggregation could not have constructed. + // Mirrors a real production env captured during FCP-4223 verification. + endpoints = []endpointv1.EndpointV1Endpoint{ + newFlinkRestEndpoint("azure", region, "https://flink.italynorth.azure.confluent.cloud", false), + newFlinkRestEndpoint("azure", region, "https://flink-ap4jnpj9.italynorth.azure.accesspoint.glb.confluent.cloud", true), + // LANGUAGE_SERVICE row that the CLI must filter out. + newFlinkLanguageServiceEndpoint("azure", region, "https://flinkpls.italynorth.azure.confluent.cloud", false), + } + } + + list := &endpointv1.EndpointV1EndpointList{Data: endpoints} + err := json.NewEncoder(w).Encode(list) + require.NoError(t, err) + } +} + +func newFlinkRestEndpoint(cloud, region, url string, isPrivate bool) endpointv1.EndpointV1Endpoint { + return newFlinkEndpoint(cloud, region, url, isPrivate, "REST") +} + +func newFlinkLanguageServiceEndpoint(cloud, region, url string, isPrivate bool) endpointv1.EndpointV1Endpoint { + return newFlinkEndpoint(cloud, region, url, isPrivate, "LANGUAGE_SERVICE") +} + +func newFlinkEndpoint(cloud, region, url string, isPrivate bool, endpointType string) endpointv1.EndpointV1Endpoint { + return endpointv1.EndpointV1Endpoint{ + Cloud: endpointv1.PtrString(cloud), + Region: endpointv1.PtrString(region), + Service: endpointv1.PtrString("FLINK"), + Endpoint: endpointv1.PtrString(url), + IsPrivate: endpointv1.PtrBool(isPrivate), + EndpointType: endpointv1.PtrString(endpointType), + } +} diff --git a/test/test-server/fcpm_handlers.go b/test/test-server/fcpm_handlers.go index 7a0edba26b..dacc817956 100644 --- a/test/test-server/fcpm_handlers.go +++ b/test/test-server/fcpm_handlers.go @@ -190,6 +190,14 @@ func handleFcpmRegions(t *testing.T) http.HandlerFunc { HttpEndpoint: flinkv2.PtrString(TestFlinkGatewayUrl.String()), PrivateHttpEndpoint: flinkv2.PtrString(TestFlinkGatewayUrlPrivate.String()), } + azureItalynorth := flinkv2.FcpmV2Region{ + Id: flinkv2.PtrString("azure.italynorth"), + DisplayName: flinkv2.PtrString("Milan (italynorth)"), + Cloud: flinkv2.PtrString("AZURE"), + RegionName: flinkv2.PtrString("italynorth"), + HttpEndpoint: flinkv2.PtrString(TestFlinkGatewayUrl.String()), + PrivateHttpEndpoint: flinkv2.PtrString(TestFlinkGatewayUrlPrivate.String()), + } // Allowing flexible mock results based on query parameters regions := []flinkv2.FcpmV2Region{awsEuWest1, awsEuWest2, gcp, azure} @@ -210,6 +218,9 @@ func handleFcpmRegions(t *testing.T) http.HandlerFunc { if r.URL.Query().Get("region_name") == "eastus2" { regions = []flinkv2.FcpmV2Region{azureEastus2} } + if r.URL.Query().Get("region_name") == "italynorth" { + regions = []flinkv2.FcpmV2Region{azureItalynorth} + } } regionsList := &flinkv2.FcpmV2RegionList{Data: regions} setPageToken(regionsList, ®ionsList.Metadata, r.URL)