8000 fix(api): run wf with mutex, from a hook (#4718) · ovh/cds@3e9cdad · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit 3e9cdad

Browse files
yesnault, bnjjj
authored and committed
fix(api): run wf with mutex, from a hook (#4718)
Signed-off-by: Yvonnick Esnault <yvonnick.esnault@corp.ovh.com>
1 parent f65f163 commit 3e9cdad

File tree

7 files changed

+201
-24
lines changed

7 files changed

+201
-24
lines changed

engine/api/workflow/dao.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1382,7 +1382,7 @@ func Push(ctx context.Context, db *gorp.DbMap, store cache.Store, proj *sdk.Proj
13821382
defer end()
13831383
allMsg := []sdk.Message{}
13841384

1385-
data, err := ExtractFromCDSFiles(tr)
1385+
data, err := extractFromCDSFiles(tr)
13861386
if err != nil {
13871387
return nil, nil, err
13881388
}

engine/api/workflow/process_node.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -342,16 +342,24 @@ func processNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, pr
342342

343343
//Check the context.mutex to know if we are allowed to run it
344344
if n.Context.Mutex {
345-
//Check if there are builing workflownoderun with the same workflow_node_name for the same workflow
345+
//Check if there are previous waiting or building workflownoderun
346+
// with the same workflow_node_name for the same workflow
347+
348+
// in this sql, we use 'and workflow_node_run.id < $2' and not 'and workflow_node_run.id <> $2':
349+
// we check if there is a previous build in waiting status
350+
// or if there is another build (newer or not) with building status
346351
mutexQuery := `select count(1)
347352
from workflow_node_run
348353
join workflow_run on workflow_run.id = workflow_node_run.workflow_run_id
349354
join workflow on workflow.id = workflow_run.workflow_id
350355
where workflow.id = $1
351-
and workflow_node_run.id <> $2
352356
and workflow_node_run.workflow_node_name = $3
353-
and workflow_node_run.status = $4`
354-
nbMutex, err := db.SelectInt(mutexQuery, n.WorkflowID, nr.ID, n.Name, string(sdk.StatusBuilding))
357+
and (
358+
(workflow_node_run.id < $2 and workflow_node_run.status = $4)
359+
or
360+
(workflow_node_run.id <> $2 and workflow_node_run.status = $5)
361+
)`
362+
nbMutex, err := db.SelectInt(mutexQuery, n.WorkflowID, nr.ID, n.Name, string(sdk.StatusWaiting), string(sdk.StatusBuilding))
355363
if err != nil {
356364
return nil, false, sdk.WrapError(err, "unable to check mutexes")
357365
}
@@ -361,7 +369,6 @@ func processNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, pr
361369
ID: sdk.MsgWorkflowNodeMutex.ID,
362370
Args: []interface{}{n.Name},
363371
})
364-
365372
if err := UpdateWorkflowRun(ctx, db, wr); err != nil {
366373
return nil, false, sdk.WrapError(err, "unable to update workflow run")
367374
}

engine/api/workflow/process_start.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func processStartFromNode(ctx context.Context, db gorp.SqlExecutor, store cache.
5050
}
5151

5252
func processStartFromRootNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node, hookEvent *sdk.WorkflowNodeRunHookEvent, manual *sdk.WorkflowNodeRunManual) (*ProcessorReport, bool, error) {
53-
log.Debug("processWorkflowRun> starting from the root : %d (pipeline %s)", wr.Workflow.WorkflowData.Node.ID, wr.Workflow.Pipelines[wr.Workflow.WorkflowData.Node.Context.ID].Name)
53+
log.Debug("processWorkflowRun> starting from the root: %d (pipeline %s)", wr.Workflow.WorkflowData.Node.ID, wr.Workflow.Pipelines[wr.Workflow.WorkflowData.Node.Context.PipelineID].Name)
5454
report := new(ProcessorReport)
5555
//Run the root: manual or from an event
5656
AddWorkflowRunInfo(wr, false, sdk.SpawnMsg{

engine/api/workflow/repository.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,15 +147,15 @@ func ReadCDSFiles(files map[string][]byte) (*tar.Reader, error) {
147147
return tar.NewReader(buf), nil
148148
}
149149

150-
type ExportedEntities struct {
150+
type exportedEntities struct {
151151
wrkflw exportentities.Workflow
152152
apps map[string]exportentities.Application
153153
pips map[string]exportentities.PipelineV1
154154
envs map[string]exportentities.Environment
155155
}
156156

157-
func ExtractFromCDSFiles(tr *tar.Reader) (*ExportedEntities, error) {
158-
var res = ExportedEntities{
157+
func extractFromCDSFiles(tr *tar.Reader) (*exportedEntities, error) {
158+
var res = exportedEntities{
159159
apps: make(map[string]exportentities.Application),
160160
pips: make(map[string]exportentities.PipelineV1),
161161
envs: make(map[string]exportentities.Environment),

engine/api/workflow/run_workflow.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ func runFromHook(ctx context.Context, db gorp.SqlExecutor, store cache.Store, p
6767
if errWR != nil {
6868
return nil, sdk.WrapError(errWR, "RunFromHook> Unable to process workflow run")
6969
}
70-
if !hasRun {
70+
// if there is no report and the wf was not run -> condition not ok, set NeverBuilt
71+
// if it was not run due to a mutex -> the report is not nil in this case
72+
// so, we let the wf with status building by returning the report with no error
73+
if r1 == nil && !hasRun {
7174
wr.Status = sdk.StatusNeverBuilt.String()
7275
wr.LastExecution = time.Now()
7376
report.Add(wr)

engine/api/workflow_run_test.go

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net/http"
1010
"net/http/httptest"
1111
"testing"
12+
"time"
1213

1314
"github.com/go-gorp/gorp"
1415
"github.com/stretchr/testify/assert"
@@ -1518,6 +1519,169 @@ func Test_postWorkflowRunHandlerWithoutRightConditionsOnHook(t *testing.T) {
15181519
assert.Equal(t, 400, rec.Code)
15191520
}
15201521

1522+
func Test_postWorkflowRunHandlerWithMutex(t *testing.T) {
1523+
api, db, router, end := newTestAPI(t, bootstrap.InitiliazeDB)
1524+
defer end()
1525+
u, pass := assets.InsertAdminUser(api.mustDB())
1526+
key := sdk.RandomString(10)
1527+
proj := assets.InsertTestProject(t, db, api.Cache, key, key, u)
1528+
1529+
//First pipeline
1530+
pip := sdk.Pipeline{
1531+
ProjectID: proj.ID,
1532+
ProjectKey: proj.Key,
1533+
Name: "pip1",
1534+
}
1535+
test.NoError(t, pipeline.InsertPipeline(api.mustDB(), api.Cache, proj, &pip, u))
1536+
1537+
s := sdk.NewStage("stage 1")
1538+
s.Enabled = true
1539+
s.PipelineID = pip.ID
1540+
test.NoError(t, pipeline.InsertStage(api.mustDB(), s))
1541+
j := &sdk.Job{
1542+
Enabled: true,
1543+
Action: sdk.Action{
1544+
Enabled: true,
1545+
},
1546+
}
1547+
test.NoError(t, pipeline.InsertJob(api.mustDB(), j, s.ID, &pip))
1548+
s.Jobs = append(s.Jobs, *j)
1549+
1550+
pip.Stages = append(pip.Stages, *s)
1551+
1552+
//Second pipeline
1553+
pip2 := sdk.Pipeline{
1554+
ProjectID: proj.ID,
1555+
ProjectKey: proj.Key,
1556+
Name: "pip2",
1557+
}
1558+
test.NoError(t, pipeline.InsertPipeline(api.mustDB(), api.Cache, proj, &pip2, u))
1559+
s = sdk.NewStage("stage 1")
1560+
s.Enabled = true
1561+
s.PipelineID = pip2.ID
1562+
test.NoError(t, pipeline.InsertStage(api.mustDB(), s))
1563+
j = &sdk.Job{
1564+
Enabled: true,
1565+
Action: sdk.Action{
1566+
Enabled: true,
1567+
},
1568+
}
1569+
test.NoError(t, pipeline.InsertJob(api.mustDB(), j, s.ID, &pip2))
1570+
s.Jobs = append(s.Jobs, *j)
1571+
1572+
mockHookService := &sdk.Service{Name: "Test_postWorkflowRunHandlerWithMutex", Type: services.TypeHooks}
1573+
test.NoError(t, services.Insert(api.mustDB(), mockHookService))
1574+
1575+
//This is a mock for the repositories service
1576+
services.HTTPClient = mock(
1577+
func(r *http.Request) (*http.Response, error) {
1578+
body := new(bytes.Buffer)
1579+
w := new(http.Response)
1580+
enc := json.NewEncoder(body)
1581+
w.Body = ioutil.NopCloser(body)
1582+
1583+
switch r.URL.String() {
1584+
case "/task/bulk":
1585+
hooks := map[string]sdk.NodeHook{}
1586+
hooks["1cbf3792-126b-4111-884f-077bdee9523d"] = sdk.NodeHook{
1587+
HookModelName: sdk.WebHookModel.Name,
1588+
Config: sdk.WebHookModel.DefaultConfig,
1589+
Ref: "root.0",
1590+
UUID: "1cbf3792-126b-4111-884f-077bdee9523d",
1591+
}
1592+
if err := enc.Encode(hooks); err != nil {
1593+
return writeError(w, err)
1594+
}
1595+
default:
1596+
return writeError(w, fmt.Errorf("route %s must not be called", r.URL.String()))
1597+
}
1598+
return w, nil
1599+
},
1600+
)
1601+
1602+
_, errDb := db.Exec("DELETE FROM w_node_hook WHERE uuid = $1", "1cbf3792-126b-4111-884f-077bdee9523d")
1603+
test.NoError(t, errDb)
1604+
1605+
w := sdk.Workflow{
1606+
Name: "test_1",
1607+
ProjectID: proj.ID,
1608+
ProjectKey: proj.Key,
1609+
HookModels: map[int64]sdk.WorkflowHookModel{
1610+
1: sdk.WebHookModel,
1611+
},
1612+
WorkflowData: &sdk.WorkflowData{
1613+
Node: sdk.Node{
1614+
Name: "root",
1615+
Type: sdk.NodeTypePipeline,
1616+
Context: &sdk.NodeContext{
1617+
PipelineID: pip.ID,
1618+
Mutex: true,
1619+
},
1620+
Hooks: []sdk.NodeHook{
1621+
sdk.NodeHook{
1622+
HookModelName: sdk.WebHookModel.Name,
1623+
Config: sdk.WebHookModel.DefaultConfig,
1624+
Ref: "root.0",
1625+
UUID: "1cbf3792-126b-4111-884f-077bdee9523d",
1626+
},
1627+
},
1628+
},
1629+
},
1630+
}
1631+
1632+
proj2, errP := project.Load(api.mustDB(), api.Cache, proj.Key, u, project.LoadOptions.WithPipelines, project.LoadOptions.WithGroups, project.LoadOptions.WithEnvironments)
1633+
test.NoError(t, errP)
1634+
1635+
test.NoError(t, workflow.Insert(api.mustDB(), api.Cache, &w, proj2, u))
1636+
w1, err := workflow.Load(context.TODO(), api.mustDB(), api.Cache, proj2, "test_1", u, workflow.LoadOptions{})
1637+
test.NoError(t, err)
1638+
1639+
//Prepare request
1640+
vars := map[string]string{
1641+
"key": proj.Key,
1642+
"permWorkflowName": w1.Name,
1643+
}
1644+
uri := router.GetRoute("POST", api.postWorkflowRunHandler, vars)
1645+
test.NotEmpty(t, uri)
1646+
1647+
opts := &sdk.WorkflowRunPostHandlerOption{
1648+
Hook: &sdk.WorkflowNodeRunHookEvent{
1649+
Payload: nil,
1650+
WorkflowNodeHookUUID: "1cbf3792-126b-4111-884f-077bdee9523d",
1651+
},
1652+
}
1653+
req := assets.NewAuthentifiedRequest(t, u, pass, "POST", uri, opts)
1654+
1655+
//Do the request, start first workflow
1656+
rec := httptest.NewRecorder()
1657+
router.Mux.ServeHTTP(rec, req)
1658+
var body []byte
1659+
_, err = req.Body.Read(body)
1660+
test.NoError(t, err)
1661+
defer req.Body.Close()
1662+
assert.Equal(t, 202, rec.Code)
1663+
1664+
req2 := assets.NewAuthentifiedRequest(t, u, pass, "POST", uri, opts)
1665+
1666+
//Do the request, start a new run
1667+
rec2 := httptest.NewRecorder()
1668+
router.Mux.ServeHTTP(rec2, req2)
1669+
var body2 []byte
1670+
_, err = req2.Body.Read(body2)
1671+
test.NoError(t, err)
1672+
defer req2.Body.Close()
1673+
assert.Equal(t, 202, rec2.Code)
1674+
1675+
// it's an async call, wait a bit to let cds take care of the previous request
1676+
time.Sleep(3 * time.Second)
1677+
1678+
lastRun, err := workflow.LoadLastRun(api.mustDB(), proj.Key, w1.Name, workflow.LoadRunOptions{})
1679+
test.NoError(t, err)
1680+
assert.Equal(t, int64(2), lastRun.Number)
1681+
assert.Equal(t, sdk.StatusBuilding.String(), lastRun.Status)
1682+
1683+
}
1684+
15211685
func Test_postWorkflowRunHandler_Forbidden(t *testing.T) {
15221686
api, db, router, end := newTestAPI(t, bootstrap.InitiliazeDB)
15231687
defer end()

engine/hooks/outgoing_hooks.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -196,22 +196,25 @@ func (s *Service) doOutgoingWorkflowExecution(t *sdk.TaskExecution) error {
196196
}
197197

198198
var payloadInt interface{}
199-
if err := json.Unmarshal([]byte(payloadstr), &payloadInt); err == nil {
200-
e := dump.NewDefaultEncoder()
201-
e.Formatters = []dump.KeyFormatterFunc{dump.WithDefaultLowerCaseFormatter()}
202-
e.ExtraFields.DetailedMap = false
203-
e.ExtraFields.DetailedStruct = false
204-
e.ExtraFields.Len = false
205-
e.ExtraFields.Type = false
206-
m1, errm1 := e.ToStringMap(payloadInt)
207-
if errm1 != nil {
208-
log.Error("Hooks> doOutgoingWorkflowExecution> Cannot convert payload to map %s", errm1)
199+
if payloadstr != "" {
200+
if err := json.Unmarshal([]byte(payloadstr), &payloadInt); err == nil {
201+
e := dump.NewDefaultEncoder()
202+
e.Formatters = []dump.KeyFormatterFunc{dump.WithDefaultLowerCaseFormatter()}
203+
e.ExtraFields.DetailedMap = false
204+
e.ExtraFields.DetailedStruct = false
205+
e.ExtraFields.Len = false
206+
e.ExtraFields.Type = false
207+
m1, errm1 := e.ToStringMap(payloadInt)
208+
if errm1 != nil {
209+
log.Error("Hooks> doOutgoingWorkflowExecution> Cannot convert payload to map %s", errm1)
210+
} else {
211+
payloadValues = m1
212+
}
209213
} else {
210-
payloadValues = m1
214+
log.Error("Hooks> doOutgoingWorkflowExecution> Cannot unmarshall payload %s", err)
211215
}
212-
} else {
213-
log.Error("Hooks> doOutgoingWorkflowExecution> Cannot unmarshall payload %s", err)
214216
}
217+
215218
payloadValues["payload"] = string(payloadstr)
216219
}
217220

0 commit comments

Comments
 (0)
0