diff --git a/controllers/clustersummary_controller.go b/controllers/clustersummary_controller.go index 44f5ddc0..24058321 100644 --- a/controllers/clustersummary_controller.go +++ b/controllers/clustersummary_controller.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "k8s.io/client-go/tools/events" + "k8s.io/client-go/util/workqueue" clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2" "sigs.k8s.io/cluster-api/util/annotations" ctrl "sigs.k8s.io/controller-runtime" @@ -95,6 +96,9 @@ const ( const ( clusterPausedMessage = "Cluster is paused" + + rateLimiterBaseDelay = 1 * time.Second + rateLimiterMaxDelay = 5 * time.Minute ) // ClusterSummaryReconciler reconciles a ClusterSummary object @@ -118,7 +122,8 @@ type ClusterSummaryReconciler struct { eventRecorder events.EventRecorder - DeletedInstances map[types.NamespacedName]time.Time + DeletedInstances map[types.NamespacedName]time.Time + NextReconcileTimes map[types.NamespacedName]time.Time // in-memory cooldown, survives status-patch conflicts } // If the drift-detection component is deployed in the management cluster, the addon-controller will deploy ResourceSummaries within the same cluster, @@ -178,7 +183,7 @@ func (r *ClusterSummaryReconciler) Reconcile(ctx context.Context, req ctrl.Reque if r.skipReconciliation(clusterSummaryScope, req) { logger.V(logs.LogInfo).Info("ignore update") - return reconcile.Result{Requeue: true, RequeueAfter: normalRequeueAfter}, nil + return reconcile.Result{Requeue: true, RequeueAfter: r.remainingCooldown(clusterSummaryScope, req)}, nil } var isMatch bool @@ -205,9 +210,16 @@ func (r *ClusterSummaryReconciler) Reconcile(ctx context.Context, req ctrl.Reque } // Always close the scope when exiting this function so we can persist any ClusterSummary - // changes. + // changes. Conflict errors are swallowed because the watch event from whatever caused the + // conflict will re-enqueue this resource, and the next reconciliation will recompute status. 
+ // Propagating the conflict would cause controller-runtime to immediately requeue, bypassing + // the intended NextReconcileTime backoff. defer func() { if err = clusterSummaryScope.Close(ctx); err != nil { + if apierrors.IsConflict(err) { + logger.V(logs.LogDebug).Info("conflict patching ClusterSummary status, will reconcile on next event") + return + } reterr = err } }() @@ -420,35 +432,50 @@ func (r *ClusterSummaryReconciler) reconcileNormal(ctx context.Context, clusterSummaryScope.ClusterSummary.Status.ReconciliationSuspended = false clusterSummaryScope.ClusterSummary.Status.SuspensionReason = nil - err = r.startWatcherForTemplateResourceRefs(ctx, clusterSummaryScope.ClusterSummary) + if result := r.prepareForDeployment(ctx, clusterSummaryScope, logger); result.RequeueAfter > 0 { + return result, nil + } + + return r.proceedDeployingClusterSummary(ctx, clusterSummaryScope, logger) +} + +func (r *ClusterSummaryReconciler) prepareForDeployment(ctx context.Context, + clusterSummaryScope *scope.ClusterSummaryScope, logger logr.Logger) reconcile.Result { + + err := r.startWatcherForTemplateResourceRefs(ctx, clusterSummaryScope.ClusterSummary) if err != nil { logger.V(logs.LogInfo).Error(err, "failed to start watcher on resources referenced in TemplateResourceRefs.") - return reconcile.Result{Requeue: true, RequeueAfter: deleteRequeueAfter}, nil + r.setNextReconcileTime(clusterSummaryScope, deleteRequeueAfter) + return reconcile.Result{RequeueAfter: deleteRequeueAfter} } allDeployed, msg, err := r.areDependenciesDeployed(ctx, clusterSummaryScope, logger) if err != nil { - return reconcile.Result{Requeue: true, RequeueAfter: normalRequeueAfter}, nil + r.setNextReconcileTime(clusterSummaryScope, normalRequeueAfter) + return reconcile.Result{RequeueAfter: normalRequeueAfter} } clusterSummaryScope.SetDependenciesMessage(&msg) if !allDeployed { - return reconcile.Result{Requeue: true, RequeueAfter: normalRequeueAfter}, nil + r.setNextReconcileTime(clusterSummaryScope, 
normalRequeueAfter) + return reconcile.Result{RequeueAfter: normalRequeueAfter} } err = r.updateChartMap(ctx, clusterSummaryScope, logger) if err != nil { - return reconcile.Result{Requeue: true, RequeueAfter: normalRequeueAfter}, nil + r.setNextReconcileTime(clusterSummaryScope, normalRequeueAfter) + return reconcile.Result{RequeueAfter: normalRequeueAfter} } if !clusterSummaryScope.IsContinuousWithDriftDetection() { err = r.removeResourceSummary(ctx, clusterSummaryScope, logger) if err != nil { logger.V(logs.LogInfo).Error(err, "failed to remove ResourceSummary.") - return reconcile.Result{Requeue: true, RequeueAfter: normalRequeueAfter}, nil + r.setNextReconcileTime(clusterSummaryScope, normalRequeueAfter) + return reconcile.Result{RequeueAfter: normalRequeueAfter} } } - return r.proceedDeployingClusterSummary(ctx, clusterSummaryScope, logger) + return reconcile.Result{} } func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Context, @@ -460,6 +487,7 @@ func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Co ok := errors.As(err, &conflictErr) if ok { logger.V(logs.LogInfo).Error(err, "failed to deploy because of conflict") + r.setNextReconcileTime(clusterSummaryScope, r.ConflictRetryTime) return reconcile.Result{Requeue: true, RequeueAfter: r.ConflictRetryTime}, nil } @@ -471,6 +499,7 @@ func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Co "checkName", healthCheckError.CheckName, "reason", healthCheckError.InternalErr.Error(), "requeueAfter", r.HealthErrorRetryTime.String()) + r.setNextReconcileTime(clusterSummaryScope, r.HealthErrorRetryTime) return reconcile.Result{Requeue: true, RequeueAfter: r.HealthErrorRetryTime}, nil } @@ -513,6 +542,7 @@ func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Co } logger.V(logs.LogInfo).Error(err, "failed to deploy") + r.setNextReconcileTime(clusterSummaryScope, requeueAfter) return reconcile.Result{Requeue: true, 
RequeueAfter: requeueAfter}, nil } @@ -521,12 +551,59 @@ func (r *ClusterSummaryReconciler) proceedDeployingClusterSummary(ctx context.Co if clusterSummaryScope.IsDryRunSync() { r.resetFeatureStatusToProvisioning(clusterSummaryScope) // we need to keep retrying in DryRun ClusterSummaries + r.setNextReconcileTime(clusterSummaryScope, dryRunRequeueAfter) return reconcile.Result{Requeue: true, RequeueAfter: dryRunRequeueAfter}, nil } return reconcile.Result{}, nil } +// setNextReconcileTime sets NextReconcileTime on the ClusterSummary status +// so that skipReconciliation() can honor the intended backoff period +// even when a watch event re-enqueues the item before RequeueAfter fires. +// It also records the cooldown in the reconciler's in-memory map so that the +// guard works even if the status patch fails (e.g. due to a conflict). +func (r *ClusterSummaryReconciler) setNextReconcileTime( + clusterSummaryScope *scope.ClusterSummaryScope, d time.Duration) { + + nextTime := time.Now().Add(d) + clusterSummaryScope.ClusterSummary.Status.NextReconcileTime = + &metav1.Time{Time: nextTime} + + // Mirror in the in-memory map so skipReconciliation works even if scope.Close() + // encounters a conflict and the status field is never persisted. + key := types.NamespacedName{ + Namespace: clusterSummaryScope.ClusterSummary.Namespace, + Name: clusterSummaryScope.ClusterSummary.Name, + } + r.PolicyMux.Lock() + r.NextReconcileTimes[key] = nextTime + r.PolicyMux.Unlock() +} + +// remainingCooldown returns the time remaining before the next reconciliation +// should proceed, checking both the persisted status field and the in-memory map. 
+func (r *ClusterSummaryReconciler) remainingCooldown( + clusterSummaryScope *scope.ClusterSummaryScope, req ctrl.Request) time.Duration { + + requeueAfter := normalRequeueAfter + if nrt := clusterSummaryScope.ClusterSummary.Status.NextReconcileTime; nrt != nil { + if remaining := time.Until(nrt.Time); remaining > 0 { + requeueAfter = remaining + } + } + + r.PolicyMux.Lock() + if v, ok := r.NextReconcileTimes[req.NamespacedName]; ok { + if remaining := time.Until(v); remaining > requeueAfter { + requeueAfter = remaining + } + } + r.PolicyMux.Unlock() + + return requeueAfter +} + // SetupWithManager sets up the controller with the Manager. func (r *ClusterSummaryReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { c, err := ctrl.NewControllerManagedBy(mgr). @@ -536,6 +613,10 @@ func (r *ClusterSummaryReconciler) SetupWithManager(ctx context.Context, mgr ctr ). WithOptions(controller.Options{ MaxConcurrentReconciles: r.ConcurrentReconciles, + RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request]( + rateLimiterBaseDelay, + rateLimiterMaxDelay, + ), }). 
		Watches(&libsveltosv1beta1.SveltosCluster{},
			handler.EnqueueRequestsFromMapFunc(r.requeueClusterSummaryForSveltosCluster),
@@ -578,6 +659,7 @@ func (r *ClusterSummaryReconciler) SetupWithManager(ctx context.Context, mgr ctr
 	initializeManager(ctrl.Log.WithName("watchers"), mgr.GetConfig(), mgr.GetClient())
 
 	r.DeletedInstances = make(map[types.NamespacedName]time.Time)
+	r.NextReconcileTimes = make(map[types.NamespacedName]time.Time)
 	r.eventRecorder = mgr.GetEventRecorder("event-recorder")
 
 	r.ctrl = c
@@ -1642,10 +1724,24 @@ func (r *ClusterSummaryReconciler) skipReconciliation(clusterSummaryScope *scope
 		}
 	}
 
-	// Checking if reconciliation should happen
-	if cs.Status.NextReconcileTime != nil && time.Now().Before(cs.Status.NextReconcileTime.Time) {
+	// Checking if reconciliation should happen — check both the persisted status field
+	// and the in-memory map (which survives status-patch conflicts).
+	now := time.Now()
+	if cs.Status.NextReconcileTime != nil && now.Before(cs.Status.NextReconcileTime.Time) {
 		return true
 	}
+	// NextReconcileTimes is written by setNextReconcileTime under PolicyMux from
+	// concurrent reconcile workers; take the lock here too to avoid a data race.
+	r.PolicyMux.Lock()
+	if v, ok := r.NextReconcileTimes[req.NamespacedName]; ok {
+		if now.Before(v) {
+			r.PolicyMux.Unlock()
+			return true
+		}
+		// Cooldown expired — remove from map
+		delete(r.NextReconcileTimes, req.NamespacedName)
+	}
+	r.PolicyMux.Unlock()
 
 	cs.Status.NextReconcileTime = nil
diff --git a/controllers/clustersummary_deployer.go b/controllers/clustersummary_deployer.go
index 377aec43..ed5b1c5f 100644
--- a/controllers/clustersummary_deployer.go
+++ b/controllers/clustersummary_deployer.go
@@ -159,6 +159,16 @@ func (r *ClusterSummaryReconciler) proceedDeployingFeature(ctx context.Context,
 		return r.proceedDeployingFeatureInPullMode(ctx, clusterSummaryScope, f, isConfigSame, currentHash, logger)
 	}
 
+	// Skip status update if already provisioned with the same hash — avoids
+	// unnecessary status patches that would trigger watch events and re-enqueue.
+ if existingFS := getFeatureSummaryForFeatureID(clusterSummary, f.id); existingFS != nil && + existingFS.Status == libsveltosv1beta1.FeatureStatusProvisioned && + reflect.DeepEqual(existingFS.Hash, currentHash) { + + logger.V(logs.LogDebug).Info("feature already provisioned with same hash, skipping status update") + return nil + } + r.updateFeatureStatus(clusterSummaryScope, f.id, deployerStatus, currentHash, deployerError, logger) message := fmt.Sprintf("Feature: %s deployed to cluster %s %s/%s", f.id, clusterSummary.Spec.ClusterType, clusterSummary.Spec.ClusterNamespace, clusterSummary.Spec.ClusterName) diff --git a/controllers/controllers_suite_test.go b/controllers/controllers_suite_test.go index 34676500..52491a96 100644 --- a/controllers/controllers_suite_test.go +++ b/controllers/controllers_suite_test.go @@ -176,8 +176,9 @@ func getClusterSummaryReconciler(c client.Client, dep deployer.DeployerInterface Deployer: dep, ClusterMap: make(map[corev1.ObjectReference]*libsveltosset.Set), ReferenceMap: make(map[corev1.ObjectReference]*libsveltosset.Set), - DeletedInstances: make(map[types.NamespacedName]time.Time), - PolicyMux: sync.Mutex{}, + DeletedInstances: make(map[types.NamespacedName]time.Time), + NextReconcileTimes: make(map[types.NamespacedName]time.Time), + PolicyMux: sync.Mutex{}, } } diff --git a/controllers/export_test.go b/controllers/export_test.go index 7269be67..c81f85c3 100644 --- a/controllers/export_test.go +++ b/controllers/export_test.go @@ -88,6 +88,8 @@ var ( AddStageStatus = addStageStatus UpdateStageStatus = updateStageStatus GetMainDeploymentClusterProfileLabels = getMainDeploymentClusterProfileLabels + + SetNextReconcileTime = (*ClusterSummaryReconciler).setNextReconcileTime ) var (