From 88e12661bca2d9053702f5e73369973e4eaa9761 Mon Sep 17 00:00:00 2001
From: Sivanantham Chinnaiyan
Date: Mon, 17 Jul 2023 15:45:31 +0530
Subject: [PATCH] Retry on resource conflict error

Wrap the InferenceGraph and InferenceService status updates and the
knative service updates in retry.RetryOnConflict.

Once a status update has returned a conflict, later attempts copy the
resourceVersion of the freshly fetched object into the desired object.
Resending the stale resourceVersion would fail with the same conflict on
every retry.

The knative reconcilers keep returning early when the existing service is
semantically equal to the desired one, so no Update call is issued when
there is nothing to reconcile.

Signed-off-by: Sivanantham Chinnaiyan
---
 .../v1alpha1/inferencegraph/controller.go  | 71 +++++++++++--------
 .../inferencegraph/knative_reconciler.go   | 41 +++++++----
 .../v1beta1/inferenceservice/controller.go | 69 +++++++++++-------
 .../reconcilers/knative/ksvc_reconciler.go | 43 ++++++++---
 4 files changed, 142 insertions(+), 82 deletions(-)

diff --git a/pkg/controller/v1alpha1/inferencegraph/controller.go b/pkg/controller/v1alpha1/inferencegraph/controller.go
index cc14c251ab8..860729df392 100644
--- a/pkg/controller/v1alpha1/inferencegraph/controller.go
+++ b/pkg/controller/v1alpha1/inferencegraph/controller.go
@@ -27,6 +27,7 @@ import (
 	"fmt"
 	isvcutils "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/utils"
 	appsv1 "k8s.io/api/apps/v1"
+	"k8s.io/client-go/util/retry"
 
 	"github.com/go-logr/logr"
 	v1alpha1api "github.com/kserve/kserve/pkg/apis/serving/v1alpha1"
@@ -207,36 +208,50 @@ func (r *InferenceGraphReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 }
 
 func (r *InferenceGraphReconciler) updateStatus(desiredGraph *v1alpha1api.InferenceGraph) error {
-	graph := &v1alpha1api.InferenceGraph{}
-	namespacedName := types.NamespacedName{Name: desiredGraph.Name, Namespace: desiredGraph.Namespace}
-	if err := r.Get(context.TODO(), namespacedName, graph); err != nil {
-		return err
-	}
+	// Once a status update has conflicted, later attempts must send the latest
+	// resourceVersion; resending the stale one would repeat the same conflict.
+	conflicted := false
+	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		graph := &v1alpha1api.InferenceGraph{}
+		namespacedName := types.NamespacedName{Name: desiredGraph.Name, Namespace: desiredGraph.Namespace}
+		if err := r.Get(context.TODO(), namespacedName, graph); err != nil {
+			return err
+		}
+		if conflicted {
+			desiredGraph.ResourceVersion = graph.ResourceVersion
+		}
 
-	wasReady := inferenceGraphReadiness(graph.Status)
-	if equality.Semantic.DeepEqual(graph.Status, desiredGraph.Status) {
-		// If we didn't change anything then don't call updateStatus.
-		// This is important because the copy we loaded from the informer's
-		// cache may be stale and we don't want to overwrite a prior update
-		// to status with this stale state.
-	} else if err := r.Status().Update(context.TODO(), desiredGraph); err != nil {
-		r.Log.Error(err, "Failed to update InferenceGraph status", "InferenceGraph", desiredGraph.Name)
-		r.Recorder.Eventf(desiredGraph, v1.EventTypeWarning, "UpdateFailed",
-			"Failed to update status for InferenceGraph %q: %v", desiredGraph.Name, err)
-		return errors.Wrapf(err, "fails to update InferenceGraph status")
-	} else {
-		r.Log.Info("updated InferenceGraph status", "InferenceGraph", desiredGraph.Name)
-		// If there was a difference and there was no error.
-		isReady := inferenceGraphReadiness(desiredGraph.Status)
-		if wasReady && !isReady { // Moved to NotReady State
-			r.Recorder.Eventf(desiredGraph, v1.EventTypeWarning, string(InferenceGraphNotReadyState),
-				fmt.Sprintf("InferenceGraph [%v] is no longer Ready", desiredGraph.GetName()))
-		} else if !wasReady && isReady { // Moved to Ready State
-			r.Recorder.Eventf(desiredGraph, v1.EventTypeNormal, string(InferenceGraphReadyState),
-				fmt.Sprintf("InferenceGraph [%v] is Ready", desiredGraph.GetName()))
+		wasReady := inferenceGraphReadiness(graph.Status)
+		if equality.Semantic.DeepEqual(graph.Status, desiredGraph.Status) {
+			// If we didn't change anything then don't call updateStatus.
+			// This is important because the copy we loaded from the informer's
+			// cache may be stale and we don't want to overwrite a prior update
+			// to status with this stale state.
+		} else if err := r.Status().Update(context.TODO(), desiredGraph); err != nil {
+			if apierr.IsConflict(err) {
+				// Return the raw conflict so RetryOnConflict tries again.
+				conflicted = true
+				return err
+			}
+			r.Log.Error(err, "Failed to update InferenceGraph status", "InferenceGraph", desiredGraph.Name)
+			r.Recorder.Eventf(desiredGraph, v1.EventTypeWarning, "UpdateFailed",
+				"Failed to update status for InferenceGraph %q: %v", desiredGraph.Name, err)
+			return errors.Wrapf(err, "fails to update InferenceGraph status")
+		} else {
+			r.Log.Info("updated InferenceGraph status", "InferenceGraph", desiredGraph.Name)
+			// If there was a difference and there was no error.
+			isReady := inferenceGraphReadiness(desiredGraph.Status)
+			if wasReady && !isReady { // Moved to NotReady State
+				r.Recorder.Eventf(desiredGraph, v1.EventTypeWarning, string(InferenceGraphNotReadyState),
+					fmt.Sprintf("InferenceGraph [%v] is no longer Ready", desiredGraph.GetName()))
+			} else if !wasReady && isReady { // Moved to Ready State
+				r.Recorder.Eventf(desiredGraph, v1.EventTypeNormal, string(InferenceGraphReadyState),
+					fmt.Sprintf("InferenceGraph [%v] is Ready", desiredGraph.GetName()))
+			}
 		}
-	}
-	return nil
+		return nil
+	})
+	return err
 }
 
 func inferenceGraphReadiness(status v1alpha1api.InferenceGraphStatus) bool {
diff --git a/pkg/controller/v1alpha1/inferencegraph/knative_reconciler.go b/pkg/controller/v1alpha1/inferencegraph/knative_reconciler.go
index 127154b6159..2eeaf6b372e 100644
--- a/pkg/controller/v1alpha1/inferencegraph/knative_reconciler.go
+++ b/pkg/controller/v1alpha1/inferencegraph/knative_reconciler.go
@@ -60,20 +60,10 @@ func NewGraphKnativeServiceReconciler(client client.Client,
 	}
 }
 
