8000 fix(api): check worker model requirements on job processing (#4062) · ovh/cds@dce008c · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit dce008c

Browse files
fsaminbnjjj
authored andcommitted
fix(api): check worker model requirements on job processing (#4062)
while computing the new job `addJobsToQueue`, the function `processNodeJobRunRequirements` now checks: * the group of the worker model againt the `execGroups` computed on the node (close #4002) * the binaries requirements against the worker model binary capabilitites In case of error, the job is failed
1 parent 901e836 commit dce008c

File tree

6 files changed

+543
-506
lines changed

6 files changed

+543
-506
lines changed

engine/api/workflow/execute_node_run.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -375,8 +375,8 @@ func addJobsToQueue(ctx context.Context, db gorp.SqlExecutor, stage *sdk.Stage,
375375
spawnErrs.Join(*err)
376376
}
377377

378-
_, next = observability.Span(ctx, "workflow.getNodeJobRunRequirements")
379-
jobRequirements, containsService, modelType, err := getNodeJobRunRequirements(db, *job, run)
378+
_, next = observability.Span(ctx, "workflow.processNodeJobRunRequirements")
379+
jobRequirements, containsService, modelType, err := processNodeJobRunRequirements(db, *job, run, sdk.GroupsToIDs(groups))
380380
next()
381381
if err != nil {
382382
spawnErrs.Join(*err)
@@ -419,22 +419,22 @@ func addJobsToQueue(ctx context.Context, db gorp.SqlExecutor, stage *sdk.Stage,
419419
skippedOrDisabledJobs++
420420
}
421421

422+
// If there is any error in the previous operation, mark the job as failed
422423
if !spawnErrs.IsEmpty() {
423424
failedJobs++
424425
wjob.Status = sdk.StatusFail.String()
425-
spawnInfos := sdk.SpawnMsg{
426-
ID: sdk.MsgSpawnInfoJobError.ID,
427-
}
428426

429427
for _, e := range spawnErrs {
430-
spawnInfos.Args = append(spawnInfos.Args, sdk.Cause(e).Error())
428+
msg := sdk.SpawnMsg{
429+
ID: sdk.MsgSpawnInfoJobError.ID,
430+
}
431+
msg.Args = []interface{}{sdk.Cause(e).Error()}
432+
wjob.SpawnInfos = append(wjob.SpawnInfos, sdk.SpawnInfo{
433+
APITime: time.Now(),
434+
Message: msg,
435+
RemoteTime: time.Now(),
436+
})
431437
}
432-
433-
wjob.SpawnInfos = []sdk.SpawnInfo{sdk.SpawnInfo{
434-
APITime: time.Now(),
435-
Message: spawnInfos,
436-
RemoteTime: time.Now(),
437-
}}
438438
} else {
439439
wjob.SpawnInfos = []sdk.SpawnInfo{sdk.SpawnInfo{
440440
APITime: time.Now(),

engine/api/workflow/process_requirements.go

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,14 @@ import (
1212
"github.com/ovh/cds/sdk/log"
1313
)
1414

15-
// getNodeJobRunRequirements returns requirements list interpolated, and true or false if at least
15+
// processNodeJobRunRequirements returns requirements list interpolated, and true or false if at least
1616
// one requirement is of type "Service"
17-
func getNodeJobRunRequirements(db gorp.SqlExecutor, j sdk.Job, run *sdk.WorkflowNodeRun) (sdk.RequirementList, bool, string, *sdk.MultiError) {
18-
requirements := sdk.RequirementList{}
19-
tmp := map[string]string{}
20-
errm := &sdk.MultiError{}
21-
17+
func processNodeJobRunRequirements(db gorp.SqlExecutor, j sdk.Job, run *sdk.WorkflowNodeRun, execsGroupIDs []int64) (sdk.RequirementList, bool, string, *sdk.MultiError) {
18+
var requirements sdk.RequirementList
19+
var errm sdk.MultiError
2220
var containsService bool
2321
var model string
24-
for _, v := range run.BuildParameters {
25-
tmp[v.Name] = v.Value
26-
}
22+
var tmp = sdk.ParametersToMap(run.BuildParameters)
2723

2824
for _, v := range j.Action.Requirements {
2925
name, errName := interpolate.Do(v.Name, tmp)
@@ -36,29 +32,64 @@ func getNodeJobRunRequirements(db gorp.SqlExecutor, j sdk.Job, run *sdk.Workflow
3632
errm.Append(errValue)
3733
continue
3834
}
39-
sdk.AddRequirement(&requirements, v.ID, name, v.Type, value)
35+
4036
if v.Type == sdk.ServiceRequirement {
4137
containsService = true
4238
}
4339
if v.Type == sdk.ModelRequirement {
40+
// It is forbidden to have more than one model requirement.
41+
if model != "" {
42+
errm.Append(sdk.ErrInvalidJobRequirementDuplicateModel)
43+
break
44+
}
4445
model = value
4546
}
47+
48+
sdk.AddRequirement(&requirements, v.ID, name, v.Type, value)
4649
}
4750

4851
var modelType string
4952
if model != "" {
6D40
53+
// Load the worker model
5054
wm, err := worker.LoadWorkerModelByName(db, model)
5155
if err != nil {
5256
log.Error("getNodeJobRunRequirements> error while getting worker model %s: %v", model, err)
57+
errm.Append(sdk.ErrNoWorkerModel)
5358
} else {
59+
// Check that the worker model is in an exec group
60+
if !sdk.IsInInt64Array(wm.GroupID, execsGroupIDs) {
61+
errm.Append(sdk.ErrInvalidJobRequirementWorkerModelPermission)
62+
}
63+
64+
// Check that the worker model has the binaries capabilitites
65+
// only if the worker model doesn't need registration
66+
if !wm.NeedRegistration && !wm.CheckRegistration {
67+
for _, req := range requirements {
68+
if req.Type == sdk.BinaryRequirement {
69+
var hasCapa bool
70+
for _, cap := range wm.RegisteredCapabilities {
71+
if cap.Value == req.Value {
72+
hasCapa = true
73+
break
74+
}
75+
}
76+
if !hasCapa {
77+
errm.Append(sdk.ErrInvalidJobRequirementWorkerModelCapabilitites)
78+
break
79+
}
80+
}
81+
}
82+
}
83+
5484
modelType = wm.Type
5585
}
86+
5687
}
5788

5889
if errm.IsEmpty() {
5990
return requirements, containsService, modelType, nil
6091
}
61-
return requirements, containsService, modelType, errm
92+
return requirements, containsService, modelType, &errm
6293
}
6394

6495
func prepareRequirementsToNodeJobRunParameters(reqs sdk.RequirementList) []sdk.Parameter {

0 commit comments

Comments
 (0)
0