fix(hatchery): traces & retries (#4217) · ovh/cds@7c28777 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit 7c28777

Browse files
fsamin, sguiheux
authored and committed
fix(hatchery): traces & retries (#4217)
1 parent 7c4305b commit 7c28777

File tree

3 files changed

+17
-40
lines changed

3 files changed

+17
-40
lines changed

sdk/cdsclient/client_queue.go

Lines changed: 3 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -231,33 +231,15 @@ func (c *client) QueueJobInfo(id int64) (*sdk.WorkflowNodeJobRun, error) {
231231
// QueueJobSendSpawnInfo sends a spawn info on a job
232232
func (c *client) QueueJobSendSpawnInfo(ctx context.Context, id int64, in []sdk.SpawnInfo) error {
233233
path := fmt.Sprintf("/queue/workflows/%d/spawn/infos", id)
234-
var statusCode int
235-
var err error
236-
for retry := 0; retry < 10; retry++ {
237-
statusCode, err = c.PostJSON(ctx, path, &in, nil)
238-
if statusCode != http.StatusConflict {
239-
break
240-
}
241-
time.Sleep(500 * time.Millisecond)
242-
}
234+
_, err := c.PostJSON(ctx, path, &in, nil)
243235
return err
244236
}
245237

246238
// QueueJobIncAttempts add hatcheryID that cannot run this job and return the spawn attempts list
247239
func (c *client) QueueJobIncAttempts(ctx context.Context, jobID int64) ([]int64, error) {
248240
var spawnAttempts []int64
249241
path := fmt.Sprintf("/queue/workflows/%d/attempt", jobID)
250-
retry := 0
251-
var code int
252-
var err error
253-
for {
254-
code, err = c.PostJSON(ctx, path, nil, &spawnAttempts)
255-
if code != http.StatusConflict || retry >= 5 {
256-
break
257-
}
258-
retry++
259-
time.Sleep(250 * time.Millisecond)
260-
}
242+
_, err := c.PostJSON(ctx, path, nil, &spawnAttempts)
261243
return spawnAttempts, err
262244
}
263245

@@ -276,18 +258,8 @@ func (c *client) QueueJobRelease(id int64) error {
276258
}
277259

278260
func (c *client) QueueSendResult(ctx context.Context, id int64, res sdk.Result) error {
279-
var statusCode int
280-
var err error
281-
282261
path := fmt.Sprintf("/queue/workflows/%d/result", id)
283-
for retry := 0; retry < 10; retry++ {
284-
statusCode, err = c.PostJSON(ctx, path, res, nil)
285-
if statusCode != http.StatusConflict {
286-
break
287-
}
288-
time.Sleep(500 * time.Millisecond)
289-
}
290-
262+
_, err := c.PostJSON(ctx, path, res, nil)
291263
return err
292264
}
293265

sdk/cdsclient/http.go

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -166,7 +166,6 @@ func (c *client) Stream(ctx context.Context, method string, path string, body io
166166
labels := pprof.Labels("user-agent", c.config.userAgent, "path", path, "method", method)
167167
ctx = pprof.WithLabels(ctx, labels)
168168
pprof.SetGoroutineLabels(ctx)
169-
170169
var savederror error
171170

172171
var bodyContent []byte

sdk/hatchery/starter.go

Lines changed: 14 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -141,9 +141,9 @@ func spawnWorkerForJob(h Interface, j workerStarterRequest) (bool, error) {
141141
return false, nil
142142
}
143143

144-
_, next := observability.Span(ctx, "hatchery.QueueJobBook")
145-
ctxt, cancel := context.WithTimeout(ctx, 10*time.Second)
146-
if err := h.CDSClient().QueueJobBook(ctxt, j.id); err != nil {
144+
ctxQueueJobBook, next := observability.Span(ctx, "hatchery.QueueJobBook")
145+
ctxQueueJobBook, cancel := context.WithTimeout(ctxQueueJobBook, 10*time.Second)
146+
if err := h.CDSClient().QueueJobBook(ctxQueueJobBook, j.id); err != nil {
147147
next()
148148
// perhaps already booked by another hatchery
149149
log.Info("hatchery> spawnWorkerForJob> %d - cannot book job %d %s: %s", j.timestamp, j.id, j.model.Name, err)
@@ -154,19 +154,21 @@ func spawnWorkerForJob(h Interface, j workerStarterRequest) (bool, error) {
154154
cancel()
155155
log.Debug("hatchery> spawnWorkerForJob> %d - send book job %d %s by hatchery %d", j.timestamp, j.id, j.model.Name, h.ID())
156156

157+
ctxSendSpawnInfo, next := observability.Span(ctx, "hatchery.SendSpawnInfo", observability.Tag("msg", sdk.MsgSpawnInfoHatcheryStarts.ID))
157158
start := time.Now()
158-
SendSpawnInfo(ctx, h, j.id, sdk.SpawnMsg{
159+
SendSpawnInfo(ctxSendSpawnInfo, h, j.id, sdk.SpawnMsg{
159160
ID: sdk.MsgSpawnInfoHatcheryStarts.ID,
160161
Args: []interface{}{h.Service().Name, fmt.Sprintf("%d", h.ID()), j.model.Name},
161162
})
163+
next()
162164

163165
log.Info("hatchery> spawnWorkerForJob> SpawnWorker> starting model %s for job %d", j.model.Name, j.id)
164166
_, next = observability.Span(ctx, "hatchery.SpawnWorker")
165167
workerName, errSpawn := h.SpawnWorker(j.ctx, SpawnArguments{Model: j.model, JobID: j.id, Requirements: j.requirements, LogInfo: "spawn for job"})
166168
next()
167169
if errSpawn != nil {
168-
_, next = observability.Span(ctx, "hatchery.QueueJobSendSpawnInfo", observability.Tag("status", "errSpawn"))
169-
SendSpawnInfo(ctx, h, j.id, sdk.SpawnMsg{
170+
ctxSendSpawnInfo, next = observability.Span(ctx, "hatchery.QueueJobSendSpawnInfo", observability.Tag("status", "errSpawn"), observability.Tag("msg", sdk.MsgSpawnInfoHatcheryErrorSpawn.ID))
171+
SendSpawnInfo(ctxSendSpawnInfo, h, j.id, sdk.SpawnMsg{
170172
ID: sdk.MsgSpawnInfoHatcheryErrorSpawn.ID,
171173
Args: []interface{}{h.Service().Name, fmt.Sprintf("%d", h.ID()), j.model.Name, sdk.Round(time.Since(start), time.Second).String(), errSpawn.Error()},
172174
})
@@ -175,20 +177,24 @@ func spawnWorkerForJob(h Interface, j workerStarterRequest) (bool, error) {
175177
return false, nil
176178
}
177179

178-
SendSpawnInfo(ctx, h, j.id, sdk.SpawnMsg{
180+
ctxSendSpawnInfo, next = observability.Span(ctx, "hatchery.SendSpawnInfo", observability.Tag("msg", sdk.MsgSpawnInfoHatcheryStartsSuccessfully.ID))
181+
SendSpawnInfo(ctxSendSpawnInfo, h, j.id, sdk.SpawnMsg{
179182
ID: sdk.MsgSpawnInfoHatcheryStartsSuccessfully.ID,
180183
Args: []interface{}{
181184
h.Service().Name,
182185
fmt.Sprintf("%d", h.ID()),
183186
workerName,
184187
sdk.Round(time.Since(start), time.Second).String()},
185188
})
189+
next()
186190

187191
if j.model.IsDeprecated {
188-
SendSpawnInfo(ctx, h, j.id, sdk.SpawnMsg{
192+
ctxSendSpawnInfo, next = observability.Span(ctx, "hatchery.SendSpawnInfo", observability.Tag("msg", sdk.MsgSpawnInfoDeprecatedModel.ID))
193+
SendSpawnInfo(ctxSendSpawnInfo, h, j.id, sdk.SpawnMsg{
189194
ID: sdk.MsgSpawnInfoDeprecatedModel.ID,
190195
Args: []interface{}{j.model.Name},
191196
})
197+
next()
192198
}
193199
return true, nil // ok for this job
194200
}

0 commit comments

Comments
 (0)
0