feat(#896): store notification status in workflowrun by supereagle · Pull Request #897 · caicloud/cyclone · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat(#896): store notification status in workflowrun #897

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pkg/apis/cyclone/v1alpha1/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ type WorkflowRunStatus struct {
Overall Status `json:"overall"`
// Whether gc is performed on this WorkflowRun, such as deleting pods.
Cleaned bool `json:"cleaned"`
// Notifications represents the status of sending notifications.
Notifications map[string]NotificationStatus `json:"notifications,omitempty"`
}

// StageStatus describes status of a stage execution.
Expand Down Expand Up @@ -154,6 +156,25 @@ type Status struct {
StartTime metav1.Time `json:"startTime,omitempty"`
}

// NotificationResult represents the result of sending notifications.
// It is a string-backed type; the values declared below are
// NotificationResultSucceeded and NotificationResultFailed.
type NotificationResult string

const (
	// NotificationResultSucceeded means success result of sending notifications.
	NotificationResultSucceeded NotificationResult = "Succeeded"

	// NotificationResultFailed means failure result of sending notifications.
	NotificationResultFailed NotificationResult = "Failed"
)

// NotificationStatus represents the status of sending a notification.
// WorkflowRunStatus.Notifications stores one value of this type per map key.
type NotificationStatus struct {
// Result represents the result of sending notifications; the declared values
// are NotificationResultSucceeded and NotificationResultFailed.
Result NotificationResult `json:"result"`
// Message represents the detailed message for result. The tag has no
// omitempty, so the field is serialized even when it is empty.
Message string `json:"message"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// WorkflowRunList describes an array of WorkflowRun instances.
Expand Down
23 changes: 23 additions & 0 deletions pkg/apis/cyclone/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions pkg/workflow/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func validate(config *WorkflowControllerConfig) bool {
log.Warn("Secret not configured, no auth information would be available, e.g. docker registry auth.")
}

if !validateNotification(config.Notifications) {
return false
}

for _, k := range []string{GitResolverImage, ImageResolverImage, KvResolverImage, CoordinatorImage} {
_, ok := config.Images[k]
if !ok {
Expand All @@ -154,6 +158,22 @@ func validate(config *WorkflowControllerConfig) bool {
return true
}

// validateNotification reports whether the notification endpoint
// configuration is valid, i.e. whether every endpoint name is unique.
// The first duplicated name is logged and makes the function return false.
func validateNotification(nes []NotificationEndpoint) bool {
	seen := make(map[string]bool)
	for i := range nes {
		name := nes[i].Name
		if seen[name] {
			log.Errorf("There are multiple notification endpoints with same name: %s", name)
			return false
		}

		seen[name] = true
	}

	return true
}

// ImagePullPolicy determines image pull policy based on environment variable DEVELOP_MODE
// This pull policy will be used in image resolver containers and coordinator containers.
func ImagePullPolicy() corev1.PullPolicy {
Expand Down
46 changes: 46 additions & 0 deletions pkg/workflow/controller/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package controller

import (
"testing"
)

// TestValidateNotification checks that validateNotification accepts endpoint
// lists whose names are all different and rejects lists with a repeated name.
func TestValidateNotification(t *testing.T) {
	// endpoint builds a NotificationEndpoint with only Name and URL set.
	endpoint := func(name, url string) NotificationEndpoint {
		return NotificationEndpoint{Name: name, URL: url}
	}

	type scenario struct {
		endpoints []NotificationEndpoint
		expected  bool
	}

	scenarios := map[string]scenario{
		"different endpoint names": {
			endpoints: []NotificationEndpoint{
				endpoint("n1", "http://n1.cyclone.dev"),
				endpoint("n2", "http://n2.cyclone.dev"),
			},
			expected: true,
		},
		"same endpoint names": {
			endpoints: []NotificationEndpoint{
				endpoint("n1", "http://n1.cyclone.dev"),
				endpoint("n1", "http://n2.cyclone.dev"),
			},
			expected: false,
		},
	}

	for desc, sc := range scenarios {
		if got := validateNotification(sc.endpoints); got != sc.expected {
			t.Errorf("Test case %s failed: expected %t, but got %t", desc, sc.expected, got)
		}
	}
}
112 changes: 94 additions & 18 deletions pkg/workflow/controller/handlers/workflowrun/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package workflowrun
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"

"github.com/caicloud/cyclone/pkg/apis/cyclone/v1alpha1"
"github.com/caicloud/cyclone/pkg/k8s/clientset"
Expand Down Expand Up @@ -107,8 +110,38 @@ func (h *Handler) ObjectUpdated(obj interface{}) {
// If the WorkflowRun has already been terminated(Completed, Error, Cancel), send notifications if necessary,
// otherwise directly skip it.
if workflowrun.IsWorkflowRunTerminated(originWfr) {
// Send notification as workflowrun has been terminated.
sendNotifications(originWfr)
// Send notification after workflowrun terminated.
status, err := h.sendNotifications(originWfr)
if err != nil {
log.WithField("name", originWfr.Name).Error("Send notification error: ", err)
return
}
// If notification status is nil, then no notification is sent.
if status == nil {
return
}

// Update WorkflowRun notification status with retry.
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Get latest WorkflowRun.
latest, err := h.Client.CycloneV1alpha1().WorkflowRuns(originWfr.Namespace).Get(originWfr.Name, metav1.GetOptions{})
if err != nil {
return err
}

if latest.Status.Notifications == nil {
latest.Status.Notifications = status
_, err = h.Client.CycloneV1alpha1().WorkflowRuns(originWfr.Namespace).Update(latest)
return err
}

return nil
})

if err != nil {
log.WithField("name", originWfr.Name).Error("Update workflowrun notification status error: ", err)
}

return
}

Expand Down Expand Up @@ -149,47 +182,90 @@ func (h *Handler) ObjectDeleted(obj interface{}) {
return
}

// validate workflow run
func validate(wfr *v1alpha1.WorkflowRun) bool {
// check workflowRef can not be nil
if wfr.Spec.WorkflowRef == nil {
log.WithField("name", wfr.Name).Error("WorkflowRef is nil")
return false
// sendNotifications send notifications for workflowruns when:
// * its workflow has notification config
// * finish time after workflow controller starts
// * notification status of workflowrun is nil
// If the returned notification status is nil, it means that there is no need to send notification.
func (h *Handler) sendNotifications(wfr *v1alpha1.WorkflowRun) (map[string]v1alpha1.NotificationStatus, error) {
if wfr.Status.Notifications != nil ||
wfr.Status.Overall.LastTransitionTime.Before(controllerStartTime) {
return nil, nil
}

return true
}
wfRef := wfr.Spec.WorkflowRef
if wfRef == nil {
return nil, fmt.Errorf("Workflow reference of workflow run %s/%s is empty", wfr.Namespace, wfr.Name)
}
wf, err := h.Client.CycloneV1alpha1().Workflows(wfRef.Namespace).Get(wfRef.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

// sendNotifications send notifications for workflowruns.
// Will skip workflowruns have finished before workflow controller starts.
func sendNotifications(wfr *v1alpha1.WorkflowRun) {
// No need to send notifications for workflowruns finished before workflow controller starts.
if wfr.Status.Overall.LastTransitionTime.Before(controllerStartTime) {
return
if len(wf.Spec.Notification.Receivers) == 0 {
return nil, nil
}

// Send notifications with workflowrun.
bodyBytes, err := json.Marshal(wfr)
if err != nil {
log.WithField("wfr", wfr.Name).Error("Failed to marshal workflowrun: ", err)
return
return nil, err
}
body := bytes.NewReader(bodyBytes)

status := make(map[string]v1alpha1.NotificationStatus)
for _, endpoint := range controller.Config.Notifications {
req, err := http.NewRequest(http.MethodPost, endpoint.URL, body)
if err != nil {
log.WithField("wfr", wfr.Name).Error("Failed to new notification request: ", err)
err = fmt.Errorf("Failed to new notification request: %v", err)
log.WithField("wfr", wfr.Name).Error(err)
status[endpoint.Name] = v1alpha1.NotificationStatus{
Result: v1alpha1.NotificationResultFailed,
Message: err.Error(),
}
continue
}
// Set Json content type in Http header.
req.Header.Set(contentType, contentJSON)

resp, err := http.DefaultClient.Do(req)
if err != nil {
s := v1alpha1.NotificationStatus{
Result: v1alpha1.NotificationResultFailed,
}

log.WithField("wfr", wfr.Name).Errorf("Failed to send notification for %s: %v", endpoint.Name, err)
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Error(err)
s.Message = err.Error()
} else {
s.Message = fmt.Sprintf("Status code: %d, error: %s", resp.StatusCode, body)
}

status[endpoint.Name] = s
continue
}

log.WithField("wfr", wfr.Name).Infof("Status code of notification for %s: %d", endpoint.Name, resp.StatusCode)
status[endpoint.Name] = v1alpha1.NotificationStatus{
Result: v1alpha1.NotificationResultSucceeded,
Message: fmt.Sprintf("Status code: %d", resp.StatusCode),
}
}

return status, nil
}

// validate reports whether the WorkflowRun is well-formed: its spec must carry
// a workflow reference. A missing reference is logged and yields false.
func validate(wfr *v1alpha1.WorkflowRun) bool {
	if wfr.Spec.WorkflowRef != nil {
		return true
	}

	log.WithField("name", wfr.Name).Error("WorkflowRef is nil")
	return false
}
0