Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 130 additions & 58 deletions internal/operator-controller/controllers/clusterobjectset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
"pkg.package-operator.run/boxcutter"
"pkg.package-operator.run/boxcutter/machinery"
Expand All @@ -30,8 +31,8 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -81,29 +82,6 @@ func (c *ClusterObjectSetReconciler) Reconcile(ctx context.Context, req ctrl.Req
reconciledRev := existingRev.DeepCopy()
res, reconcileErr := c.reconcile(ctx, reconciledRev)

if pd := existingRev.Spec.ProgressDeadlineMinutes; pd > 0 {
cnd := meta.FindStatusCondition(reconciledRev.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing)
isStillProgressing := cnd != nil && cnd.Status == metav1.ConditionTrue && cnd.Reason != ocv1.ReasonSucceeded
succeeded := meta.IsStatusConditionTrue(reconciledRev.Status.Conditions, ocv1.ClusterObjectSetTypeSucceeded)
// check if we reached the progress deadline only if the revision is still progressing and has not succeeded yet
if isStillProgressing && !succeeded {
timeout := time.Duration(pd) * time.Minute
if c.Clock.Since(existingRev.CreationTimestamp.Time) > timeout {
// progress deadline reached, reset any errors and stop reconciling this revision
markAsNotProgressing(reconciledRev, ocv1.ReasonProgressDeadlineExceeded, fmt.Sprintf("Revision has not rolled out for %d minute(s).", pd))
reconcileErr = nil
res = ctrl.Result{}
} else if reconcileErr == nil {
// We want to requeue so far in the future that the next reconciliation
// can detect if the revision did not progress within the given timeout.
// Thus, we plan the next reconcile slightly after (+2secs) the timeout is passed.
drift := 2 * time.Second
requeueAfter := existingRev.CreationTimestamp.Time.Add(timeout).Add(drift).Sub(c.Clock.Now()).Round(time.Second)
l.Info(fmt.Sprintf("ProgressDeadline not exceeded, requeue after ~%v to check again.", requeueAfter))
res = ctrl.Result{RequeueAfter: requeueAfter}
}
}
}
// Do checks before any Update()s, as Update() may modify the resource structure!
updateStatus := !equality.Semantic.DeepEqual(existingRev.Status, reconciledRev.Status)

Expand Down Expand Up @@ -144,15 +122,18 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
return c.delete(ctx, cos)
}

remaining, hasDeadline := c.durationUntilDeadline(cos)
isDeadlineExceeded := hasDeadline && remaining <= 0

Comment on lines +125 to +127
phases, opts, err := c.buildBoxcutterPhases(ctx, cos)
if err != nil {
setRetryingConditions(cos, err.Error())
setRetryingConditions(cos, err.Error(), isDeadlineExceeded)
return ctrl.Result{}, fmt.Errorf("converting to boxcutter revision: %v", err)
}

revisionEngine, err := c.RevisionEngineFactory.CreateRevisionEngine(ctx, cos)
if err != nil {
setRetryingConditions(cos, err.Error())
setRetryingConditions(cos, err.Error(), isDeadlineExceeded)
return ctrl.Result{}, fmt.Errorf("failed to create revision engine: %v", err)
}

Expand All @@ -178,7 +159,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl

if err := c.establishWatch(ctx, cos, revision); err != nil {
werr := fmt.Errorf("establish watch: %v", err)
setRetryingConditions(cos, werr.Error())
setRetryingConditions(cos, werr.Error(), isDeadlineExceeded)
return ctrl.Result{}, werr
}

Expand All @@ -188,22 +169,22 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
// Log detailed reconcile reports only in debug mode (V(1)) to reduce verbosity.
l.V(1).Info("reconcile report", "report", rres.String())
}
setRetryingConditions(cos, err.Error())
setRetryingConditions(cos, err.Error(), isDeadlineExceeded)
return ctrl.Result{}, fmt.Errorf("revision reconcile: %v", err)
}

