fix(api): No hatchery can spawn a worker msg (#4370) · ovh/cds@4b0ed37 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit 4b0ed37

Browse files
authored
fix(api): No hatchery can spawn a worker msg (#4370)
* fix(api): No hatchery can spawn a worker msg close #3318 Signed-off-by: Yvonnick Esnault <yvonnick.esnault@corp.ovh.com>
1 parent ef0bbcb commit 4b0ed37

File tree

14 files changed

+164
-494
lines changed

14 files changed

+164
-494
lines changed

engine/api/api_routes.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,6 @@ func (api *API) InitRouter() {
9191
r.Handle("/group/{permGroupName}/token", r.GET(api.getGroupTokenListHandler), r.POST(api.generateTokenHandler))
9292
r.Handle("/group/{permGroupName}/token/{tokenid}", r.DELETE(api.deleteTokenHandler))
9393

94-
// Hatchery
95-
r.Handle("/hatchery/count/{workflowNodeRunID}", r.GET(api.hatcheryCountHandler))
96-
9794
// Hooks
9895
r.Handle("/hook/{uuid}/workflow/{workflowID}/vcsevent/{vcsServer}", r.GET(api.getHookPollingVCSEvents))
9996

@@ -286,7 +283,6 @@ func (api *API) InitRouter() {
286283
r.Handle("/queue/workflows/count", r.GET(api.countWorkflowJobQueueHandler, EnableTracing(), MaintenanceAware()))
287284
r.Handle("/queue/workflows/{id}/take", r.POST(api.postTakeWorkflowJobHandler, NeedWorker(), EnableTracing(), MaintenanceAware()))
288285
r.Handle("/queue/workflows/{id}/book", r.POST(api.postBookWorkflowJobHandler, NeedHatchery(), EnableTracing(), MaintenanceAware()), r.DELETE(api.deleteBookWorkflowJobHandler, NeedHatchery(), EnableTracing(), MaintenanceAware()))
289-
r.Handle("/queue/workflows/{id}/attempt", r.POST(api.postIncWorkflowJobAttemptHandler, NeedHatchery(), EnableTracing(), MaintenanceAware()))
290286
r.Handle("/queue/workflows/{id}/infos", r.GET(api.getWorkflowJobHandler, NeedWorker(), NeedHatchery(), EnableTracing(), MaintenanceAware()))
291287
r.Handle("/queue/workflows/{permID}/vulnerability", r.POSTEXECUTE(api.postVulnerabilityReportHandler, NeedWorker(), EnableTracing(), MaintenanceAware()))
292288
r.Handle("/queue/workflows/{id}/spawn/infos", r.POST(r.Asynchronous(api.postSpawnInfosWorkflowJobHandler, 1), NeedHatchery(), EnableTracing(), MaintenanceAware()))

engine/api/hatchery.go

Lines changed: 0 additions & 26 deletions
This file was deleted.

engine/api/services/hatchery.go

Lines changed: 0 additions & 56 deletions
This file was deleted.

engine/api/workflow/execute_node_job_run.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package workflow
33
import (
44
"bytes"
55
"context"
6-
"database/sql"
76
"encoding/base64"
87
"fmt"
98
"sync"
@@ -570,27 +569,6 @@ func FreeNodeJobRun(store cache.Store, id int64) error {
570569
return sdk.WrapError(sdk.ErrJobNotBooked, "BookNodeJobRun> job %d already released", id)
571570
}
572571

573-
//AddNodeJobAttempt add an hatchery attempt to spawn a job
574-
func AddNodeJobAttempt(db gorp.SqlExecutor, id, hatcheryID int64) ([]int64, error) {
575-
var ids []int64
576-
query := "UPDATE workflow_node_run_job SET spawn_attempts = array_append(spawn_attempts, $1) WHERE id = $2"
577-
if _, err := db.Exec(query, hatcheryID, id); err != nil && err != sql.ErrNoRows {
578-
return ids, sdk.WrapError(err, "cannot update node run job")
579-
}
580-
581-
rows, err := db.Query("SELECT DISTINCT unnest(spawn_attempts) FROM workflow_node_run_job WHERE id = $1", id)
582-
var hID int64
583-
defer rows.Close()
584-
for rows.Next() {
585-
if errS := rows.Scan(&hID); errS != nil {
586-
return ids, sdk.WrapError(errS, "AddNodeJobAttempt> cannot scan")
587-
}
588-
ids = append(ids, hID)
589-
}
590-
591-
return ids, err
592-
}
593-
594572
//AddLog adds a build log
595573
func AddLog(db gorp.SqlExecutor, job *sdk.WorkflowNodeJobRun, logs *sdk.Log, maxLogSize int64) error {
596574
if job != nil {

engine/api/workflow/gorp_model.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"database/sql"
55
"time"
66

7-
"github.com/lib/pq"
8-
97
"github.com/ovh/cds/engine/api/database/gorpmapping"
108

119
"github.com/ovh/cds/sdk"
@@ -88,7 +86,6 @@ type JobRun struct {
8886
Parameters sql.NullString `db:"variables"`
8987
Status string `db:"status"`
9088
Retry int `db:"retry"`
91-
SpawnAttempts *pq.Int64Array `db:"spawn_attempts"`
9289
Queued time.Time `db:"queued"`
9390
Start time.Time `db:"start"`
9491
Done time.Time `db:"done"`
@@ -116,8 +113,6 @@ func (j *JobRun) ToJobRun(jr *sdk.WorkflowNodeJobRun) (err error) {
116113
}
117114
j.Status = jr.Status
118115
j.Retry = jr.Retry
119-
array := pq.Int64Array(jr.SpawnAttempts)
120-
j.SpawnAttempts = &array
121116
j.Queued = jr.Queued
122117
j.Start = jr.Start
123118
j.Done = jr.Done
@@ -154,9 +149,6 @@ func (j JobRun) WorkflowNodeRunJob() (sdk.WorkflowNodeJobRun, error) {
154149
BookedBy: j.BookedBy,
155150
ContainsService: j.ContainsService,
156151
}
157-
if j.SpawnAttempts != nil {
158-
jr.SpawnAttempts = *j.SpawnAttempts
159-
}
160152
if err := gorpmapping.JSONNullString(j.Job, &jr.Job); err != nil {
161153
return jr, sdk.WrapError(err, "column job")
162154
}

engine/api/workflow_queue.go

Lines changed: 0 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/ovh/cds/engine/api/permission"
2121
"github.com/ovh/cds/engine/api/project"
2222
"github.com/ovh/cds/engine/api/repositoriesmanager"
23-
"github.com/ovh/cds/engine/api/services"
2423
"github.com/ovh/cds/engine/api/worker"
2524
"github.com/ovh/cds/engine/api/workermodel"
2625
"github.com/ovh/cds/engine/api/workflow"
@@ -223,74 +222,6 @@ func (api *API) deleteBookWorkflowJobHandler() service.Handler {
223222
}
224223
}
225224

226-
func (api *API) postIncWorkflowJobAttemptHandler() service.Handler {
227-
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
228-
id, errc := requestVarInt(r, "id")
229-
if errc != nil {
230-
return sdk.WrapError(errc, "Invalid id")
231-
}
232-
h := getHatchery(ctx)
233-
if h == nil {
234-
return service.WriteJSON(w, nil, http.StatusUnauthorized)
235-
}
236-
spawnAttempts, err := workflow.AddNodeJobAttempt(api.mustDB(), id, h.ID)
237-
if err != nil {
238-
return err
239-
}
240-
241-
hCount, err := services.LoadHatcheriesCountByNodeJobRunID(api.mustDB(), id)
242-
if err != nil {
243-
return sdk.WrapError(err, "Cannot get hatcheries count")
244-
}
245-
246-
if int64(len(spawnAttempts)) >= hCount {
247-
infos := []sdk.SpawnInfo{
248-
{
249-
RemoteTime: time.Now(),
250-
Message: sdk.SpawnMsg{
251-
ID: sdk.MsgSpawnInfoHatcheryCannotStartJob.ID,
252-
Args: []interface{}{},
253-
},
254-
},
255-
}
256-
257-
tx, errBegin := api.mustDB().Begin()
258-
if errBegin != nil {
259-
return sdk.WrapError(errBegin, "Cannot start transaction")
260-
}
261-
defer tx.Rollback()
262-
263-
if err := workflow.AddSpawnInfosNodeJobRun(tx, id, infos); err != nil {
264-
return sdk.WrapError(err, "Cannot save spawn info on node job run %d", id)
265-
}
266-
267-
wfNodeJobRun, errLj := workflow.LoadNodeJobRun(tx, api.Cache, id)
268-
if errLj != nil {
269-
return sdk.WrapError(errLj, "Cannot load node job run")
270-
}
271-
272-
wfNodeRun, errLr := workflow.LoadAndLockNodeRunByID(ctx, tx, wfNodeJobRun.WorkflowNodeRunID)
273-
if errLr != nil {
274-
return sdk.WrapError(errLr, "cannot load node run: %d", wfNodeJobRun.WorkflowNodeRunID)
275-
}
276-
277-
if found, err := workflow.SyncNodeRunRunJob(ctx, tx, wfNodeRun, *wfNodeJobRun); err != nil || !found {
278-
return sdk.WrapError(err, "Cannot sync run job (found=%v)", found)
279-
}
280-
281-
if err := workflow.UpdateNodeRun(tx, wfNodeRun); err != nil {
282-
return sdk.WrapError(err, "Cannot update node job run")
283-
}
284-
285-
if err := tx.Commit(); err != nil {
286-
return sdk.WrapError(err, "Cannot commit tx")
287-
}
288-
}
289-
290-
return service.WriteJSON(w, spawnAttempts, http.StatusOK)
291-
}
292-
}
293-
294225
func (api *API) getWorkflowJobHandler() service.Handler {
295226
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
296227
id, errc := requestVarInt(r, "id")

engine/hatchery/local/local.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func (h *HatcheryLocal) CanSpawn(model *sdk.Model, jobID int64, requirements []s
196196
for _, r := range requirements {
197197
ok, err := h.checkRequirement(r)
198198
if err != nil || !ok {
199-
log.Debug("CanSpawn false hatchery.checkRequirement ok:%v err:%v", ok, err)
199+
log.Debug("CanSpawn false hatchery.checkRequirement ok:%v err:%v r:%v", ok, err, r)
200200
return false
201201
}
202202
}

sdk/cdsclient/client_hatchery.go

Lines changed: 0 additions & 19 deletions
This file was deleted.

sdk/cdsclient/client_queue.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -235,14 +235,6 @@ func (c *client) QueueJobSendSpawnInfo(ctx context.Context, id int64, in []sdk.S
235235
return err
236236
}
237237

238-
// QueueJobIncAttempts add hatcheryID that cannot run this job and return the spawn attempts list
239-
func (c *client) QueueJobIncAttempts(ctx context.Context, jobID int64) ([]int64, error) {
240-
var spawnAttempts []int64
241-
path := fmt.Sprintf("/queue/workflows/%d/attempt", jobID)
242-
_, err := c.PostJSON(ctx, path, nil, &spawnAttempts)
243-
return spawnAttempts, err
244-
}
245-
246238
// QueueJobBook books a job for a Hatchery
247239
func (c *client) QueueJobBook(ctx context.Context, id int64) error {
248240
path := fmt.Sprintf("/queue/workflows/%d/book", id)

sdk/cdsclient/interface.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,6 @@ type GroupClient interface {
169169
GroupRename(oldGroupname, newGroupname string) error
170170
}
171171

172-
// HatcheryClient exposes hatcheries related functions
173-
type HatcheryClient interface {
174-
HatcheryCount(ctx context.Context, wfNodeRunID int64) (int64, error)
175-
}
176-
177172
// BroadcastClient expose all function for CDS Broadcasts
178173
type BroadcastClient interface {
179174
Broadcasts() ([]sdk.Broadcast, error)
@@ -242,7 +237,6 @@ type QueueClient interface {
242237
QueueArtifactUpload(ctx context.Context, projectKey, integrationName string, nodeJobRunID int64, tag, filePath string) (bool, time.Duration, error)
243238
QueueStaticFilesUpload(ctx context.Context, projectKey, integrationName string, nodeJobRunID int64, name, entrypoint, staticKey string, tarContent io.Reader) (string, bool, time.Duration, error)
244239
QueueJobTag(ctx context.Context, jobID int64, tags []sdk.WorkflowRunTag) error
245-
QueueJobIncAttempts(ctx context.Context, jobID int64) ([]int64, error)
246240
QueueServiceLogs(ctx context.Context, logs []sdk.ServiceLog) error
247241
}
248242

@@ -346,7 +340,6 @@ type Interface interface {
346340
ExportImportInterface
347341
GroupClient
348342
GRPCPluginsClient
349-
HatcheryClient
350343
BroadcastClient
351344
MaintenanceClient
352345
PipelineClient

0 commit comments

Comments
 (0)
0