diff --git a/internal/operator-controller/controllers/clusterobjectset_controller.go b/internal/operator-controller/controllers/clusterobjectset_controller.go index b34730b77..72f9265c4 100644 --- a/internal/operator-controller/controllers/clusterobjectset_controller.go +++ b/internal/operator-controller/controllers/clusterobjectset_controller.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" "k8s.io/utils/clock" "pkg.package-operator.run/boxcutter" "pkg.package-operator.run/boxcutter/machinery" @@ -30,8 +31,8 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -81,29 +82,6 @@ func (c *ClusterObjectSetReconciler) Reconcile(ctx context.Context, req ctrl.Req reconciledRev := existingRev.DeepCopy() res, reconcileErr := c.reconcile(ctx, reconciledRev) - if pd := existingRev.Spec.ProgressDeadlineMinutes; pd > 0 { - cnd := meta.FindStatusCondition(reconciledRev.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing) - isStillProgressing := cnd != nil && cnd.Status == metav1.ConditionTrue && cnd.Reason != ocv1.ReasonSucceeded - succeeded := meta.IsStatusConditionTrue(reconciledRev.Status.Conditions, ocv1.ClusterObjectSetTypeSucceeded) - // check if we reached the progress deadline only if the revision is still progressing and has not succeeded yet - if isStillProgressing && !succeeded { - timeout := time.Duration(pd) * time.Minute - if c.Clock.Since(existingRev.CreationTimestamp.Time) > timeout { - // progress deadline reached, reset any errors and stop reconciling this revision - markAsNotProgressing(reconciledRev, ocv1.ReasonProgressDeadlineExceeded, fmt.Sprintf("Revision has not rolled out for %d minute(s).", pd)) - reconcileErr = nil - res = ctrl.Result{} - } else if reconcileErr == nil { - // We want to requeue so far in the future that the next reconciliation - // can detect if the revision did not progress within the given timeout. - // Thus, we plan the next reconcile slightly after (+2secs) the timeout is passed. - drift := 2 * time.Second - requeueAfter := existingRev.CreationTimestamp.Time.Add(timeout).Add(drift).Sub(c.Clock.Now()).Round(time.Second) - l.Info(fmt.Sprintf("ProgressDeadline not exceeded, requeue after ~%v to check again.", requeueAfter)) - res = ctrl.Result{RequeueAfter: requeueAfter} - } - } - } // Do checks before any Update()s, as Update() may modify the resource structure! updateStatus := !equality.Semantic.DeepEqual(existingRev.Status, reconciledRev.Status) @@ -144,15 +122,18 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl return c.delete(ctx, cos) } + remaining, hasDeadline := c.durationUntilDeadline(cos) + isDeadlineExceeded := hasDeadline && remaining <= 0 + phases, opts, err := c.buildBoxcutterPhases(ctx, cos) if err != nil { - setRetryingConditions(cos, err.Error()) + setRetryingConditions(cos, err.Error(), isDeadlineExceeded) return ctrl.Result{}, fmt.Errorf("converting to boxcutter revision: %v", err) } revisionEngine, err := c.RevisionEngineFactory.CreateRevisionEngine(ctx, cos) if err != nil { - setRetryingConditions(cos, err.Error()) + setRetryingConditions(cos, err.Error(), isDeadlineExceeded) return ctrl.Result{}, fmt.Errorf("failed to create revision engine: %v", err) } @@ -178,7 +159,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl if err := c.establishWatch(ctx, cos, revision); err != nil { werr := fmt.Errorf("establish watch: %v", err) - setRetryingConditions(cos, werr.Error()) + setRetryingConditions(cos, werr.Error(), isDeadlineExceeded) return ctrl.Result{}, werr } @@ -188,7 +169,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl // Log detailed reconcile reports only in debug mode (V(1)) to reduce verbosity. l.V(1).Info("reconcile report", "report", rres.String()) } - setRetryingConditions(cos, err.Error()) + setRetryingConditions(cos, err.Error(), isDeadlineExceeded) return ctrl.Result{}, fmt.Errorf("revision reconcile: %v", err) } @@ -196,14 +177,14 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl // TODO: report status, backoff? if verr := rres.GetValidationError(); verr != nil { l.Error(fmt.Errorf("%w", verr), "preflight validation failed, retrying after 10s") - setRetryingConditions(cos, fmt.Sprintf("revision validation error: %s", verr)) + setRetryingConditions(cos, fmt.Sprintf("revision validation error: %s", verr), isDeadlineExceeded) return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } for i, pres := range rres.GetPhases() { if verr := pres.GetValidationError(); verr != nil { l.Error(fmt.Errorf("%w", verr), "phase preflight validation failed, retrying after 10s", "phase", i) - setRetryingConditions(cos, fmt.Sprintf("phase %d validation error: %s", i, verr)) + setRetryingConditions(cos, fmt.Sprintf("phase %d validation error: %s", i, verr), isDeadlineExceeded) return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } @@ -216,14 +197,14 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl if len(collidingObjs) > 0 { l.Error(fmt.Errorf("object collision detected"), "object collision, retrying after 10s", "phase", i, "collisions", collidingObjs) - setRetryingConditions(cos, fmt.Sprintf("revision object collisions in phase %d\n%s", i, strings.Join(collidingObjs, "\n\n"))) + setRetryingConditions(cos, fmt.Sprintf("revision object collisions in phase %d\n%s", i, strings.Join(collidingObjs, "\n\n")), isDeadlineExceeded) return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } } revVersion := cos.GetAnnotations()[labels.BundleVersionKey] if rres.InTransition() { - markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion)) + markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion), isDeadlineExceeded) } //nolint:nestif @@ -243,7 +224,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl } } - markAsProgressing(cos, ocv1.ReasonSucceeded, fmt.Sprintf("Revision %s has rolled out.", revVersion)) + markAsProgressing(cos, ocv1.ReasonSucceeded, fmt.Sprintf("Revision %s has rolled out.", revVersion), isDeadlineExceeded) markAsAvailable(cos, ocv1.ClusterObjectSetReasonProbesSucceeded, "Objects are available and pass all probes.") // We'll probably only want to remove this once we are done updating the ClusterExtension conditions @@ -288,14 +269,23 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl } else { markAsUnavailable(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion)) } - if meta.FindStatusCondition(cos.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing) == nil { - markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion)) - } + markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion), isDeadlineExceeded) + return c.requeueForDeadline(cos), nil } return ctrl.Result{}, nil } +// requeueForDeadline returns a Result that requeues at the progress deadline +// if one is configured and has not yet been exceeded. This ensures that +// ProgressDeadlineExceeded is set promptly even when no object events occur. +func (c *ClusterObjectSetReconciler) requeueForDeadline(cos *ocv1.ClusterObjectSet) ctrl.Result { + if remaining, hasDeadline := c.durationUntilDeadline(cos); hasDeadline && remaining > 0 { + return ctrl.Result{RequeueAfter: remaining} + } + return ctrl.Result{} +} + func (c *ClusterObjectSetReconciler) delete(ctx context.Context, cos *ocv1.ClusterObjectSet) (ctrl.Result, error) { if err := c.TrackingCache.Free(ctx, cos); err != nil { markAsAvailableUnknown(cos, ocv1.ClusterObjectSetReasonReconciling, err.Error()) @@ -311,11 +301,11 @@ func (c *ClusterObjectSetReconciler) archive(ctx context.Context, revisionEngine tdres, err := revisionEngine.Teardown(ctx, revision) if err != nil { err = fmt.Errorf("error archiving revision: %v", err) - setRetryingConditions(cos, err.Error()) + setRetryingConditions(cos, err.Error(), false) return ctrl.Result{}, err } if tdres != nil && !tdres.IsComplete() { - setRetryingConditions(cos, "removing revision resources that are not owned by another revision") + setRetryingConditions(cos, "removing revision resources that are not owned by another revision", false) return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } // Ensure conditions are set before removing the finalizer when archiving @@ -333,29 +323,19 @@ type Sourcoser interface { } func (c *ClusterObjectSetReconciler) SetupWithManager(mgr ctrl.Manager) error { - skipProgressDeadlineExceededPredicate := predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - rev, ok := e.ObjectNew.(*ocv1.ClusterObjectSet) - if !ok { - return true - } - // allow deletions to happen - if !rev.DeletionTimestamp.IsZero() { - return true - } - if cnd := meta.FindStatusCondition(rev.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing); cnd != nil && cnd.Status == metav1.ConditionFalse && cnd.Reason == ocv1.ReasonProgressDeadlineExceeded { - return false - } - return true - }, - } c.Clock = clock.RealClock{} return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: newDeadlineAwareRateLimiter( + workqueue.DefaultTypedControllerRateLimiter[ctrl.Request](), + mgr.GetClient(), + c.Clock, + ), + }). For( &ocv1.ClusterObjectSet{}, builder.WithPredicates( predicate.ResourceVersionChangedPredicate{}, - skipProgressDeadlineExceededPredicate, ), ). WatchesRawSource( @@ -367,6 +347,51 @@ func (c *ClusterObjectSetReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(c) } +// deadlineAwareRateLimiter wraps a delegate rate limiter and caps the backoff +// duration to the time remaining until the COS progress deadline (+2s), ensuring +// that ProgressDeadlineExceeded is set promptly even during exponential backoff. +type deadlineAwareRateLimiter struct { + delegate workqueue.TypedRateLimiter[ctrl.Request] + client client.Reader + clock clock.Clock +} + +func newDeadlineAwareRateLimiter( + delegate workqueue.TypedRateLimiter[ctrl.Request], + c client.Reader, + clk clock.Clock, +) *deadlineAwareRateLimiter { + return &deadlineAwareRateLimiter{delegate: delegate, client: c, clock: clk} +} + +func (r *deadlineAwareRateLimiter) When(item ctrl.Request) time.Duration { + backoff := r.delegate.When(item) + + cos := &ocv1.ClusterObjectSet{} + if err := r.client.Get(context.Background(), item.NamespacedName, cos); err != nil { + return backoff + } + + remaining, hasDeadline := durationUntilDeadline(r.clock, cos) + if !hasDeadline { + return backoff + } + + deadline := remaining + 2*time.Second + if deadline > 0 && deadline < backoff { + return deadline + } + return backoff +} + +func (r *deadlineAwareRateLimiter) Forget(item ctrl.Request) { + r.delegate.Forget(item) +} + +func (r *deadlineAwareRateLimiter) NumRequeues(item ctrl.Request) int { + return r.delegate.NumRequeues(item) +} + func (c *ClusterObjectSetReconciler) establishWatch(ctx context.Context, cos *ocv1.ClusterObjectSet, revision boxcutter.RevisionBuilder) error { gvks := sets.New[schema.GroupVersionKind]() for _, phase := range revision.GetPhases() { @@ -631,14 +656,61 @@ func buildProgressionProbes(progressionProbes []ocv1.ProgressionProbe) (probing. return userProbes, nil } -func setRetryingConditions(cos *ocv1.ClusterObjectSet, message string) { - markAsProgressing(cos, ocv1.ClusterObjectSetReasonRetrying, message) +func setRetryingConditions(cos *ocv1.ClusterObjectSet, message string, isDeadlineExceeded bool) { + markAsProgressing(cos, ocv1.ClusterObjectSetReasonRetrying, message, isDeadlineExceeded) if meta.FindStatusCondition(cos.Status.Conditions, ocv1.ClusterObjectSetTypeAvailable) != nil { markAsAvailableUnknown(cos, ocv1.ClusterObjectSetReasonReconciling, message) } } -func markAsProgressing(cos *ocv1.ClusterObjectSet, reason, message string) { +// durationUntilDeadline returns how much time is left before the progress deadline +// is exceeded. A negative duration means the deadline has already passed. If there +// is no deadline (progressDeadlineMinutes is 0 or the revision has already succeeded), +// it returns -1 and false. +func (c *ClusterObjectSetReconciler) durationUntilDeadline(cos *ocv1.ClusterObjectSet) (time.Duration, bool) { + return durationUntilDeadline(c.Clock, cos) +} + +// durationUntilDeadline returns how much time is left before the progress deadline +// is exceeded. A negative duration means the deadline has already passed. If there +// is no deadline (progressDeadlineMinutes is 0 or the revision has already succeeded), +// it returns -1 and false. +func durationUntilDeadline(clk clock.Clock, cos *ocv1.ClusterObjectSet) (time.Duration, bool) { + pd := cos.Spec.ProgressDeadlineMinutes + if pd <= 0 { + return -1, false + } + // Succeeded is a latch — once set, it's never cleared. A revision that + // has already succeeded should not be blocked by the deadline, even if + // it temporarily goes back to InTransition (e.g., recovery after drift). + if meta.IsStatusConditionTrue(cos.Status.Conditions, ocv1.ClusterObjectSetTypeSucceeded) { + return -1, false + } + timeout := time.Duration(pd) * time.Minute + return timeout - clk.Since(cos.CreationTimestamp.Time), true +} + +// markAsProgressing sets the Progressing condition to True with the given reason. +// +// For non-terminal reasons (RollingOut, Retrying), if isDeadlineExceeded is true, +// the condition is set to Progressing=False/ProgressDeadlineExceeded instead. This +// prevents a reconcile loop where RollingOut and ProgressDeadlineExceeded overwrite +// each other on every cycle. +// +// Terminal reasons (Succeeded) are always applied. Unregistered reasons panic. +func markAsProgressing(cos *ocv1.ClusterObjectSet, reason, message string, isDeadlineExceeded bool) { + switch reason { + case ocv1.ReasonSucceeded: + // Terminal — always apply. + case ocv1.ReasonRollingOut, ocv1.ClusterObjectSetReasonRetrying: + if isDeadlineExceeded { + markAsNotProgressing(cos, ocv1.ReasonProgressDeadlineExceeded, + fmt.Sprintf("Revision has not rolled out for %d minute(s).", cos.Spec.ProgressDeadlineMinutes)) + return + } + default: + panic(fmt.Sprintf("unregistered progressing reason: %q", reason)) + } meta.SetStatusCondition(&cos.Status.Conditions, metav1.Condition{ Type: ocv1.ClusterObjectSetTypeProgressing, Status: metav1.ConditionTrue, diff --git a/internal/operator-controller/controllers/clusterobjectset_controller_test.go b/internal/operator-controller/controllers/clusterobjectset_controller_test.go index b3b10f575..b43a608e1 100644 --- a/internal/operator-controller/controllers/clusterobjectset_controller_test.go +++ b/internal/operator-controller/controllers/clusterobjectset_controller_test.go @@ -988,7 +988,7 @@ func Test_ClusterObjectSetReconciler_Reconcile_ProgressDeadline(t *testing.T) { revisionResult: &mockRevisionResult{ inTransition: true, }, - reconcileResult: ctrl.Result{RequeueAfter: 62 * time.Second}, + reconcileResult: ctrl.Result{RequeueAfter: 60 * time.Second}, validate: func(t *testing.T, c client.Client) { rev := &ocv1.ClusterObjectSet{} err := c.Get(t.Context(), client.ObjectKey{ diff --git a/test/e2e/features/revision.feature b/test/e2e/features/revision.feature index dd6d9e940..2b1a19720 100644 --- a/test/e2e/features/revision.feature +++ b/test/e2e/features/revision.feature @@ -442,4 +442,71 @@ Feature: Install ClusterObjectSet Then ClusterObjectSet "${COS_NAME}" reports Progressing as True with Reason Succeeded And ClusterObjectSet "${COS_NAME}" reports Available as True with Reason ProbesSucceeded And resource "configmap/test-configmap-ref" is installed - And resource "deployment/test-httpd" is installed \ No newline at end of file + And resource "deployment/test-httpd" is installed + + Scenario: Archiving a COS with ProgressDeadlineExceeded cleans up its resources + Given min value for ClusterObjectSet .spec.progressDeadlineMinutes is set to 1 + And ServiceAccount "olm-sa" with needed permissions is available in test namespace + When ClusterObjectSet is applied + """ + apiVersion: olm.operatorframework.io/v1 + kind: ClusterObjectSet + metadata: + annotations: + olm.operatorframework.io/service-account-name: olm-sa + olm.operatorframework.io/service-account-namespace: ${TEST_NAMESPACE} + name: ${COS_NAME} + spec: + lifecycleState: Active + collisionProtection: Prevent + progressDeadlineMinutes: 1 + progressionProbes: + - selector: + type: GroupKind + groupKind: + group: apps + kind: Deployment + assertions: + - type: ConditionEqual + conditionEqual: + type: Available + status: "True" + phases: + - name: resources + objects: + - object: + apiVersion: v1 + kind: ConfigMap + metadata: + name: test-configmap + namespace: ${TEST_NAMESPACE} + data: + foo: bar + - object: + apiVersion: apps/v1 + kind: Deployment + metadata: + name: test-deployment + namespace: ${TEST_NAMESPACE} + spec: + replicas: 1 + selector: + matchLabels: + app: never-ready + template: + metadata: + labels: + app: never-ready + spec: + containers: + - name: never-ready + image: does-not-exist:latest + revision: 1 + """ + Then resource "configmap/test-configmap" is installed + And resource "deployment/test-deployment" is installed + And ClusterObjectSet "${COS_NAME}" reports Progressing as False with Reason ProgressDeadlineExceeded + When ClusterObjectSet "${COS_NAME}" lifecycle is set to "Archived" + Then ClusterObjectSet "${COS_NAME}" is archived + And resource "configmap/test-configmap" is eventually not found + And resource "deployment/test-deployment" is eventually not found diff --git a/test/e2e/steps/steps.go b/test/e2e/steps/steps.go index 2b5603ab7..3cf584054 100644 --- a/test/e2e/steps/steps.go +++ b/test/e2e/steps/steps.go @@ -77,6 +77,7 @@ func RegisterSteps(sc *godog.ScenarioContext) { sc.Step(`^(?i)ClusterExtension is applied(?:\s+.*)?$`, ResourceIsApplied) sc.Step(`^(?i)ClusterExtension is updated to version "([^"]+)"$`, ClusterExtensionVersionUpdate) sc.Step(`^(?i)ClusterExtension is updated(?:\s+.*)?$`, ResourceIsApplied) + sc.Step(`^(?i)ClusterObjectSet "([^"]+)" lifecycle is set to "([^"]+)"$`, ClusterObjectSetLifecycleUpdate) sc.Step(`^(?i)ClusterExtension is available$`, ClusterExtensionIsAvailable) sc.Step(`^(?i)ClusterExtension is rolled out$`, ClusterExtensionIsRolledOut) sc.Step(`^(?i)ClusterExtension resources are created and labeled$`, ClusterExtensionResourcesCreatedAndAreLabeled) @@ -293,6 +294,23 @@ func ClusterExtensionVersionUpdate(ctx context.Context, version string) error { return err } +// ClusterObjectSetLifecycleUpdate patches the ClusterObjectSet's lifecycleState to the specified value. +func ClusterObjectSetLifecycleUpdate(ctx context.Context, cosName, lifecycle string) error { + sc := scenarioCtx(ctx) + cosName = substituteScenarioVars(cosName, sc) + patch := map[string]any{ + "spec": map[string]any{ + "lifecycleState": lifecycle, + }, + } + pb, err := json.Marshal(patch) + if err != nil { + return err + } + _, err = k8sClient("patch", "clusterobjectset", cosName, "--type", "merge", "-p", string(pb)) + return err +} + // ResourceIsApplied applies the provided YAML resource to the cluster and in case of ClusterExtension or ClusterObjectSet it captures // its name in the test context so that it can be referred to in later steps with ${NAME} or ${COS_NAME}, respectively func ResourceIsApplied(ctx context.Context, yamlTemplate *godog.DocString) error {