Retry on resource conflict error by sivanantha321 · Pull Request #3035 · kserve/kserve · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Retry on resource conflict error #3035

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 35 additions & 28 deletions pkg/controller/v1alpha1/inferencegraph/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"fmt"
isvcutils "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/utils"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/client-go/util/retry"

"github.com/go-logr/logr"
v1alpha1api "github.com/kserve/kserve/pkg/apis/serving/v1alpha1"
Expand Down Expand Up @@ -207,36 +208,42 @@ func (r *InferenceGraphReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}

func (r *InferenceGraphReconciler) updateStatus(desiredGraph *v1alpha1api.InferenceGraph) error {
graph := &v1alpha1api.InferenceGraph{}
namespacedName := types.NamespacedName{Name: desiredGraph.Name, Namespace: desiredGraph.Namespace}
if err := r.Get(context.TODO(), namespacedName, graph); err != nil {
return err
}
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
graph := &v1alpha1api.InferenceGraph{}
namespacedName := types.NamespacedName{Name: desiredGraph.Name, Namespace: desiredGraph.Namespace}
if err := r.Get(context.TODO(), namespacedName, graph); err != nil {
return err
}

wasReady := inferenceGraphReadiness(graph.Status)
if equality.Semantic.DeepEqual(graph.Status, desiredGraph.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if err := r.Status().Update(context.TODO(), desiredGraph); err != nil {
r.Log.Error(err, "Failed to update InferenceGraph status", "InferenceGraph", desiredGraph.Name)
r.Recorder.Eventf(desiredGraph, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for InferenceGraph %q: %v", desiredGraph.Name, err)
return errors.Wrapf(err, "fails to update InferenceGraph status")
} else {
r.Log.Info("updated InferenceGraph status", "InferenceGraph", desiredGraph.Name)
// If there was a difference and there was no error.
isReady := inferenceGraphReadiness(desiredGraph.Status)
if wasReady && !isReady { // Moved to NotReady State
r.Recorder.Eventf(desiredGraph, v1.EventTypeWarning, string(InferenceGraphNotReadyState),
fmt.Sprintf("InferenceGraph [%v] is no longer Ready", desiredGraph.GetName()))
} else if !wasReady && isReady { // Moved to Ready State
r.Recorder.Eventf(desiredGraph, v1.EventTypeNormal, string(InferenceGraphReadyState),
fmt.Sprintf("InferenceGraph [%v] is Ready", desiredGraph.GetName()))
wasReady := inferenceGraphReadiness(graph.Status)
if equality.Semantic.DeepEqual(graph.Status, desiredGraph.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if err := r.Status().Update(context.TODO(), desiredGraph); err != nil {
if apierr.IsConflict(err) {
return err
}
r.Log.Error(err, "Failed to update InferenceGraph status", "InferenceGraph", desiredGraph.Name)
r.Recorder.Eventf(desiredGraph, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for InferenceGraph %q: %v", desiredGraph.Name, err)
return errors.Wrapf(err, "fails to update InferenceGraph status")
} else {
r.Log.Info("updated InferenceGraph status", "InferenceGraph", desiredGraph.Name)
// If there was a difference and there was no error.
isReady := inferenceGraphReadiness(desiredGraph.Status)
if wasReady && !isReady { // Moved to NotReady State
r.Recorder.Eventf(desiredGraph, v1.EventTypeWarning, string(InferenceGraphNotReadyState),
fmt.Sprintf("InferenceGraph [%v] is no longer Ready", desiredGraph.GetName()))
} else if !wasReady && isReady { // Moved to Ready State
r.Recorder.Eventf(desiredGraph, v1.EventTypeNormal, string(InferenceGraphReadyState),
fmt.Sprintf("InferenceGraph [%v] is Ready", desiredGraph.GetName()))
}
}
}
return nil
return nil
})
return err
}

func inferenceGraphReadiness(status v1alpha1api.InferenceGraphStatus) bool {
Expand Down
37 changes: 24 additions & 13 deletions pkg/controller/v1alpha1/inferencegraph/knative_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,10 @@ func NewGraphKnativeServiceReconciler(client client.Client,
}
}

func (r *GraphKnativeServiceReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
desired := r.Service
existing := &knservingv1.Service{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
if err != nil {
if apierr.IsNotFound(err) {
log.Info("Creating inference graph knative service", "namespace", desired.Namespace, "name", desired.Name)
return &desired.Status, r.client.Create(context.TODO(), desired)
}
return nil, err
}
func reconcileKsvc(desired *knservingv1.Service, existing *knservingv1.Service) error {
// Return if no differences to reconcile.
if semanticEquals(desired, existing) {
return &existing.Status, nil
return nil
}

// Reconcile differences and update
Expand All @@ -86,14 +76,35 @@ func (r *GraphKnativeServiceReconciler) Reconcile() (*knservingv1.ServiceStatus,
}
diff, err := kmp.SafeDiff(desired.Spec.ConfigurationSpec, existing.Spec.ConfigurationSpec)
if err != nil {
return &existing.Status, errors.Wrapf(err, "failed to diff inference graph knative service configuration spec")
return errors.Wrapf(err, "failed to diff inference graph knative service configuration spec")
}
log.Info("inference graph knative service configuration diff (-desired, +observed):", "diff", diff)
existing.Spec.ConfigurationSpec = desired.Spec.ConfigurationSpec
existing.ObjectMeta.Labels = desired.ObjectMeta.Labels
existing.Spec.Traffic = desired.Spec.Traffic
return nil
}

func (r *GraphKnativeServiceReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
desired := r.Service
existing := &knservingv1.Service{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
if err != nil {
if apierr.IsNotFound(err) {
log.Info("Creating inference graph knative service", "namespace", desired.Namespace, "name", desired.Name)
return &desired.Status, r.client.Create(context.TODO(), desired)
}
return nil, err
}

err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
log.Info("Updating inference graph knative service", "namespace", desired.Namespace, "name", desired.Name)
if err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing); err != nil {
return err
}
if err := reconcileKsvc(desired, existing); err != nil {
return err
}
return r.client.Update(context.TODO(), existing)
})
if err != nil {
Expand Down
61 changes: 34 additions & 27 deletions pkg/controller/v1beta1/inferenceservice/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package inferenceservice
import (
"context"
"fmt"
"k8s.io/client-go/util/retry"
"reflect"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -237,34 +238,40 @@ func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Req
}

func (r *InferenceServiceReconciler) updateStatus(desiredService *v1beta1api.InferenceService, deploymentMode constants.DeploymentModeType) error {
existingService := &v1beta1api.InferenceService{}
namespacedName := types.NamespacedName{Name: desiredService.Name, Namespace: desiredService.Namespace}
if err := r.Get(context.TODO(), namespacedName, existingService); err != nil {
return err
}
wasReady := inferenceServiceReadiness(existingService.Status)
if inferenceServiceStatusEqual(existingService.Status, desiredService.Status, deploymentMode) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if err := r.Status().Update(context.TODO(), desiredService); err != nil {
r.Log.Error(err, "Failed to update InferenceService status", "InferenceService", desiredService.Name)
r.Recorder.Eventf(desiredService, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for InferenceService %q: %v", desiredService.Name, err)
return errors.Wrapf(err, "fails to update InferenceService status")
} else {
// If there was a difference and there was no error.
isReady := inferenceServiceReadiness(desiredService.Status)
if wasReady && !isReady { // Moved to NotReady State
r.Recorder.Eventf(desiredService, v1.EventTypeWarning, string(InferenceServiceNotReadyState),
fmt.Sprintf("InferenceService [%v] is no longer Ready", desiredService.GetName()))
} else if !wasReady && isReady { // Moved to Ready State
r.Recorder.Eventf(desiredService, v1.EventTypeNormal, string(InferenceServiceReadyState),
fmt.Sprintf("InferenceService [%v] is Ready", desiredService.GetName()))
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
existingService := &v1beta1api.InferenceService{}
namespacedName := types.NamespacedName{Name: desiredService.Name, Namespace: desiredService.Namespace}
if err := r.Get(context.TODO(), namespacedName, existingService); err != nil {
return err
}
}
return nil
wasReady := inferenceServiceReadiness(existingService.Status)
if inferenceServiceStatusEqual(existingService.Status, desiredService.Status, deploymentMode) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if err := r.Status().Update(context.TODO(), desiredService); err != nil {
if apierr.IsConflict(err) {
return err
}
r.Log.Error(err, "Failed to update InferenceService status", "InferenceService", desiredService.Name)
r.Recorder.Eventf(desiredService, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for InferenceService %q: %v", desiredService.Name, err)
return errors.Wrapf(err, "fails to update InferenceService status")
} else {
// If there was a difference and there was no error.
isReady := inferenceServiceReadiness(desiredService.Status)
if wasReady && !isReady { // Moved to NotReady State
r.Recorder.Eventf(desiredService, v1.EventTypeWarning, string(InferenceServiceNotReadyState),
fmt.Sprintf("InferenceService [%v] is no longer Ready", desiredService.GetName()))
} else if !wasReady && isReady { // Moved to Ready State
r.Recorder.Eventf(desiredService, v1.EventTypeNormal, string(InferenceServiceReadyState),
fmt.Sprintf("InferenceService [%v] is Ready", desiredService.GetName()))
}
}
return nil
})
return err
}

func inferenceServiceReadiness(status v1beta1api.InferenceServiceStatus) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,21 +183,10 @@ func createKnativeService(componentMeta metav1.ObjectMeta,
return service
}

func (r *KsvcReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
// Create service if does not exist
desired := r.Service
existing := &knservingv1.Service{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
if err != nil {
if apierr.IsNotFound(err) {
log.Info("Creating knative service", "namespace", desired.Namespace, "name", desired.Name)
return &desired.Status, r.client.Create(context.TODO(), desired)
}
return nil, err
}
func reconcileKsvc(desired *knservingv1.Service, existing *knservingv1.Service) error {
// Return if no differences to reconcile.
if semanticEquals(desired, existing) {
return &existing.Status, nil
return nil
}

// Reconcile differences and update
Expand All @@ -210,7 +199,7 @@ func (r *KsvcReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
}
diff, err := kmp.SafeDiff(desired.Spec.ConfigurationSpec, existing.Spec.ConfigurationSpec)
if err != nil {
return &existing.Status, errors.Wrapf(err, "failed to diff knative service configuration spec")
return errors.Wrapf(err, "failed to diff knative service configuration spec")
}
log.Info("knative service configuration diff (-desired, +observed):", "diff", diff)
existing.Spec.ConfigurationSpec = desired.Spec.ConfigurationSpec
Expand All @@ -223,8 +212,30 @@ func (r *KsvcReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
delete(existing.ObjectMeta.Annotations, ksvcAnnotationKey)
}
}
return nil
}

func (r *KsvcReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
// Create service if does not exist
desired := r.Service
existing := &knservingv1.Service{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
if err != nil {
if apierr.IsNotFound(err) {
log.Info("Creating knative service", "namespace", desired.Namespace, "name", desired.Name)
return &desired.Status, r.client.Create(context.TODO(), desired)
}
return nil, err
}

err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
log.Info("Updating knative service", "namespace", desired.Namespace, "name", desired.Name)
if err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing); err != nil {
return err
}
if err := reconcileKsvc(desired, existing); err != nil {
return err
}
return r.client.Update(context.TODO(), existing)
})
if err != nil {
Expand Down