// Retry failing preflight checks with a flat 10s retry.
// TODO: report status, backoff?
if verr := rres.GetValidationError(); verr != nil {
l.Error(fmt.Errorf("%w", verr), "preflight validation failed, retrying after 10s")
setRetryingConditions(cos, fmt.Sprintf("revision validation error: %s", verr))
setRetryingConditions(cos, fmt.Sprintf("revision validation error: %s", verr), isDeadlineExceeded)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

for i, pres := range rres.GetPhases() {
if verr := pres.GetValidationError(); verr != nil {
l.Error(fmt.Errorf("%w", verr), "phase preflight validation failed, retrying after 10s", "phase", i)
setRetryingConditions(cos, fmt.Sprintf("phase %d validation error: %s", i, verr))
setRetryingConditions(cos, fmt.Sprintf("phase %d validation error: %s", i, verr), isDeadlineExceeded)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

Expand All @@ -216,14 +197,14 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl

if len(collidingObjs) > 0 {
l.Error(fmt.Errorf("object collision detected"), "object collision, retrying after 10s", "phase", i, "collisions", collidingObjs)
setRetryingConditions(cos, fmt.Sprintf("revision object collisions in phase %d\n%s", i, strings.Join(collidingObjs, "\n\n")))
setRetryingConditions(cos, fmt.Sprintf("revision object collisions in phase %d\n%s", i, strings.Join(collidingObjs, "\n\n")), isDeadlineExceeded)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
}

revVersion := cos.GetAnnotations()[labels.BundleVersionKey]
if rres.InTransition() {
markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion))
markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion), isDeadlineExceeded)
}

//nolint:nestif
Expand All @@ -243,7 +224,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
}
}

markAsProgressing(cos, ocv1.ReasonSucceeded, fmt.Sprintf("Revision %s has rolled out.", revVersion))
markAsProgressing(cos, ocv1.ReasonSucceeded, fmt.Sprintf("Revision %s has rolled out.", revVersion), isDeadlineExceeded)
markAsAvailable(cos, ocv1.ClusterObjectSetReasonProbesSucceeded, "Objects are available and pass all probes.")

// We'll probably only want to remove this once we are done updating the ClusterExtension conditions
Expand Down Expand Up @@ -288,14 +269,23 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
} else {
markAsUnavailable(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion))
}
if meta.FindStatusCondition(cos.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing) == nil {
markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion))
}
markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion), isDeadlineExceeded)
return c.requeueForDeadline(cos), nil
}

return ctrl.Result{}, nil
}

// requeueForDeadline returns a Result that requeues at the progress deadline
// if one is configured and has not yet been exceeded. This ensures that
// ProgressDeadlineExceeded is set promptly even when no object events occur.
func (c *ClusterObjectSetReconciler) requeueForDeadline(cos *ocv1.ClusterObjectSet) ctrl.Result {
if remaining, hasDeadline := c.durationUntilDeadline(cos); hasDeadline && remaining > 0 {
return ctrl.Result{RequeueAfter: remaining}
Comment on lines +279 to +284
}
return ctrl.Result{}
}

