Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config/rbac_downstream/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,13 @@ rules:
- envoypatchpolicies/status
verbs:
- get
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- delete
- get
- patch
- update
206 changes: 184 additions & 22 deletions internal/controller/iroh_dns_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"slices"
"strconv"
"strings"
"time"

coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -58,6 +59,74 @@ const (
connectorReasonIrohPending = "Pending"
)

// irohDNSClaimLeaseHeartbeats is the number of upstream Lease heartbeat
// intervals that must pass without renewal before a competitor may take over.
// renewDownstreamLease multiplies the configured connector lease duration by
// this factor to obtain the duration written to the downstream claim Lease.
const irohDNSClaimLeaseHeartbeats = 3

// claimStatus is the action evaluateClaim selects for applyClaim to carry out
// against the downstream DNSRecordSet.
type claimStatus int

const (
claimStatusCreate claimStatus = iota // no DNSRecordSet exists yet; create it with our claim
claimStatusOwner // the record's claimed-by label already carries our UID
claimStatusTakeOver // foreign claim whose owner is judged dead (upstream Lease dead, or downstream Lease expired)
claimStatusDefer // foreign claim whose owner is alive, or whose liveness cannot be determined
)

// upstreamLeaseStatus is the result of checking the upstream Lease that
// connector_controller maintains for the foreign connector in its own cluster.
// The zero value is upstreamLeaseUnknown, which is also what applyClaim passes
// to evaluateClaim when it never performs the upstream check.
type upstreamLeaseStatus int

const (
upstreamLeaseUnknown upstreamLeaseStatus = iota // cluster not accessible, or the Lease read failed for a reason other than NotFound
upstreamLeaseAlive // lease present and not expired
upstreamLeaseDead // lease absent or expired
)

// isLeaseExpired reports whether now is at or past the lease's renewal
// deadline, computed as Spec.RenewTime plus Spec.LeaseDurationSeconds.
//
// A lease that is missing either field has no usable deadline and is reported
// as expired.
func isLeaseExpired(lease coordinationv1.Lease, now time.Time) bool {
	spec := lease.Spec
	if spec.RenewTime != nil && spec.LeaseDurationSeconds != nil {
		ttl := time.Duration(*spec.LeaseDurationSeconds) * time.Second
		// Expired once the deadline is no longer strictly in the future.
		return !spec.RenewTime.Add(ttl).After(now)
	}
	return true
}

// evaluateClaim picks the action applyClaim should take from the current
// downstream DNSRecordSet (nil when it does not exist), the status of the
// foreign connector's upstream Lease, and the downstream claim Lease (nil when
// it was not fetched or does not exist).
//
// Decision order, first match wins:
//   - no record                          → claimStatusCreate
//   - record claimed by connectorUID     → claimStatusOwner
//   - upstream Lease alive               → claimStatusDefer
//   - upstream Lease dead                → claimStatusTakeOver
//   - upstream unknown, no claim Lease   → claimStatusDefer
//   - upstream unknown, Lease expired    → claimStatusTakeOver
//   - upstream unknown, Lease still live → claimStatusDefer
//
// The upstream Lease is authoritative whenever it could be read; the
// downstream Lease is only the fallback for owners in clusters this manager
// cannot reach.
func evaluateClaim(
	connectorUID types.UID,
	existing *dnsv1alpha1.DNSRecordSet,
	upstream upstreamLeaseStatus,
	claimLease *coordinationv1.Lease,
	now time.Time,
) claimStatus {
	switch {
	case existing == nil:
		return claimStatusCreate
	case existing.Labels[irohDNSClaimedByUIDLabel] == string(connectorUID):
		return claimStatusOwner
	case upstream == upstreamLeaseAlive:
		return claimStatusDefer
	case upstream == upstreamLeaseDead:
		return claimStatusTakeOver
	case claimLease == nil:
		// Upstream inaccessible and no downstream signal: do not contest.
		return claimStatusDefer
	case isLeaseExpired(*claimLease, now):
		return claimStatusTakeOver
	default:
		return claimStatusDefer
	}
}

