fix(api, worker): some fixes about cache & subprocess (#4313) · ovh/cds@255319c · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit 255319c

Browse files
yesnault authored and sguiheux committed
fix(api, worker): some fixes about cache & subprocess (#4313)
1 parent 6ec330e commit 255319c

File tree

12 files changed

+25
-87
lines changed

12 files changed

+25
-87
lines changed

docs/content/docs/components/worker/_index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ worker [flags]
1919

2020
```
2121
--api string URL of CDS API
22-
--auto-update Auto update worker binary from CDS API
2322
--basedir string This directory (default TMPDIR os environment var) will contains worker working directory and temporary files
2423
--booked-job-id int Booked job id
2524
--booked-workflow-job-id int Booked Workflow job id
@@ -33,6 +32,7 @@ worker [flags]
3332
--grpc-api string CDS GRPC tcp address
3433
--grpc-insecure Disable GRPC TLS encryption
3534
--hatchery-name string Hatchery Name spawing worker
35+
-h, --help help for worker
3636
--insecure (SSL) This option explicitly allows curl to perform "insecure" SSL connections and transfers.
3737
--log-level string Log Level: debug, info, notice, warning, critical (default "notice")
3838
--model int Model of worker

engine/api/permission.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ func (api *API) deprecatedSetGroupsAndPermissionsFromGroupID(ctx context.Context
5959

6060
func (api *API) checkWorkerPermission(ctx context.Context, db gorp.SqlExecutor, rc *service.HandlerConfig, routeVar map[string]string) bool {
6161
if getWorker(ctx) == nil {
62+
log.Error("checkWorkerPermission> no worker in ctx")
6263
return false
6364
}
6465

@@ -90,6 +91,9 @@ func (api *API) checkWorkerPermission(ctx context.Context, db gorp.SqlExecutor,
9091

9192
ok = runNodeJob.ID == getWorker(ctx).ActionBuildID
9293
api.Cache.SetWithTTL(k, ok, 60*15)
94+
if !ok {
95+
log.Error("checkWorkerPermission> actionBuildID:%v runNodeJob.ID:%v", getWorker(ctx).ActionBuildID, runNodeJob.ID)
96+
}
9397
return ok
9498
}
9599
return true

engine/api/worker/worker.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ func SetStatus(db gorp.SqlExecutor, workerID string, status sdk.Status) error {
378378
}
379379

380380
// SetToBuilding sets action_build_id and status to building on given worker
381-
func SetToBuilding(db gorp.SqlExecutor, workerID string, actionBuildID int64, jobType string) error {
381+
func SetToBuilding(db gorp.SqlExecutor, store cache.Store, workerID string, actionBuildID int64, jobType string) error {
382382
query := `UPDATE worker SET status = $1, action_build_id = $2, job_type = $3 WHERE id = $4`
383383

384384
res, errE := db.Exec(query, sdk.StatusBuilding.String(), actionBuildID, jobType, workerID)
@@ -387,6 +387,8 @@ func SetToBuilding(db gorp.SqlExecutor, workerID string, actionBuildID int64, jo
387387
}
388388

389389
_, err := res.RowsAffected()
390+
// delete the worker from the cache
391+
store.Delete(cache.Key("worker", workerID))
390392
return err
391393
}
392394

engine/api/workflow_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func takeJob(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store,
139139
}
140140

141141
//Change worker status
142-
if err := worker.SetToBuilding(tx, getWorker(ctx).ID, job.ID, sdk.JobTypeWorkflowNode); err != nil {
142+
if err := worker.SetToBuilding(tx, store, getWorker(ctx).ID, job.ID, sdk.JobTypeWorkflowNode); err != nil {
143143
return nil, sdk.WrapError(err, "Cannot update worker %s status", getWorker(ctx).Name)
144144
}
145145

engine/api/workflow_queue_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -958,7 +958,7 @@ func TestPostVulnerabilityReportHandler(t *testing.T) {
958958
}
959959
testRegisterWorker(t, api, router, &ctx)
960960
ctx.worker.ActionBuildID = wrDB.WorkflowNodeRuns[w.WorkflowData.Node.ID][0].Stages[0].RunJobs[0].ID
961-
assert.NoError(t, worker.SetToBuilding(db, ctx.worker.ID, wrDB.WorkflowNodeRuns[w.WorkflowData.Node.ID][0].Stages[0].RunJobs[0].ID, sdk.JobTypeWorkflowNode))
961+
assert.NoError(t, worker.SetToBuilding(db, api.Cache, ctx.worker.ID, wrDB.WorkflowNodeRuns[w.WorkflowData.Node.ID][0].Stages[0].RunJobs[0].ID, sdk.JobTypeWorkflowNode))
962962

963963
request := sdk.VulnerabilityWorkerReport{
964964
Vulnerabilities: []sdk.Vulnerability{
@@ -1262,7 +1262,7 @@ func TestInsertNewCodeCoverageReport(t *testing.T) {
12621262
}
12631263
testRegisterWorker(t, api, router, &ctx)
12641264
ctx.worker.ActionBuildID = wrr.WorkflowNodeRuns[w.WorkflowData.Node.ID][0].Stages[0].RunJobs[0].ID
1265-
assert.NoError(t, worker.SetToBuilding(db, ctx.worker.ID, wrr.WorkflowNodeRuns[w.WorkflowData.Node.ID][0].Stages[0].RunJobs[0].ID, sdk.JobTypeWorkflowNode))
1265+
assert.NoError(t, worker.SetToBuilding(db, api.Cache, ctx.worker.ID, wrr.WorkflowNodeRuns[w.WorkflowData.Node.ID][0].Stages[0].RunJobs[0].ID, sdk.JobTypeWorkflowNode))
12661266

12671267
uri := router.GetRoute("POST", api.postWorkflowJobCoverageResultsHandler, vars)
12681268
test.NotEmpty(t, uri)

engine/worker/cmd_main.go

Lines changed: 1 addition & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,71 +1,18 @@
11
package main
22

33
import (
4-
"os"
5-
"os/exec"
6-
74
"github.com/spf13/cobra"
8-
9-
"github.com/ovh/cds/sdk"
10-
"github.com/ovh/cds/sdk/log"
115
)
126

137
func cmdMain(w *currentWorker) *cobra.Command {
148
var mainCmd = &cobra.Command{
159
Use: "worker",
1610
Short: "CDS Worker",
1711
Long: "A pipeline is structured in sequential stages containing one or multiple concurrent jobs. A Job will be executed by a worker.",
18-
Run: mainCommandRun(w),
12+
Run: runCmd(w),
1913
}
2014

2115
initFlagsRun(mainCmd)
2216

2317
return mainCmd
2418
}
25-
26-
func mainCommandRun(w *currentWorker) func(cmd *cobra.Command, args []string) {
27-
return func(cmd *cobra.Command, args []string) {
28-
var autoUpdate = FlagBool(cmd, flagAutoUpdate)
29-
var singleUse = FlagBool(cmd, flagSingleUse)
30-
31-
log.Initialize(&log.Conf{})
32-
33-
if autoUpdate {
34-
updateCmd(w)(cmd, args)
35-
}
36-
37-
for {
38-
execWorker()
39-
if singleUse {
40-
log.Info("single-use true, worker will be shutdown...")
41-
break
42-
} else {
43-
log.Info("Restarting worker...")
44-
}
45-
}
46-
log.Info("Stopping worker...")
47-
}
48-
}
49-
50-
func execWorker() {
51-
current, errExec := os.Executable()
52-
if errExec != nil {
53-
sdk.Exit("Error on getting current binary worker", errExec)
54-
}
55-
56-
log.Info("Current binary: %s", current)
57-
args := []string{"run"}
58-
args = append(args, os.Args[1:]...)
59-
cmd := exec.Command(current, args...)
60-
cmd.Env = os.Environ()
61-
cmd.Stdout = os.Stdout
62-
cmd.Stderr = os.Stderr
63-
64-
if err := cmd.Start(); err != nil {
65-
log.Error("start err:%s", err)
66-
}
67-
68-
if err := cmd.Wait(); err != nil {
69-
log.Error("wait err:%s", err)
70-
}
71-
}

engine/worker/cmd_run.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
4848
log.Info("CDS Worker starting")
4949
log.Info("version: %s", sdk.VERSION)
5050
log.Info("hostname: %s", hostname)
51-
log.Info("auto-update: %t", w.autoUpdate)
5251
log.Info("single-use: %t", w.singleUse)
5352

5453
httpServerCtx, stopHTTPServer := context.WithCancel(context.Background())
@@ -67,7 +66,7 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
6766
go func() {
6867
select {
6968
case <-c:
70-
defer cancel()
69+
cancel()
7170
return
7271
case <-ctx.Done():
7372
return
@@ -99,8 +98,6 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
9998
registerTick := time.NewTicker(10 * time.Second)
10099
refreshTick := time.NewTicker(30 * time.Second)
101100

102-
updateTick := time.NewTicker(5 * time.Minute)
103-
104101
// start logger routine with a large buffer
105102
w.logger.logChan = make(chan sdk.Log, 100000)
106103
go w.logProcessor(ctx)
@@ -115,7 +112,6 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
115112
w.drainLogsAndCloseLogger(ctx)
116113
registerTick.Stop()
117114
refreshTick.Stop()
118-
updateTick.Stop()
119115
w.unregister()
120116
cancel()
121117
stopHTTPServer()
@@ -269,7 +265,10 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
269265
var continueTakeJob = true
270266

271267
// Is the worker is "single use": unregister and exit the worker
272-
if w.singleUse {
268+
if w.singleUse && ctx.Err() == nil {
269+
log.Warning("worker single-use but cancelled")
270+
} else if w.singleUse {
271+
log.Info("worker single-use, see for next jobs now...")
273272
continueTakeJob = false
274273
}
275274

@@ -290,8 +289,6 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
290289
// Unregister from engine
291290
log.Info("Job is done. Unregistering...")
292291
cancel()
293-
case <-updateTick.C:
294-
w.doUpdate()
295292
}
296293
}
297294
}
@@ -340,18 +337,6 @@ func (w *currentWorker) processBookedWJob(ctx context.Context, wjobs chan<- sdk.
340337
return nil
341338
}
342339

343-
func (w *currentWorker) doUpdate() {
344-
if w.autoUpdate {
345-
version, err := w.client.Version()
346-
if err != nil {
347-
log.Error("Error while getting version from CDS API: %s", err)
348-
}
349-
if version.Version != sdk.VERSION {
350-
sdk.Exit("Exiting this CDS Worker process - auto updating worker")
351-
}
352-
}
353-
}
354-
355340
func (w *currentWorker) doRegister() error {
356341
if w.id == "" {
357342
var info string

engine/worker/init.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
const (
2121
envFlagPrefix = "cds_"
2222
flagSingleUse = "single-use"
23-
flagAutoUpdate = "auto-update"
2423
flagFromGithub = "from-github"
2524
flagForceExit = "force-exit"
2625
flagBaseDir = "basedir"
@@ -46,7 +45,6 @@ const (
4645
func initFlagsRun(cmd *cobra.Command) {
4746
flags := cmd.Flags()
4847
flags.Bool(flagSingleUse, false, "Exit after executing an action")
49-
flags.Bool(flagAutoUpdate, false, "Auto update worker binary from CDS API")
5048
flags.Bool(flagFromGithub, false, "Update binary from latest github release")
5149
flags.Bool(flagForceExit, false, "If single_use=true, force exit. This is useful if it's spawned by an Hatchery (default: worker wait 30min for being killed by hatchery)")
5250
flags.String(flagBaseDir, "", "This directory (default TMPDIR os environment var) will contains worker working directory and temporary files")
@@ -190,7 +188,6 @@ func initFlags(cmd *cobra.Command, w *currentWorker) {
190188

191189
w.client = cdsclient.NewWorker(w.apiEndpoint, w.status.Name, cdsclient.NewHTTPClient(time.Second*360, FlagBool(cmd, flagInsecure)))
192190

193-
w.autoUpdate = FlagBool(cmd, flagAutoUpdate)
194191
w.singleUse = FlagBool(cmd, flagSingleUse)
195192
w.grpc.address = FlagString(cmd, flagGRPCAPI)
196193
w.grpc.insecure = FlagBool(cmd, flagGRPCInsecure)

engine/worker/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
)
1111

1212
type currentWorker struct {
13-
autoUpdate bool
1413
singleUse bool
1514
apiEndpoint string
1615
token string

engine/worker/register.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,6 @@ func (w *currentWorker) register(form sdk.WorkerRegistrationForm) error {
4040
w.initGRPCConn()
4141

4242
if !uptodate {
43-
if w.autoUpdate {
44-
log.Warning("-=-=-=-=- your worker binary is not up to date %s %s %s. Auto-updating it... -=-=-=-=-", sdk.VERSION, sdk.GOOS, sdk.GOARCH)
45-
sdk.Exit("Exiting this cds worker process - auto updating worker")
46-
}
4743
log.Warning("-=-=-=-=- Please update your worker binary - Worker Version %s %s %s -=-=-=-=-", sdk.VERSION, sdk.GOOS, sdk.GOARCH)
4844
}
4945

0 commit comments

Comments
 (0)
0