-func (r *GraphKnativeServiceReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
-	desired := r.Service
-	existing := &knservingv1.Service{}
-	err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
-	if err != nil {
-		if apierr.IsNotFound(err) {
-			log.Info("Creating inference graph knative service", "namespace", desired.Namespace, "name", desired.Name)
-			return &desired.Status, r.client.Create(context.TODO(), desired)
-		}
-		return nil, err
-	}
+func reconcileKsvc(desired *knservingv1.Service, existing *knservingv1.Service) error {
 	// Return if no differences to reconcile.
 	if semanticEquals(desired, existing) {
-		return &existing.Status, nil
+		return nil
 	}
 
 	// Reconcile differences and update
@@ -86,14 +76,39 @@ func (r *GraphKnativeServiceReconciler) Reconcile() (*knservingv1.ServiceStatus,
 	}
 	diff, err := kmp.SafeDiff(desired.Spec.ConfigurationSpec, existing.Spec.ConfigurationSpec)
 	if err != nil {
-		return &existing.Status, errors.Wrapf(err, "failed to diff inference graph knative service configuration spec")
+		return errors.Wrapf(err, "failed to diff inference graph knative service configuration spec")
 	}
 	log.Info("inference graph knative service configuration diff (-desired, +observed):", "diff", diff)
 	existing.Spec.ConfigurationSpec = desired.Spec.ConfigurationSpec
 	existing.ObjectMeta.Labels = desired.ObjectMeta.Labels
 	existing.Spec.Traffic = desired.Spec.Traffic
+	return nil
+}
+
+func (r *GraphKnativeServiceReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
+	desired := r.Service
+	existing := &knservingv1.Service{}
+	err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
+	if err != nil {
+		if apierr.IsNotFound(err) {
+			log.Info("Creating inference graph knative service", "namespace", desired.Namespace, "name", desired.Name)
+			return &desired.Status, r.client.Create(context.TODO(), desired)
+		}
+		return nil, err
+	}
+	// Return early when there is nothing to reconcile so no Update is sent.
+	if semanticEquals(desired, existing) {
+		return &existing.Status, nil
+	}
+
 	err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
 		log.Info("Updating inference graph knative service", "namespace", desired.Namespace, "name", desired.Name)
+		if err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing); err != nil {
+			return err
+		}
+		if err := reconcileKsvc(desired, existing); err != nil {
+			return err
+		}
 		return r.client.Update(context.TODO(), existing)
 	})
 	if err != nil {
diff --git a/pkg/controller/v1beta1/inferenceservice/controller.go b/pkg/controller/v1beta1/inferenceservice/controller.go
index a8508fdc424..785e72abfb3 100644
--- a/pkg/controller/v1beta1/inferenceservice/controller.go
+++ b/pkg/controller/v1beta1/inferenceservice/controller.go
@@ -19,6 +19,7 @@ package inferenceservice
 import (
 	"context"
 	"fmt"
+	"k8s.io/client-go/util/retry"
 	"reflect"
 
 	"github.com/go-logr/logr"
@@ -237,34 +238,48 @@ func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Req
 }
 
 func (r *InferenceServiceReconciler) updateStatus(desiredService *v1beta1api.InferenceService, deploymentMode constants.DeploymentModeType) error {
-	existingService := &v1beta1api.InferenceService{}
-	namespacedName := types.NamespacedName{Name: desiredService.Name, Namespace: desiredService.Namespace}
-	if err := r.Get(context.TODO(), namespacedName, existingService); err != nil {
-		return err
-	}
-	wasReady := inferenceServiceReadiness(existingService.Status)
-	if inferenceServiceStatusEqual(existingService.Status, desiredService.Status, deploymentMode) {
-		// If we didn't change anything then don't call updateStatus.
-		// This is important because the copy we loaded from the informer's
-		// cache may be stale and we don't want to overwrite a prior update
-		// to status with this stale state.
-	} else if err := r.Status().Update(context.TODO(), desiredService); err != nil {
-		r.Log.Error(err, "Failed to update InferenceService status", "InferenceService", desiredService.Name)
-		r.Recorder.Eventf(desiredService, v1.EventTypeWarning, "UpdateFailed",
-			"Failed to update status for InferenceService %q: %v", desiredService.Name, err)
-		return errors.Wrapf(err, "fails to update InferenceService status")
-	} else {
-		// If there was a difference and there was no error.
-		isReady := inferenceServiceReadiness(desiredService.Status)
-		if wasReady && !isReady { // Moved to NotReady State
-			r.Recorder.Eventf(desiredService, v1.EventTypeWarning, string(InferenceServiceNotReadyState),
-				fmt.Sprintf("InferenceService [%v] is no longer Ready", desiredService.GetName()))
-		} else if !wasReady && isReady { // Moved to Ready State
-			r.Recorder.Eventf(desiredService, v1.EventTypeNormal, string(InferenceServiceReadyState),
-				fmt.Sprintf("InferenceService [%v] is Ready", desiredService.GetName()))
+	// Once a status update has conflicted, later attempts must send the latest
+	// resourceVersion; resending the stale one would repeat the same conflict.
+	conflicted := false
+	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		existingService := &v1beta1api.InferenceService{}
+		namespacedName := types.NamespacedName{Name: desiredService.Name, Namespace: desiredService.Namespace}
+		if err := r.Get(context.TODO(), namespacedName, existingService); err != nil {
+			return err
 		}
-	}
-	return nil
+		if conflicted {
+			desiredService.ResourceVersion = existingService.ResourceVersion
+		}
+		wasReady := inferenceServiceReadiness(existingService.Status)
+		if inferenceServiceStatusEqual(existingService.Status, desiredService.Status, deploymentMode) {
+			// If we didn't change anything then don't call updateStatus.
+			// This is important because the copy we loaded from the informer's
+			// cache may be stale and we don't want to overwrite a prior update
+			// to status with this stale state.
+		} else if err := r.Status().Update(context.TODO(), desiredService); err != nil {
+			if apierr.IsConflict(err) {
+				// Return the raw conflict so RetryOnConflict tries again.
+				conflicted = true
+				return err
+			}
+			r.Log.Error(err, "Failed to update InferenceService status", "InferenceService", desiredService.Name)
+			r.Recorder.Eventf(desiredService, v1.EventTypeWarning, "UpdateFailed",
+				"Failed to update status for InferenceService %q: %v", desiredService.Name, err)
+			return errors.Wrapf(err, "fails to update InferenceService status")
+		} else {
+			// If there was a difference and there was no error.
+			isReady := inferenceServiceReadiness(desiredService.Status)
+			if wasReady && !isReady { // Moved to NotReady State
+				r.Recorder.Eventf(desiredService, v1.EventTypeWarning, string(InferenceServiceNotReadyState),
+					fmt.Sprintf("InferenceService [%v] is no longer Ready", desiredService.GetName()))
+			} else if !wasReady && isReady { // Moved to Ready State
+				r.Recorder.Eventf(desiredService, v1.EventTypeNormal, string(InferenceServiceReadyState),
+					fmt.Sprintf("InferenceService [%v] is Ready", desiredService.GetName()))
+			}
+		}
+		return nil
+	})
+	return err
 }
 
 func inferenceServiceReadiness(status v1beta1api.InferenceServiceStatus) bool {
diff --git a/pkg/controller/v1beta1/inferenceservice/reconcilers/knative/ksvc_reconciler.go b/pkg/controller/v1beta1/inferenceservice/reconcilers/knative/ksvc_reconciler.go
index 57c3cec7c04..c56e9bb1acf 100644
--- a/pkg/controller/v1beta1/inferenceservice/reconcilers/knative/ksvc_reconciler.go
+++ b/pkg/controller/v1beta1/inferenceservice/reconcilers/knative/ksvc_reconciler.go
@@ -183,21 +183,10 @@ func createKnativeService(componentMeta metav1.ObjectMeta,
 	return service
 }
 
-func (r *KsvcReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
-	// Create service if does not exist
-	desired := r.Service
-	existing := &knservingv1.Service{}
-	err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
-	if err != nil {
-		if apierr.IsNotFound(err) {
-			log.Info("Creating knative service", "namespace", desired.Namespace, "name", desired.Name)
-			return &desired.Status, r.client.Create(context.TODO(), desired)
-		}
-		return nil, err
-	}
+func reconcileKsvc(desired *knservingv1.Service, existing *knservingv1.Service) error {
 	// Return if no differences to reconcile.
 	if semanticEquals(desired, existing) {
-		return &existing.Status, nil
+		return nil
 	}
 
 	// Reconcile differences and update
@@ -210,7 +199,7 @@ func (r *KsvcReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
 	}
 	diff, err := kmp.SafeDiff(desired.Spec.ConfigurationSpec, existing.Spec.ConfigurationSpec)
 	if err != nil {
-		return &existing.Status, errors.Wrapf(err, "failed to diff knative service configuration spec")
+		return errors.Wrapf(err, "failed to diff knative service configuration spec")
 	}
 	log.Info("knative service configuration diff (-desired, +observed):", "diff", diff)
 	existing.Spec.ConfigurationSpec = desired.Spec.ConfigurationSpec
@@ -223,8 +212,34 @@ func (r *KsvcReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
 			delete(existing.ObjectMeta.Annotations, ksvcAnnotationKey)
 		}
 	}
+	return nil
+}
+
+func (r *KsvcReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
+	// Create service if does not exist
+	desired := r.Service
+	existing := &knservingv1.Service{}
+	err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
+	if err != nil {
+		if apierr.IsNotFound(err) {
+			log.Info("Creating knative service", "namespace", desired.Namespace, "name", desired.Name)
+			return &desired.Status, r.client.Create(context.TODO(), desired)
+		}
+		return nil, err
+	}
+	// Return early when there is nothing to reconcile so no Update is sent.
+	if semanticEquals(desired, existing) {
+		return &existing.Status, nil
+	}
+
 	err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
 		log.Info("Updating knative service", "namespace", desired.Namespace, "name", desired.Name)
+		if err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing); err != nil {
+			return err
+		}
+		if err := reconcileKsvc(desired, existing); err != nil {
+			return err
+		}
 		return r.client.Update(context.TODO(), existing)
 	})
 	if err != nil {