// allowedIrohControllerNames is the set of ConnectorClass.spec.controllerName
// values for which we publish iroh DNS records. Both names refer to the same
// controller; "datum-connect" is the legacy name kept alive while older
Expand All @@ -84,6 +153,7 @@ type IrohDNSReconciler struct {
// +kubebuilder:rbac:groups=networking.datumapis.com,resources=connectors/status,verbs=update;patch
// +kubebuilder:rbac:groups=networking.datumapis.com,resources=connectors/finalizers,verbs=update
// +kubebuilder:rbac:groups=networking.datumapis.com,resources=connectorclasses,verbs=get;list;watch
// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;create;update;patch;delete

func (r *IrohDNSReconciler) Reconcile(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx, "cluster", req.ClusterName)
Expand Down Expand Up @@ -139,47 +209,130 @@ func (r *IrohDNSReconciler) Reconcile(ctx context.Context, req mcreconcile.Reque
return ctrl.Result{}, r.applyClaim(ctx, cl, &connector, desired)
}

// applyClaim implements the claim-then-write loop:
// - Get the DNSRecordSet at the deterministic z32-derived name.
// - Not found → Create with our claim. AlreadyExists means a sibling beat
// us; we re-fetch and continue.
// - Found, our claim → SSA refresh content.
// - Found, foreign claim → defer (no write) and surface a status
// condition naming the owner.
// applyClaim implements the ownership arbitration loop:
// - Not found → Create with our claim + create downstream Lease.
// - Found, our claim → SSA refresh + renew downstream Lease.
// - Found, foreign claim + owner confirmed dead → force-claim.
// - Found, foreign claim + owner alive or no signal → defer.
//
// An absent downstream Lease (upstream inaccessible) results in Defer, not
// TakeOver, so pre-protocol records are not immediately contested on upgrade.
func (r *IrohDNSReconciler) applyClaim(ctx context.Context, cl cluster.Cluster, connector *networkingv1alpha1.Connector, desired *dnsv1alpha1.DNSRecordSet) error {
key := client.ObjectKeyFromObject(desired)
now := time.Now()

var existing dnsv1alpha1.DNSRecordSet
err := r.Downstream.GetClient().Get(ctx, key, &existing)
existingErr := r.Downstream.GetClient().Get(ctx, key, &existing)
if existingErr != nil && !apierrors.IsNotFound(existingErr) {
return fmt.Errorf("get DNSRecordSet: %w", existingErr)
}
var existingPtr *dnsv1alpha1.DNSRecordSet
if existingErr == nil {
existingPtr = &existing
}

var claimLease *coordinationv1.Lease
var upstream upstreamLeaseStatus
if existingPtr != nil && existingPtr.Labels[irohDNSClaimedByUIDLabel] != string(connector.UID) {
upstream = r.checkUpstreamClaimLease(ctx, existingPtr, now)
if upstream == upstreamLeaseUnknown {
// Upstream inaccessible — check downstream Lease as fallback.
var lease coordinationv1.Lease
err := r.Downstream.GetClient().Get(ctx, key, &lease)
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("get downstream claim lease: %w", err)
}
if err == nil {
claimLease = &lease
}
}
}

switch {
case apierrors.IsNotFound(err):
switch evaluateClaim(connector.UID, existingPtr, upstream, claimLease, now) {
case claimStatusCreate:
if err := r.Downstream.GetClient().Create(ctx, desired); err != nil {
if !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("create DNSRecordSet: %w", err)
}
// Sibling raced us. Refetch and fall through to the foreign-claim
// branch on next reconcile.
// Sibling raced us to the create; next reconcile will re-evaluate.
return nil
}
if err := r.renewDownstreamLease(ctx, connector, desired.Name); err != nil {
return err
}
return r.setPublishedCondition(ctx, cl, connector, metav1.ConditionTrue, connectorReasonIrohOwner, "Owns iroh DNS record.")

case err != nil:
return fmt.Errorf("get DNSRecordSet: %w", err)
}
case claimStatusOwner, claimStatusTakeOver:
if err := r.Downstream.GetClient().Patch(ctx, desired, client.Apply, client.FieldOwner(irohDNSFieldManager), client.ForceOwnership); err != nil {
return fmt.Errorf("apply DNSRecordSet: %w", err)
}
if err := r.renewDownstreamLease(ctx, connector, desired.Name); err != nil {
return err
}
return r.setPublishedCondition(ctx, cl, connector, metav1.ConditionTrue, connectorReasonIrohOwner, "Owns iroh DNS record.")

currentClaim := existing.Labels[irohDNSClaimedByUIDLabel]
if currentClaim != string(connector.UID) {
default: // claimStatusDefer
ownerCluster := decodeIrohClusterLabel(existing.Labels[irohDNSConnectorClusterLabel])
ownerRef := ownerCluster + "/" + existing.Labels[irohDNSConnectorNamespaceLabel] + "/" + existing.Labels[irohDNSConnectorNameLabel]
return r.setPublishedCondition(ctx, cl, connector, metav1.ConditionFalse, connectorReasonIrohDeferredToOwner,
fmt.Sprintf("iroh DNS record is owned by Connector %s (uid %s).", ownerRef, currentClaim))
fmt.Sprintf("iroh DNS record is owned by Connector %s (uid %s).", ownerRef, existing.Labels[irohDNSClaimedByUIDLabel]))
}
}

