From 635798671e8027c5d356316285fb35e54f4699f8 Mon Sep 17 00:00:00 2001 From: Robin Yue Date: Sun, 7 Apr 2019 20:58:04 +0800 Subject: [PATCH 1/2] feat(#896): store notification status in workflowrun --- pkg/apis/cyclone/v1alpha1/workflow_run.go | 21 ++++ .../cyclone/v1alpha1/zz_generated.deepcopy.go | 23 ++++ pkg/workflow/controller/config.go | 20 ++++ pkg/workflow/controller/config_test.go | 46 ++++++++ .../handlers/workflowrun/handler.go | 111 +++++++++++++++--- 5 files changed, 203 insertions(+), 18 deletions(-) create mode 100644 pkg/workflow/controller/config_test.go diff --git a/pkg/apis/cyclone/v1alpha1/workflow_run.go b/pkg/apis/cyclone/v1alpha1/workflow_run.go index 76e10062a..f1d501861 100644 --- a/pkg/apis/cyclone/v1alpha1/workflow_run.go +++ b/pkg/apis/cyclone/v1alpha1/workflow_run.go @@ -92,6 +92,8 @@ type WorkflowRunStatus struct { Overall Status `json:"overall"` // Whether gc is performed on this WorkflowRun, such as deleting pods. Cleaned bool `json:"cleaned"` + // Notifications represents the status of sending notifications. + Notifications map[string]NotificationStatus `json:"notifications,omitempty"` } // StageStatus describes status of a stage execution. @@ -154,6 +156,25 @@ type Status struct { StartTime metav1.Time `json:"startTime,omitempty"` } +// NotificationResult represents the result of sending notifications. +type NotificationResult string + +const ( + // NotificationResultSucceeded means success result of sending notifications. + NotificationResultSucceeded NotificationResult = "Succeeded" + + // NotificationResultFailed means failure result of sending notifications. + NotificationResultFailed NotificationResult = "Failed" +) + +// NotificationStatus represents the status of sending notifications. +type NotificationStatus struct { + // Result represents the result of sending notifications. + Result NotificationResult `json:"result"` + // Message represents the detailed message for result. + Message string `json:"message"` +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // WorkflowRunList describes an array of WorkflowRun instances. diff --git a/pkg/apis/cyclone/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/cyclone/v1alpha1/zz_generated.deepcopy.go index 97c3738a5..4696d591f 100644 --- a/pkg/apis/cyclone/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/cyclone/v1alpha1/zz_generated.deepcopy.go @@ -216,6 +216,22 @@ func (in *NotificationReceiver) DeepCopy() *NotificationReceiver { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NotificationStatus) DeepCopyInto(out *NotificationStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NotificationStatus. +func (in *NotificationStatus) DeepCopy() *NotificationStatus { + if in == nil { + return nil + } + out := new(NotificationStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Outputs) DeepCopyInto(out *Outputs) { *out = *in @@ -1019,6 +1035,13 @@ func (in *WorkflowRunStatus) DeepCopyInto(out *WorkflowRunStatus) { } } in.Overall.DeepCopyInto(&out.Overall) + if in.Notifications != nil { + in, out := &in.Notifications, &out.Notifications + *out = make(map[string]NotificationStatus, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } return } diff --git a/pkg/workflow/controller/config.go b/pkg/workflow/controller/config.go index ff895342d..00b05df2c 100644 --- a/pkg/workflow/controller/config.go +++ b/pkg/workflow/controller/config.go @@ -144,6 +144,10 @@ func validate(config *WorkflowControllerConfig) bool { log.Warn("Secret not configured, no auth information would be available, e.g. docker registry auth.") } + if !validateNotification(config.Notifications) { + return false + } + for _, k := range []string{GitResolverImage, ImageResolverImage, KvResolverImage, CoordinatorImage} { _, ok := config.Images[k] if !ok { @@ -154,6 +158,22 @@ func validate(config *WorkflowControllerConfig) bool { return true } +// validateNotification validates notification configurations. +// The names of notification endpoints must be unique. +func validateNotification(nes []NotificationEndpoint) bool { + names := map[string]struct{}{} + for _, ne := range nes { + if _, ok := names[ne.Name]; ok { + log.Errorf("There are multiple notification endpoints with same name: %s", ne.Name) + return false + } + + names[ne.Name] = struct{}{} + } + + return true +} + // ImagePullPolicy determines image pull policy based on environment variable DEVELOP_MODE // This pull policy will be used in image resolver containers and coordinator containers. func ImagePullPolicy() corev1.PullPolicy { diff --git a/pkg/workflow/controller/config_test.go b/pkg/workflow/controller/config_test.go new file mode 100644 index 000000000..907c9996f --- /dev/null +++ b/pkg/workflow/controller/config_test.go @@ -0,0 +1,46 @@ +package controller + +import ( + "testing" +) + +func TestValidateNotification(t *testing.T) { + testCases := map[string]struct { + endpoints []NotificationEndpoint + expected bool + }{ + "different endpoint names": { + endpoints: []NotificationEndpoint{ + { + Name: "n1", + URL: "http://n1.cyclone.dev", + }, + { + Name: "n2", + URL: "http://n2.cyclone.dev", + }, + }, + expected: true, + }, + "same endpoint names": { + endpoints: []NotificationEndpoint{ + { + Name: "n1", + URL: "http://n1.cyclone.dev", + }, + { + Name: "n1", + URL: "http://n2.cyclone.dev", + }, + }, + expected: false, + }, + } + + for d, tc := range testCases { + result := validateNotification(tc.endpoints) + if result != tc.expected { + t.Errorf("Test case %s failed: expected %t, but got %t", d, tc.expected, result) + } + } +} diff --git a/pkg/workflow/controller/handlers/workflowrun/handler.go b/pkg/workflow/controller/handlers/workflowrun/handler.go index 65e6b30bc..42bdbe9e6 100644 --- a/pkg/workflow/controller/handlers/workflowrun/handler.go +++ b/pkg/workflow/controller/handlers/workflowrun/handler.go @@ -3,11 +3,15 @@ package workflowrun import ( "bytes" "encoding/json" + "fmt" + "io/ioutil" "net/http" + "reflect" "time" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" "github.com/caicloud/cyclone/pkg/apis/cyclone/v1alpha1" "github.com/caicloud/cyclone/pkg/k8s/clientset" @@ -107,8 +111,38 @@ func (h *Handler) ObjectUpdated(obj interface{}) { // If the WorkflowRun has already been terminated(Completed, Error, Cancel), send notifications if necessary, // otherwise directly skip it. if workflowrun.IsWorkflowRunTerminated(originWfr) { - // Send notification as workflowrun has been terminated. - sendNotifications(originWfr) + // Send notification after workflowrun terminated. + status, err := h.sendNotifications(originWfr) + if err != nil { + log.WithField("name", originWfr.Name).Error("Send notification error: ", err) + return + } + // If notification status is nil, then no notification is sent. + if status == nil { + return + } + + // Update WorkflowRun notification status with retry. + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Get latest WorkflowRun. + latest, err := h.Client.CycloneV1alpha1().WorkflowRuns(originWfr.Namespace).Get(originWfr.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + if !reflect.DeepEqual(latest.Status.Notifications, status) { + latest.Status.Notifications = status + _, err = h.Client.CycloneV1alpha1().WorkflowRuns(originWfr.Namespace).Update(latest) + return err + } + + return nil + }) + + if err != nil { + log.WithField("name", originWfr.Name).Error("Update workflowrun notification status error: ", err) + } + return } @@ -149,37 +183,46 @@ func (h *Handler) ObjectDeleted(obj interface{}) { return } -// validate workflow run -func validate(wfr *v1alpha1.WorkflowRun) bool { - // check workflowRef can not be nil - if wfr.Spec.WorkflowRef == nil { - log.WithField("name", wfr.Name).Error("WorkflowRef is nil") - return false +// sendNotifications send notifications for workflowruns when: +// * its workflow has notification config +// * finish time after workflow controller starts +// * notification status of workflowrun is not nil +// If the returned notification status is nil, it means that there is no need to send notification. +func (h *Handler) sendNotifications(wfr *v1alpha1.WorkflowRun) (map[string]v1alpha1.NotificationStatus, error) { + // No need to send notifications for workflowruns finished before workflow controller starts. + if wfr.Spec.WorkflowRef == nil || wfr.Status.Notifications != nil || + wfr.Status.Overall.LastTransitionTime.Before(controllerStartTime) { + return nil, nil } - return true -} + wfRef := wfr.Spec.WorkflowRef + wf, err := h.Client.CycloneV1alpha1().Workflows(wfRef.Namespace).Get(wfRef.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } -// sendNotifications send notifications for workflowruns. -// Will skip workflowruns have finished before workflow controller starts. -func sendNotifications(wfr *v1alpha1.WorkflowRun) { - // No need to send notifications for workflowruns finished before workflow controller starts. - if wfr.Status.Overall.LastTransitionTime.Before(controllerStartTime) { - return + if len(wf.Spec.Notification.Receivers) == 0 { + return nil, nil } // Send notifications with workflowrun. bodyBytes, err := json.Marshal(wfr) if err != nil { log.WithField("wfr", wfr.Name).Error("Failed to marshal workflowrun: ", err) - return + return nil, err } body := bytes.NewReader(bodyBytes) + status := make(map[string]v1alpha1.NotificationStatus) for _, endpoint := range controller.Config.Notifications { req, err := http.NewRequest(http.MethodPost, endpoint.URL, body) if err != nil { - log.WithField("wfr", wfr.Name).Error("Failed to new notification request: ", err) + err = fmt.Errorf("Failed to new notification request: %v", err) + log.WithField("wfr", wfr.Name).Error(err) + status[endpoint.Name] = v1alpha1.NotificationStatus{ + Result: v1alpha1.NotificationResultFailed, + Message: err.Error(), + } continue } // Set Json content type in Http header. @@ -187,9 +230,41 @@ func sendNotifications(wfr *v1alpha1.WorkflowRun) { resp, err := http.DefaultClient.Do(req) if err != nil { + s := v1alpha1.NotificationStatus{ + Result: v1alpha1.NotificationResultFailed, + } + log.WithField("wfr", wfr.Name).Errorf("Failed to send notification for %s: %v", endpoint.Name, err) + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Error(err) + s.Message = err.Error() + } else { + s.Message = fmt.Sprintf("Status code: %d, error: %s", resp.StatusCode, body) + } + + status[endpoint.Name] = s continue } + log.WithField("wfr", wfr.Name).Infof("Status code of notification for %s: %d", endpoint.Name, resp.StatusCode) + status[endpoint.Name] = v1alpha1.NotificationStatus{ + Result: v1alpha1.NotificationResultSucceeded, + Message: fmt.Sprintf("Status code: %d", resp.StatusCode), + } } + + return status, nil +} + +// validate workflow run +func validate(wfr *v1alpha1.WorkflowRun) bool { + // check workflowRef can not be nil + if wfr.Spec.WorkflowRef == nil { + log.WithField("name", wfr.Name).Error("WorkflowRef is nil") + return false + } + + return true } From bcd9ea7ff6b594e3c1b58c1145f217207016678c Mon Sep 17 00:00:00 2001 From: Robin Yue Date: Mon, 8 Apr 2019 10:25:09 +0800 Subject: [PATCH 2/2] fix comments --- .../controller/handlers/workflowrun/handler.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/workflow/controller/handlers/workflowrun/handler.go b/pkg/workflow/controller/handlers/workflowrun/handler.go index 42bdbe9e6..e61e870a1 100644 --- a/pkg/workflow/controller/handlers/workflowrun/handler.go +++ b/pkg/workflow/controller/handlers/workflowrun/handler.go @@ -6,7 +6,6 @@ import ( "fmt" "io/ioutil" "net/http" - "reflect" "time" log "github.com/sirupsen/logrus" @@ -130,7 +129,7 @@ func (h *Handler) ObjectUpdated(obj interface{}) { return err } - if !reflect.DeepEqual(latest.Status.Notifications, status) { + if latest.Status.Notifications == nil { latest.Status.Notifications = status _, err = h.Client.CycloneV1alpha1().WorkflowRuns(originWfr.Namespace).Update(latest) return err @@ -186,16 +185,18 @@ func (h *Handler) ObjectDeleted(obj interface{}) { // sendNotifications send notifications for workflowruns when: // * its workflow has notification config // * finish time after workflow controller starts -// * notification status of workflowrun is not nil +// * notification status of workflowrun is nil // If the returned notification status is nil, it means that there is no need to send notification. func (h *Handler) sendNotifications(wfr *v1alpha1.WorkflowRun) (map[string]v1alpha1.NotificationStatus, error) { - // No need to send notifications for workflowruns finished before workflow controller starts. - if wfr.Spec.WorkflowRef == nil || wfr.Status.Notifications != nil || + if wfr.Status.Notifications != nil || wfr.Status.Overall.LastTransitionTime.Before(controllerStartTime) { return nil, nil } wfRef := wfr.Spec.WorkflowRef + if wfRef == nil { + return nil, fmt.Errorf("Workflow reference of workflow run %s/%s is empty", wfr.Namespace, wfr.Name) + } wf, err := h.Client.CycloneV1alpha1().Workflows(wfRef.Namespace).Get(wfRef.Name, metav1.GetOptions{}) if err != nil { return nil, err