feat(hooks): add logs and balance in/out in status (#5131) · ovh/cds@7c9d5fc · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit 7c9d5fc

Browse files
authored
feat(hooks): add logs and balance in/out in status (#5131)
1 parent bae5d4e commit 7c9d5fc

File tree

4 files changed

+81
-45
lines changed

4 files changed

+81
-45
lines changed

engine/hooks/dao.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package hooks
33
import (
44
"context"
55
"fmt"
6+
"sync/atomic"
67

78
"github.com/ovh/cds/engine/api/cache"
89
"github.com/ovh/cds/sdk"
@@ -15,7 +16,21 @@ const (
1516
)
1617

1718
type dao struct {
18-
store cache.Store
19+
store cache.Store
20+
enqueuedTaskExecutions int64
21+
dequeuedTaskExecutions int64
22+
}
23+
24+
func (d *dao) enqueuedIncr() {
25+
atomic.AddInt64(&d.enqueuedTaskExecutions, 1)
26+
}
27+
28+
func (d *dao) dequeuedIncr() {
29+
atomic.AddInt64(&d.dequeuedTaskExecutions, 1)
30+
}
31+
32+
func (d *dao) TaskExecutionsBalance() (int64, int64) {
33+
return d.enqueuedTaskExecutions, d.dequeuedTaskExecutions
1934
}
2035

2136
func (d *dao) FindAllTasks(ctx context.Context) ([]sdk.Task, error) {
@@ -91,7 +106,13 @@ func (d *dao) EnqueueTaskExecution(ctx context.Context, r *sdk.TaskExecution) er
91106
if err := d.store.RemoveFromQueue(schedulerQueueKey, k); err != nil {
92107
log.Error(ctx, "error on cache RemoveFromQueue %s: %v", schedulerQueueKey, err)
93108
}
94-
return d.store.Enqueue(schedulerQueueKey, k)
109+
110+
if err := d.store.Enqueue(schedulerQueueKey, k); err != nil {
111+
return err
112+
}
113+
d.enqueuedIncr()
114+
115+
return nil
95116
}
96117

97118
func (d *dao) QueueLen() (int, error) {

engine/hooks/hooks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (s *Service) Serve(c context.Context) error {
8787
}
8888

8989
//Init the DAO
90-
s.Dao = dao{s.Cache}
90+
s.Dao = dao{store: s.Cache}
9191

9292
// Get current maintenance state
9393
var b bool

engine/hooks/hooks_handlers.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,14 @@ func (s *Service) Status(ctx context.Context) sdk.MonitoringStatus {
517517
}
518518
m.Lines = append(m.Lines, sdk.MonitoringStatusLine{Component: "Queue", Value: fmt.Sprintf("%d", size), Status: status})
519519

520+
// hook balance in status
521+
in, out := s.Dao.TaskExecutionsBalance()
522+
status = sdk.MonitoringStatusOK
523+
if float64(in) > float64(out) {
524+
status = sdk.MonitoringStatusWarn
525+
}
526+
m.Lines = append(m.Lines, sdk.MonitoringStatusLine{Component: "Balance", Value: fmt.Sprintf("%d/%d", in, out), Status: status})
527+
520528
var nbHooksKafkaTotal int64
521529

522530
tasks, err := s.Dao.FindAllTasks(ctx)

engine/hooks/scheduler.go

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,28 @@ func (s *Service) runScheduler(c context.Context) error {
1717

1818
go func() {
1919
if err := s.dequeueTaskExecutions(ctx); err != nil {
20-
log.Error(ctx, "Hooks> runScheduler> dequeueLongRunningTasks> %v", err)
20+
log.Error(ctx, "runScheduler> dequeueLongRunningTasks> %v", err)
2121
cancel()
2222
}
2323
}()
2424

2525
go func() {
2626
if err := s.retryTaskExecutionsRoutine(ctx); err != nil {
27-
log.Error(ctx, "Hooks> runScheduler> retryTaskExecutionsRoutine> %v", err)
27+
log.Error(ctx, "runScheduler> retryTaskExecutionsRoutine> %v", err)
2828
cancel()
2929
}
3030
}()
3131

3232
go func() {
3333
if err := s.enqueueScheduledTaskExecutionsRoutine(ctx); err != nil {
34-
log.Error(ctx, "Hooks> runScheduler> enqueueScheduledTaskExecutionsRoutine> %v", err)
34+
log.Error(ctx, "runScheduler> enqueueScheduledTaskExecutionsRoutine> %v", err)
3535
cancel()
3636
}
3737
}()
3838

3939
go func() {
4040
if err := s.deleteTaskExecutionsRoutine(ctx); err != nil {
41-
log.Error(ctx, "Hooks> runScheduler> deleteTaskExecutionsRoutine> %v", err)
41+
log.Error(ctx, "runScheduler> deleteTaskExecutionsRoutine> %v", err)
4242
cancel()
4343
}
4444
}()
@@ -58,29 +58,29 @@ func (s *Service) retryTaskExecutionsRoutine(ctx context.Context) error {
5858
case <-tick.C:
5959
size, err := s.Dao.QueueLen()
6060
if err != nil {
61-
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > Unable to get queueLen: %v", err)
61+
log.Error(ctx, "retryTaskExecutionsRoutine > Unable to get queueLen: %v", err)
6262
continue
6363
}
6464
if size > 20 {
65-
log.Warning(ctx, "Hooks> too many tasks in scheduler for now, skipped this retry ticker. size:%d", size)
65+
log.Warning(ctx, "too many tasks in scheduler for now, skipped this retry ticker. size:%d", size)
6666
continue
6767
}
6868

6969
if s.Maintenance {
70-
log.Info(ctx, "Hooks> retryTaskExecutionsRoutine> Maintenance enable, wait 1 minute. Queue %d", size)
70+
log.Info(ctx, "retryTaskExecutionsRoutine> Maintenance enable, wait 1 minute. Queue %d", size)
7171
time.Sleep(1 * time.Minute)
7272
continue
7373
}
7474

7575
tasks, err := s.Dao.FindAllTasks(ctx)
7676
if err != nil {
77-
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > Unable to find all tasks: %v", err)
77+
log.Error(ctx, "retryTaskExecutionsRoutine > Unable to find all tasks: %v", err)
7878
continue
7979
}
8080
for _, t := range tasks {
8181
execs, err := s.Dao.FindAllTaskExecutions(ctx, &t)
8282
if err != nil {
83-
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
83+
log.Error(ctx, "retryTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
8484
continue
8585
}
8686
for _, e := range execs {
@@ -91,37 +91,37 @@ func (s *Service) retryTaskExecutionsRoutine(ctx context.Context) error {
9191
// old hooks
9292
if e.ProcessingTimestamp == 0 && e.Timestamp < time.Now().Add(-2*time.Minute).UnixNano() {
9393
if e.UUID == "" {
94-
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Very old hook without UUID %d/%d type:%s status:%s timestamp:%d err:%v", e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError)
94+
log.Warning(ctx, "retryTaskExecutionsRoutine > Very old hook without UUID %d/%d type:%s status:%s timestamp:%d err:%v", e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError)
9595
continue
9696
}
9797
e.Status = TaskExecutionEnqueued
9898
if err := s.Dao.SaveTaskExecution(&e); err != nil {
99-
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine> unable to save task execution for old hook %s: %v", e.UUID, err)
99+
log.Warning(ctx, "retryTaskExecutionsRoutine> unable to save task execution for old hook %s: %v", e.UUID, err)
100100
continue
101101
}
102-
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Enqueing very old hooks %s %d/%d type:%s status:%s timestamp:%d err:%v", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError)
102+
log.Warning(ctx, "retryTaskExecutionsRoutine > Enqueing very old hooks %s %d/%d type:%s status:%s timestamp:%d err:%v", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, e.Timestamp, e.LastError)
103103
if err := s.Dao.EnqueueTaskExecution(ctx, &e); err != nil {
104-
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
104+
log.Error(ctx, "retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
105105
}
106106
}
107107
if e.NbErrors < s.Cfg.RetryError && e.LastError != "" {
108108
// avoid re-enqueue if the lastError is about a git branch not found
109109
// the branch was deleted from git repository, it will never work
110110
if strings.Contains(e.LastError, "branchName parameter must be provided") {
111-
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Do not re-enqueue this taskExecution with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError)
111+
log.Warning(ctx, "retryTaskExecutionsRoutine > Do not re-enqueue this taskExecution with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError)
112112
if err := s.Dao.DeleteTaskExecution(&e); err != nil {
113-
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
113+
log.Error(ctx, "retryTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
114114
}
115115
continue
116116
}
117117
e.Status = TaskExecutionEnqueued
118118
if err := s.Dao.SaveTaskExecution(&e); err != nil {
119-
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine> unable to save task execution for %s: %v", e.UUID, err)
119+
log.Warning(ctx, "retryTaskExecutionsRoutine> unable to save task execution for %s: %v", e.UUID, err)
120120
continue
121121
}
122-
log.Warning(ctx, "Hooks> retryTaskExecutionsRoutine > Enqueing with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError)
122+
log.Warning(ctx, "retryTaskExecutionsRoutine > Enqueing with lastError %s %d/%d type:%s status:%s len:%d err:%s", e.UUID, e.NbErrors, s.Cfg.RetryError, e.Type, e.Status, len(e.LastError), e.LastError)
123123
if err := s.Dao.EnqueueTaskExecution(ctx, &e); err != nil {
124-
log.Error(ctx, "Hooks> retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
124+
log.Error(ctx, "retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
125125
}
126126
continue
127127
}
@@ -138,17 +138,18 @@ func (s *Service) enqueueScheduledTaskExecutionsRoutine(ctx context.Context) err
138138
for {
139139
select {
140140
case <-ctx.Done():
141+
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > exiting goroutine: %v", ctx.Err())
141142
return ctx.Err()
142143
case <-tick.C:
143144
tasks, err := s.Dao.FindAllTasks(ctx)
144145
if err != nil {
145-
log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > Unable to find all tasks: %v", err)
146+
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > Unable to find all tasks: %v", err)
146147
continue
147148
}
148149
for _, t := range tasks {
149150
execs, err := s.Dao.FindAllTaskExecutions(ctx, &t)
150151
if err != nil {
151-
log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
152+
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
152153
continue
153154
}
154155
alreadyEnqueued := false
@@ -157,16 +158,16 @@ func (s *Service) enqueueScheduledTaskExecutionsRoutine(ctx context.Context) err
157158
// update status before enqueue
158159
// this will avoid to re-enqueue the same scheduled task execution if the dequeue take more than 30s (ticker of this goroutine)
159160
if alreadyEnqueued {
160-
log.Info(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > task execution already enqueued for this task %s of type %s- delete it", e.UUID, e.Type)
161+
log.Info(ctx, "enqueueScheduledTaskExecutionsRoutine > task execution already enqueued for this task %s of type %s- delete it", e.UUID, e.Type)
161162
if err := s.Dao.DeleteTaskExecution(&e); err != nil {
162-
log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
163+
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
163164
}
164165
} else {
165166
e.Status = TaskExecutionEnqueued
166167
s.Dao.SaveTaskExecution(&e)
167-
log.Info(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > Enqueing %s task %s:%d", e.Type, e.UUID, e.Timestamp)
168+
log.Info(ctx, "enqueueScheduledTaskExecutionsRoutine > Enqueing %s task %s:%d", e.Type, e.UUID, e.Timestamp)
168169
if err := s.Dao.EnqueueTaskExecution(ctx, &e); err != nil {
169-
log.Error(ctx, "Hooks> enqueueScheduledTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
170+
log.Error(ctx, "enqueueScheduledTaskExecutionsRoutine > error on EnqueueTaskExecution: %v", err)
170171
}
171172
// this will avoid to re-enqueue the same scheduled task execution if the dequeue take more than 30s (ticker of this goroutine)
172173
if e.Type == TypeRepoPoller || e.Type == TypeScheduler {
@@ -193,14 +194,14 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
193194
case <-tick.C:
194195
tasks, err := s.Dao.FindAllTasks(ctx)
195196
if err != nil {
196-
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > Unable to find all tasks: %v", err)
197+
log.Error(ctx, "deleteTaskExecutionsRoutine > Unable to find all tasks: %v", err)
197198
continue
198199
}
199200
for _, t := range tasks {
200201
taskToDelete := false
201202
execs, err := s.Dao.FindAllTaskExecutions(ctx, &t)
202203
if err != nil {
203-
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
204+
log.Error(ctx, "deleteTaskExecutionsRoutine > Unable to find all task executions (%s): %v", t.UUID, err)
204205
continue
205206
}
206207
sort.Slice(execs, func(i, j int) bool {
@@ -213,14 +214,14 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
213214
case TypeBranchDeletion:
214215
if e.Status == TaskExecutionDone && e.ProcessingTimestamp != 0 {
215216
if err := s.Dao.DeleteTaskExecution(&e); err != nil {
216-
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
217+
log.Error(ctx, "deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
217218
}
218219
taskToDelete = true
219220
}
220221
default:
221222
if i >= s.Cfg.ExecutionHistory && e.ProcessingTimestamp != 0 {
222223
if err := s.Dao.DeleteTaskExecution(&e); err != nil {
223-
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
224+
log.Error(ctx, "deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v", err)
224225
}
225226
}
226227
}
@@ -229,7 +230,7 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
229230

230231
if taskToDelete {
231232
if err := s.deleteTask(ctx, &t); err != nil {
232-
log.Error(ctx, "Hooks> deleteTaskExecutionsRoutine > Unable to deleteTask (%s): %v", t.UUID, err)
233+
log.Error(ctx, "deleteTaskExecutionsRoutine > Unable to deleteTask (%s): %v", t.UUID, err)
233234
}
234235
}
235236
}
@@ -241,14 +242,15 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
241242
func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
242243
for {
243244
if ctx.Err() != nil {
245+
log.Error(ctx, "dequeueTaskExecutions> exiting go routine: %v", ctx.Err())
244246
return ctx.Err()
245247
}
246248
size, err := s.Dao.QueueLen()
247249
if err != nil {
248-
log.Error(ctx, "Hooks> dequeueTaskExecutions > Unable to get queueLen: %v", err)
250+
log.Error(ctx, "dequeueTaskExecutions > Unable to get queueLen: %v", err)
249251
continue
250252
}
251-
log.Debug("Hooks> dequeueTaskExecutions> current queue size: %d", size)
253+
log.Debug("dequeueTaskExecutions> current queue size: %d", size)
252254

253255
if s.Maintenance {
254256
log.Info(ctx, "Maintenance enable, wait 1 minute. Queue %d", size)
@@ -259,13 +261,15 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
259261
// Dequeuing context
260262
var taskKey string
261263
if ctx.Err() != nil {
264+
log.Error(ctx, "dequeueTaskExecutions> exiting go routine: %v", err)
262265
return ctx.Err()
263266
}
264267
if err := s.Cache.DequeueWithContext(ctx, schedulerQueueKey, &taskKey); err != nil {
265-
log.Error(ctx, "Hooks> dequeueTaskExecutions> store.DequeueWithContext err: %v", err)
268+
log.Error(ctx, "dequeueTaskExecutions> store.DequeueWithContext err: %v", err)
266269
continue
267270
}
268-
log.Debug("Hooks> dequeueTaskExecutions> work on taskKey: %s", taskKey)
271+
s.Dao.dequeuedIncr()
272+
log.Info(ctx, "dequeueTaskExecutions> work on taskKey: %s", taskKey)
269273

270274
// Load the task execution
271275
var t = sdk.TaskExecution{}
@@ -286,18 +290,18 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
286290

287291
task := s.Dao.FindTask(ctx, t.UUID)
288292
if task == nil {
289-
log.Error(ctx, "Hooks> dequeueTaskExecutions failed: Task %s not found - deleting this task execution", t.UUID)
293+
log.Error(ctx, "dequeueTaskExecutions failed: Task %s not found - deleting this task execution", t.UUID)
290294
t.LastError = "Internal Error: Task not found"
291295
t.NbErrors++
292296
if err := s.Dao.DeleteTaskExecution(&t); err != nil {
293-
log.Error(ctx, "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
297+
log.Error(ctx, "dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
294298
}
295299
continue
296300

297301
} else if t.NbErrors >= s.Cfg.RetryError {
298-
log.Info(ctx, "Hooks> dequeueTaskExecutions> Deleting task execution %s cause: to many errors:%d lastError:%s", t.UUID, t.NbErrors, t.LastError)
302+
log.Info(ctx, "dequeueTaskExecutions> Deleting task execution %s cause: to many errors:%d lastError:%s", t.UUID, t.NbErrors, t.LastError)
299303
if err := s.Dao.DeleteTaskExecution(&t); err != nil {
300-
log.Error(ctx, "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
304+
log.Error(ctx, "dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
301305
}
302306
continue
303307

@@ -307,19 +311,19 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
307311
saveTaskExecution = true
308312
} else {
309313
saveTaskExecution = true
310-
log.Debug("Hooks> dequeueTaskExecutions> call doTask on taskKey: %s", taskKey)
314+
log.Debug("dequeueTaskExecutions> call doTask on taskKey: %s", taskKey)
311315
var err error
312316
restartTask, err = s.doTask(ctx, task, &t)
313317
if err != nil {
314318
if strings.Contains(err.Error(), "Unsupported task type") {
315319
// delete this task execution, as it will never work
316-
log.Info(ctx, "Hooks> dequeueTaskExecutions> Deleting task execution %s as err:%v", t.UUID, err)
320+
log.Info(ctx, "dequeueTaskExecutions> Deleting task execution %s as err:%v", t.UUID, err)
317321
if err := s.Dao.DeleteTaskExecution(&t); err != nil {
318-
log.Error(ctx, "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
322+
log.Error(ctx, "dequeueTaskExecutions > error on DeleteTaskExecution: %v", err)
319323
}
320324
continue
321325
} else {
322-
log.Error(ctx, "Hooks> dequeueTaskExecutions> %s failed err[%d]: %v", t.UUID, t.NbErrors, err)
326+
log.Error(ctx, "dequeueTaskExecutions> %s failed err[%d]: %v", t.UUID, t.NbErrors, err)
323327
t.LastError = err.Error()
324328
t.NbErrors++
325329
saveTaskExecution = true
@@ -336,7 +340,10 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
336340

337341
//Start (or restart) the task
338342
if restartTask {
339-
_, _ = s.startTask(ctx, task)
343+
_, err := s.startTask(ctx, task)
344+
if err != nil {
345+
log.Error(ctx, "dequeueTaskExecutions> unable to restart the task %+v after execution: %v", task, err)
346+
}
340347
}
341348
}
342349
}

0 commit comments

Comments
 (0)
0