feat(hatchery): add traces (#3381) · ovh/cds@c379464 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit c379464

Browse files
authored
feat(hatchery): add traces (#3381)
1 parent 947b73b commit c379464

File tree

6 files changed

+34
-17
lines changed

6 files changed

+34
-17
lines changed

engine/api/api_routes.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ func (api *API) InitRouter() {
327327
r.Handle("/queue/workflows/{id}/attempt", r.POST(api.postIncWorkflowJobAttemptHandler, NeedHatchery(), EnableTracing()))
328328
r.Handle("/queue/workflows/{id}/infos", r.GET(api.getWorkflowJobHandler, NeedWorker(), EnableTracing()))
329329
r.Handle("/queue/workflows/{permID}/vulnerability", r.POSTEXECUTE(api.postVulnerabilityReportHandler, NeedWorker(), EnableTracing()))
330-
r.Handle("/queue/workflows/{id}/spawn/infos", r.POST(r.Asynchronous(api.postSpawnInfosWorkflowJobHandler, 1), NeedHatchery()))
330+
r.Handle("/queue/workflows/{id}/spawn/infos", r.POST(r.Asynchronous(api.postSpawnInfosWorkflowJobHandler, 1), NeedHatchery(), EnableTracing()))
331331
r.Handle("/queue/workflows/{permID}/result", r.POSTEXECUTE(api.postWorkflowJobResultHandler, NeedWorker(), EnableTracing()))
332332
r.Handle("/queue/workflows/{permID}/log", r.POSTEXECUTE(r.Asynchronous(api.postWorkflowJobLogsHandler, 1), NeedWorker()))
333333
r.Handle("/queue/workflows/log/service", r.POSTEXECUTE(r.Asynchronous(api.postWorkflowJobServiceLogsHandler, 1), NeedHatchery()))

engine/api/workflow/dao_node_job_run_info.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func insertNodeRunJobInfo(db gorp.SqlExecutor, info *sdk.WorkflowNodeJobRunInfo)
4949
if n, err := db.Exec(query, info.WorkflowNodeRunID, info.WorkflowNodeJobRunID, spawnJSON, time.Now()); err != nil {
5050
return sdk.WrapError(err, "insertNodeRunJobInfo> err while inserting spawninfos into workflow_node_run_job_info")
5151
} else if n, _ := n.RowsAffected(); n == 0 {
52-
return fmt.Errorf("insertNodeRunJobInfo> Unable to inerto into workflow_node_run_job_info id = %d", info.WorkflowNodeJobRunID)
52+
return fmt.Errorf("insertNodeRunJobInfo> Unable to insert into workflow_node_run_job_info id = %d", info.WorkflowNodeJobRunID)
5353
}
5454

5555
log.Debug("insertNodeRunJobInfo> on node run: %d (%d)", info.ID, info.WorkflowNodeJobRunID)

engine/api/workflow_queue.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,21 +348,28 @@ func (api *API) postVulnerabilityReportHandler() service.Handler {
348348

349349
func (api *API) postSpawnInfosWorkflowJobHandler() service.AsynchronousHandler {
350350
return func(ctx context.Context, r *http.Request) error {
351+
_, next := observability.Span(ctx, "receiveSpawnInfosWorkflowJob")
351352
id, errc := requestVarInt(r, "id")
352353
if errc != nil {
354+
next()
353355
return sdk.WrapError(errc, "postSpawnInfosWorkflowJobHandler> invalid id")
354356
}
355357
var s []sdk.SpawnInfo
356358
if err := UnmarshalBody(r, &s); err != nil {
359+
next()
357360
return sdk.WrapError(err, "postSpawnInfosWorkflowJobHandler> cannot unmarshal request")
358361
}
362+
observability.Current(ctx, observability.Tag(observability.TagWorkflowNodeJobRun, id))
363+
next()
359364

360365
tx, errBegin := api.mustDB().Begin()
361366
if errBegin != nil {
362367
return sdk.WrapError(errBegin, "postSpawnInfosWorkflowJobHandler> Cannot start transaction")
363368
}
364369
defer tx.Rollback()
365370

371+
_, next = observability.Span(ctx, "workflow.AddSpawnInfosNodeJobRun")
372+
defer next()
366373
if err := workflow.AddSpawnInfosNodeJobRun(tx, id, s); err != nil {
367374
return sdk.WrapError(err, "postSpawnInfosWorkflowJobHandler> Cannot save spawn info on node job run %d for %s name %s", id, getAgent(r), r.Header.Get(cdsclient.RequestedNameHeader))
368375
}

engine/hatchery/marathon/marathon.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/gorilla/mux"
2020

2121
"github.com/ovh/cds/engine/api"
22+
"github.com/ovh/cds/engine/api/observability"
2223
"github.com/ovh/cds/engine/ 57AE api/services"
2324
"github.com/ovh/cds/sdk"
2425
"github.com/ovh/cds/sdk/cdsclient"
@@ -217,6 +218,9 @@ func (h *HatcheryMarathon) CanSpawn(model *sdk.Model, jobID int64, requirements
217218
// SpawnWorker creates an application on mesos via marathon
218219
// requirements services are not supported
219220
func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.SpawnArguments) (string, error) {
221+
ctx, end := observability.Span(ctx, "hatcheryMarathon.SpawnWorker")
222+
defer end()
223+
220224
if spawnArgs.JobID > 0 {
221225
log.Debug("spawnWorker> spawning worker %s (%s) for job %d - %s", spawnArgs.Model.Name, spawnArgs.Model.ModelDocker.Image, spawnArgs.JobID, spawnArgs.LogInfo)
222226
} else {
@@ -351,9 +355,12 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
351355
Labels: &h.marathonLabels,
352356
}
353357

358+
_, next := observability.Span(ctx, "marathonClient.CreateApplication")
354359
if _, err := h.marathonClient.CreateApplication(application); err != nil {
360+
next()
355361
return "", err
356362
}
363+
next()
357364

358365
ticker := time.NewTicker(time.Second * 5)
359366
// ticker.Stop -> do not close goroutine..., so
@@ -377,8 +384,9 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
377384
}()
378385

379386
log.Debug("spawnMarathonDockerWorker> %s worker %s spawning in progress, please wait...", logJob, application.ID)
380-
387+
_, next = observability.Span(ctx, "marathonClient.ApplicationDeployments")
381388
deployments, err := h.marathonClient.ApplicationDeployments(application.ID)
389+
next()
382390
if err != nil {
383391
ticker.Stop()
384392
return "", fmt.Errorf("spawnMarathonDockerWorker> %s failed to list deployments: %s", logJob, err.Error())
@@ -389,6 +397,7 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
389397
return "", nil
390398
}
391399

400+
_, next = observability.Span(ctx, "waitDeployment")
392401
wg := &sync.WaitGroup{}
393402
var done bool
394403
var successChan = make(chan bool, len(deployments))
@@ -422,6 +431,7 @@ func (h *HatcheryMarathon) SpawnWorker(ctx context.Context, spawnArgs hatchery.S
422431
}
423432

424433
wg.Wait()
434+
next()
425435

426436
var success = true
427437
for b := range successChan {

sdk/hatchery/hatchery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func Create(h Interface) error {
114114
)
115115

116116
// run the starters pool
117-
workersStartChan, workerStartResultChan := startWorkerStarters(h)
117+
workersStartChan, workerStartResultChan := startWorkerStarters(ctx, h)
118118

119119
hostname, errh := os.Hostname()
120120
if errh != nil {

sdk/hatchery/starter.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,32 +50,30 @@ func PanicDump(h Interface) func(s string) (io.WriteCloser, error) {
5050

5151
// Start all goroutines which manage the hatchery worker spawning routine.
5252
// the purpose is to avoid go routines leak when there is a bunch of worker to start
53-
func startWorkerStarters(h Interface) (chan<- workerStarterRequest, chan workerStarterResult) {
53+
func startWorkerStarters(ctx context.Context, h Interface) (chan<- workerStarterRequest, chan workerStarterResult) {
5454
jobs := make(chan workerStarterRequest, 1)
5555
results := make(chan workerStarterResult, 1)
5656

5757
maxProv := h.Configuration().Provision.MaxConcurrentProvisioning
5858
if maxProv < 1 {
5959
maxProv = defaultMaxProvisioning
6060
}
61-
for i := 0; i < maxProv; i++ {
61+
for workerNum := 0; workerNum < maxProv; workerNum++ {
6262
sdk.GoRoutine("workerStarter",
6363
func() {
64-
workerStarter(h, jobs, results)
64+
workerStarter(ctx, h, fmt.Sprintf("%d", workerNum), jobs, results)
6565
},
6666
PanicDump(h),
6767
)
6868
}
69-
7069
return jobs, results
7170
}
7271

73-
func workerStarter(h Interface, jobs <-chan workerStarterRequest, results chan<- workerStarterResult) {
72+
func workerStarter(ctx context.Context, h Interface, workerNum string, jobs <-chan workerStarterRequest, results chan<- workerStarterResult) {
7473
for j := range jobs {
7574
// Start a worker for a job
7675
if m := j.registerWorkerModel; m == nil {
77-
_, end := observability.Span(j.ctx, "hatchery.workerStarter")
78-
//Try to start the worker
76+
ctx2, end := observability.Span(j.ctx, "hatchery.workerStarter")
7977
isRun, err := spawnWorkerForJob(h, j)
8078
//Check the result
8179
res := workerStarterResult{
@@ -84,16 +82,18 @@ func workerStarter(h Interface, jobs <-chan workerStarterRequest, results chan<-
8482
isRun: isRun,
8583
temptToSpawn: true,
8684
}
85+
86+
_, cend := observability.Span(ctx2, "sendResult")
8787
//Send the result back
8888
results <- res
89-
end()
89+
cend()
9090

9191
if err != nil {
9292
j.cancel(err.Error())
9393
} else {
9494
j.cancel("")
9595
}
96-
96+
end()
9797
} else { // Start a worker for registering
9898
log.Debug("Spawning worker for register model %s", m.Name)
9999
if atomic.LoadInt64(&nbWorkerToStart) > int64(h.Configuration().Provision.MaxConcurrentProvisioning) {
@@ -162,8 +162,9 @@ func spawnWorkerForJob(h Interface, j workerStarterRequest) (bool, error) {
162162
log.Info("hatchery> spawnWorkerForJob> SpawnWorker> starting model %s for job %d", j.model.Name, j.id)
163163
_, next = observability.Span(ctx, "hatchery.SpawnWorker")
164164
workerName, errSpawn := h.SpawnWorker(j.ctx, SpawnArguments{Model: j.model, IsWorkflowJob: j.isWorkflowJob, JobID: j.id, Requirements: j.requirements, LogInfo: "spawn for job"})
165+
next()
165166
if errSpawn != nil {
166-
next()
167+
_, next = observability.Span(ctx, "hatchery.QueueJobSendSpawnInfo", observability.Tag("status", "errSpawn"))
167168
log.Warning("spawnWorkerForJob> %d - cannot spawn worker %s for job %d: %s", j.timestamp, j.model.Name, j.id, errSpawn)
168169
infos = append(infos, sdk.SpawnInfo{
169170
RemoteTime: time.Now(),
@@ -173,10 +174,9 @@ func spawnWorkerForJob(h Interface, j workerStarterRequest) (bool, error) {
173174
log.Warning("spawnWorkerForJob> %d - cannot client.QueueJobSendSpawnInfo for job (err spawn)%d: %s", j.timestamp, j.id, err)
174175
}
175176
log.Error("hatchery %s cannot spawn worker %s for job %d: %v", h.Service().Name, j.model.Name, j.id, errSpawn)
176-
177+
next()
177178
return false, nil
178179
}
179-
next()
180180

181181
infos = append(infos, sdk.SpawnInfo{
182182
RemoteTime: time.Now(),
@@ -189,7 +189,7 @@ func spawnWorkerForJob(h Interface, j workerStarterRequest) (bool, error) {
189189
},
190190
})
191191

192-
_, next = observability.Span(ctx, "hatchery.QueueJobSendSpawnInfo")
192+
_, next = observability.Span(ctx, "hatchery.QueueJobSendSpawnInfo", observability.Tag("status", "spawnOK"))
193193
if err := h.CDSClient().QueueJobSendSpawnInfo(j.isWorkflowJob, j.id, infos); err != nil {
194194
next()
195195
log.Warning("spawnWorkerForJob> %d - cannot client.QueueJobSendSpawnInfo for job %d: %s", j.timestamp, j.id, err)

0 commit comments

Comments
 (0)
0