func (c *ClusterObjectSetReconciler) delete(ctx context.Context, cos *ocv1.ClusterObjectSet) (ctrl.Result, error) {
if err := c.TrackingCache.Free(ctx, cos); err != nil {
markAsAvailableUnknown(cos, ocv1.ClusterObjectSetReasonReconciling, err.Error())
Expand All @@ -311,11 +301,11 @@ func (c *ClusterObjectSetReconciler) archive(ctx context.Context, revisionEngine
tdres, err := revisionEngine.Teardown(ctx, revision)
if err != nil {
err = fmt.Errorf("error archiving revision: %v", err)
setRetryingConditions(cos, err.Error())
setRetryingConditions(cos, err.Error(), false)
return ctrl.Result{}, err
}
if tdres != nil && !tdres.IsComplete() {
setRetryingConditions(cos, "removing revision resources that are not owned by another revision")
setRetryingConditions(cos, "removing revision resources that are not owned by another revision", false)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
// Ensure conditions are set before removing the finalizer when archiving
Expand All @@ -333,29 +323,19 @@ type Sourcoser interface {
}

func (c *ClusterObjectSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
skipProgressDeadlineExceededPredicate := predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
rev, ok := e.ObjectNew.(*ocv1.ClusterObjectSet)
if !ok {
return true
}
// allow deletions to happen
if !rev.DeletionTimestamp.IsZero() {
return true
}
if cnd := meta.FindStatusCondition(rev.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing); cnd != nil && cnd.Status == metav1.ConditionFalse && cnd.Reason == ocv1.ReasonProgressDeadlineExceeded {
return false
}
return true
},
}
c.Clock = clock.RealClock{}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not something I changed in this PR, so out-of-scope. Regardless, callers don't call SetupWithManager in tests, so this isn't a problem in practice.

return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: newDeadlineAwareRateLimiter(
workqueue.DefaultTypedControllerRateLimiter[ctrl.Request](),
mgr.GetClient(),
c.Clock,
),
}).
For(
&ocv1.ClusterObjectSet{},
builder.WithPredicates(
predicate.ResourceVersionChangedPredicate{},
skipProgressDeadlineExceededPredicate,
),
).
WatchesRawSource(
Expand All @@ -367,6 +347,51 @@ func (c *ClusterObjectSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(c)
}

// deadlineAwareRateLimiter wraps a delegate rate limiter and caps the backoff
// duration to the time remaining until the COS progress deadline (+2s), ensuring
// that ProgressDeadlineExceeded is set promptly even during exponential backoff.
type deadlineAwareRateLimiter struct {
delegate workqueue.TypedRateLimiter[ctrl.Request]
Comment on lines +350 to +354
client client.Reader
clock clock.Clock
}

// newDeadlineAwareRateLimiter builds a deadlineAwareRateLimiter from its three
// collaborators: delegate supplies the base backoff, c is the reader used to
// fetch the ClusterObjectSet behind a request, and clk is the time source used
// when computing the time left until the progress deadline.
func newDeadlineAwareRateLimiter(
	delegate workqueue.TypedRateLimiter[ctrl.Request],
	c client.Reader,
	clk clock.Clock,
) *deadlineAwareRateLimiter {
	limiter := new(deadlineAwareRateLimiter)
	limiter.delegate = delegate
	limiter.client = c
	limiter.clock = clk
	return limiter
}

func (r *deadlineAwareRateLimiter) When(item ctrl.Request) time.Duration {
backoff := r.delegate.When(item)

cos := &ocv1.ClusterObjectSet{}
if err := r.client.Get(context.Background(), item.NamespacedName, cos); err != nil {
Comment on lines +370 to +371
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rate limiter reads through the informer cache, which is in-memory and does not block.

return backoff
}

remaining, hasDeadline := durationUntilDeadline(r.clock, cos)
if !hasDeadline {
return backoff
}

deadline := remaining + 2*time.Second
if deadline > 0 && deadline < backoff {
return deadline
}
return backoff
}

// Forget hands item straight to the wrapped delegate's Forget. The wrapper
// adds no logic of its own here; only When is deadline-aware.
func (r *deadlineAwareRateLimiter) Forget(item ctrl.Request) {
r.delegate.Forget(item)
}

// NumRequeues returns whatever the wrapped delegate reports for item. The
// wrapper adds no logic of its own here; only When is deadline-aware.
func (r *deadlineAwareRateLimiter) NumRequeues(item ctrl.Request) int {
return r.delegate.NumRequeues(item)
}

func (c *ClusterObjectSetReconciler) establishWatch(ctx context.Context, cos *ocv1.ClusterObjectSet, revision boxcutter.RevisionBuilder) error {
gvks := sets.New[schema.GroupVersionKind]()
for _, phase := range revision.GetPhases() {
Expand Down Expand Up @@ -631,14 +656,61 @@ func buildProgressionProbes(progressionProbes []ocv1.ProgressionProbe) (probing.
return userProbes, nil
}

func setRetryingConditions(cos *ocv1.ClusterObjectSet, message string) {
markAsProgressing(cos, ocv1.ClusterObjectSetReasonRetrying, message)
func setRetryingConditions(cos *ocv1.ClusterObjectSet, message string, isDeadlineExceeded bool) {
markAsProgressing(cos, ocv1.ClusterObjectSetReasonRetrying, message, isDeadlineExceeded)
if meta.FindStatusCondition(cos.Status.Conditions, ocv1.ClusterObjectSetTypeAvailable) != nil {
markAsAvailableUnknown(cos, ocv1.ClusterObjectSetReasonReconciling, message)
}
}

func markAsProgressing(cos *ocv1.ClusterObjectSet, reason, message string) {
// durationUntilDeadline is a convenience wrapper around the package-level
// durationUntilDeadline that uses the reconciler's Clock as the time source.
//
// It returns how much time is left before the progress deadline is exceeded.
// A negative duration means the deadline has already passed. If there is no
// deadline (progressDeadlineMinutes is not positive, or the revision has
// already succeeded), it returns -1 and false.
func (c *ClusterObjectSetReconciler) durationUntilDeadline(cos *ocv1.ClusterObjectSet) (time.Duration, bool) {
return durationUntilDeadline(c.Clock, cos)
}

// durationUntilDeadline returns how much time is left before the progress deadline
// is exceeded. A negative duration means the deadline has already passed. If there
// is no deadline (progressDeadlineMinutes is 0 or the revision has already succeeded),
// it returns -1 and false.
func durationUntilDeadline(clk clock.Clock, cos *ocv1.ClusterObjectSet) (time.Duration, bool) {
pd := cos.Spec.ProgressDeadlineMinutes
if pd <= 0 {
return -1, false
}
// Succeeded is a latch — once set, it's never cleared. A revision that
// has already succeeded should not be blocked by the deadline, even if
// it temporarily goes back to InTransition (e.g., recovery after drift).
if meta.IsStatusConditionTrue(cos.Status.Conditions, ocv1.ClusterObjectSetTypeSucceeded) {
return -1, false
Comment on lines +679 to +687
}
timeout := time.Duration(pd) * time.Minute
return timeout - clk.Since(cos.CreationTimestamp.Time), true
}

// markAsProgressing sets the Progressing condition to True with the given reason.
//
// For non-terminal reasons (RollingOut, Retrying), if isDeadlineExceeded is true,
// the condition is set to Progressing=False/ProgressDeadlineExceeded instead. This
// prevents a reconcile loop where RollingOut and ProgressDeadlineExceeded overwrite
// each other on every cycle.
//
// Terminal reasons (Succeeded) are always applied. Unregistered reasons panic.
func markAsProgressing(cos *ocv1.ClusterObjectSet, reason, message string, isDeadlineExceeded bool) {
switch reason {
case ocv1.ReasonSucceeded:
// Terminal — always apply.
case ocv1.ReasonRollingOut, ocv1.ClusterObjectSetReasonRetrying:
if isDeadlineExceeded {
markAsNotProgressing(cos, ocv1.ReasonProgressDeadlineExceeded,
fmt.Sprintf("Revision has not rolled out for %d minute(s).", cos.Spec.ProgressDeadlineMinutes))
return
}
default:
panic(fmt.Sprintf("unregistered progressing reason: %q", reason))
}
meta.SetStatusCondition(&cos.Status.Conditions, metav1.Condition{
Type: ocv1.ClusterObjectSetTypeProgressing,
Status: metav1.ConditionTrue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ func Test_ClusterObjectSetReconciler_Reconcile_ProgressDeadline(t *testing.T) {
revisionResult: &mockRevisionResult{
inTransition: true,
},
reconcileResult: ctrl.Result{RequeueAfter: 62 * time.Second},
reconcileResult: ctrl.Result{RequeueAfter: 60 * time.Second},
validate: func(t *testing.T, c client.Client) {
rev := &ocv1.ClusterObjectSet{}
err := c.Get(t.Context(), client.ObjectKey{
Expand Down
Loading
Loading