// renewDownstreamLease server-side-applies the claim Lease for recordSetName
// in the configured DNS zone namespace of the downstream cluster, creating it
// if needed and otherwise refreshing it.
//
// The applied Lease names connector's UID as holder, stamps the current time
// as RenewTime, and lasts irohDNSClaimLeaseHeartbeats times the configured
// connector lease duration. Competing connectors (including cross-cluster
// ones) read this Lease to decide whether the current claimant is still
// active.
//
// It returns a wrapped error when the apply patch fails.
func (r *IrohDNSReconciler) renewDownstreamLease(ctx context.Context, connector *networkingv1alpha1.Connector, recordSetName string) error {
	holder := string(connector.UID)
	renewedAt := metav1.NewMicroTime(time.Now())
	leaseSeconds := r.Config.Connector.LeaseDurationSeconds * irohDNSClaimLeaseHeartbeats

	// APIVersion and Kind are set explicitly on the object sent with the
	// apply patch.
	claim := &coordinationv1.Lease{}
	claim.APIVersion = coordinationv1.SchemeGroupVersion.String()
	claim.Kind = "Lease"
	claim.Name = recordSetName
	claim.Namespace = r.Config.Connector.Iroh.DNSZoneRef.Namespace
	claim.Spec.HolderIdentity = &holder
	claim.Spec.LeaseDurationSeconds = &leaseSeconds
	claim.Spec.RenewTime = &renewedAt

	err := r.Downstream.GetClient().Patch(ctx, claim, client.Apply, client.FieldOwner(irohDNSFieldManager), client.ForceOwnership)
	if err != nil {
		return fmt.Errorf("renew downstream claim lease: %w", err)
	}
	return nil
}

// We own it. SSA the desired content.
if err := r.Downstream.GetClient().Patch(ctx, desired, client.Apply, client.FieldOwner(irohDNSFieldManager), client.ForceOwnership); err != nil {
return fmt.Errorf("apply DNSRecordSet: %w", err)
// checkUpstreamClaimLease looks up the upstream Lease that connector_controller
// maintains for the foreign connector identified by existing's claim labels.
// The Lease is read from the cluster named by the cluster label, at the
// namespace/name given by the connector namespace and name labels.
//
// Results:
//   - upstreamLeaseUnknown: the namespace or name label is empty, the cluster
//     is not registered with this manager (cross-account or otherwise
//     inaccessible), or the Lease read failed for a reason other than NotFound.
//   - upstreamLeaseDead: the Lease does not exist or isLeaseExpired reports it
//     expired at now.
//   - upstreamLeaseAlive: the Lease exists and has not expired.
func (r *IrohDNSReconciler) checkUpstreamClaimLease(ctx context.Context, existing *dnsv1alpha1.DNSRecordSet, now time.Time) upstreamLeaseStatus {
	key := client.ObjectKey{
		Namespace: existing.Labels[irohDNSConnectorNamespaceLabel],
		Name:      existing.Labels[irohDNSConnectorNameLabel],
	}
	// Without both labels there is no Lease to look for. Report Unknown so the
	// caller falls back to the downstream Lease instead of risking an
	// empty-key lookup being read as "lease absent" and triggering a takeover.
	// NOTE(review): how the client answers an empty key depends on its
	// implementation — confirm against the client in use.
	if key.Namespace == "" || key.Name == "" {
		return upstreamLeaseUnknown
	}

	clusterName := decodeIrohClusterLabel(existing.Labels[irohDNSConnectorClusterLabel])
	cl, err := r.mgr.GetCluster(ctx, clusterName)
	if err != nil {
		return upstreamLeaseUnknown
	}

	var lease coordinationv1.Lease
	if err := cl.GetClient().Get(ctx, key, &lease); err != nil {
		if apierrors.IsNotFound(err) {
			return upstreamLeaseDead
		}
		// Any other read failure says nothing about the owner's liveness.
		return upstreamLeaseUnknown
	}
	if isLeaseExpired(lease, now) {
		return upstreamLeaseDead
	}
	return upstreamLeaseAlive
}

// handleDeletion releases the claim (if held) and removes the finalizer.
Expand Down Expand Up @@ -226,6 +379,15 @@ func (r *IrohDNSReconciler) releaseIfOwner(ctx context.Context, connector *netwo
if err := r.Downstream.GetClient().Delete(ctx, &existing); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("delete DNSRecordSet: %w", err)
}
// Release the downstream claim Lease only if we currently hold it.
var claimLease coordinationv1.Lease
if err := r.Downstream.GetClient().Get(ctx, key, &claimLease); err == nil {
if claimLease.Spec.HolderIdentity != nil && *claimLease.Spec.HolderIdentity == string(connector.UID) {
if err := r.Downstream.GetClient().Delete(ctx, &claimLease); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("delete downstream claim lease: %w", err)
}
}
}
return nil
}

Expand Down
Loading
Loading