@@ -17,28 +17,28 @@ func (s *Service) runScheduler(c context.Context) error {
17
17
18
18
go func () {
19
19
if err := s .dequeueTaskExecutions (ctx ); err != nil {
20
- log .Error (ctx , "Hooks> runScheduler> dequeueLongRunningTasks> %v" , err )
20
+ log .Error (ctx , "runScheduler> dequeueLongRunningTasks> %v" , err )
21
21
cancel ()
22
22
}
23
23
}()
24
24
25
25
go func () {
26
26
if err := s .retryTaskExecutionsRoutine (ctx ); err
10000
!= nil {
27
- log .Error (ctx , "Hooks> runScheduler> retryTaskExecutionsRoutine> %v" , err )
27
+ log .Error (ctx , "runScheduler> retryTaskExecutionsRoutine> %v" , err )
28
28
cancel ()
29
29
}
30
30
}()
31
31
32
32
go func () {
33
33
if err := s .enqueueScheduledTaskExecutionsRoutine (ctx ); err != nil {
34
- log .Error (ctx , "Hooks> runScheduler> enqueueScheduledTaskExecutionsRoutine> %v" , err )
34
+ log .Error (ctx , "runScheduler> enqueueScheduledTaskExecutionsRoutine> %v" , err )
35
35
cancel ()
36
36
}
37
37
}()
38
38
39
39
go func () {
40
40
if err := s .deleteTaskExecutionsRoutine (ctx ); err != nil {
41
- log .Error (ctx , "Hooks> runScheduler> deleteTaskExecutionsRoutine> %v" , err )
41
+ log .Error (ctx , "runScheduler> deleteTaskExecutionsRoutine> %v" , err )
42
42
cancel ()
43
43
}
44
44
}()
@@ -58,29 +58,29 @@ func (s *Service) retryTaskExecutionsRoutine(ctx context.Context) error {
58
58
case <- tick .C :
59
59
size , err := s .Dao .QueueLen ()
60
60
if err != nil {
61
- log .Error (ctx , "Hooks> retryTaskExecutionsRoutine > Unable to get queueLen: %v" , err )
61
+ log .Error (ctx , "retryTaskExecutionsRoutine > Unable to get queueLen: %v" , err )
62
62
continue
63
63
}
64
64
if size > 20 {
65
- log .Warning (ctx , "Hooks> too many tasks in scheduler for now, skipped this retry ticker. size:%d" , size )
65
+ log .Warning (ctx , "too many tasks in scheduler for now, skipped this retry ticker. size:%d" , size )
66
66
continue
67
67
}
68
68
69
69
if s .Maintenance {
70
- log .Info (ctx , "Hooks> retryTaskExecutionsRoutine> Maintenance enable, wait 1 minute. Queue %d" , size )
70
+ log .Info (ctx , "retryTaskExecutionsRoutine> Maintenance enable, wait 1 minute. Queue %d" , size )
71
71
time .Sleep (1 * time .Minute )
72
72
continue
73
73
}
74
74
75
75
tasks , err := s .Dao .FindAllTasks (ctx )
76
76
if err != nil {
77
- log .Error (ctx , "Hooks> retryTaskExecutionsRoutine > Unable to find all tasks: %v" , err )
77
+ log .Error (ctx , "retryTaskExecutionsRoutine > Unable to find all tasks: %v" , err )
78
10000
78
continue
79
79
}
80
80
for _ , t := range tasks {
81
81
execs , err := s .Dao .FindAllTaskExecutions (ctx , & t )
82
82
if err != nil {
83
- log .Error (ctx , "Hooks> retryTaskExecutionsRoutine > Unable to find all task executions (%s): %v" , t .UUID , err )
83
+ log .Error (ctx , "retryTaskExecutionsRoutine > Unable to find all task executions (%s): %v" , t .UUID , err )
84
84
continue
85
85
}
86
86
for _ , e := range execs {
@@ -91,37 +91,37 @@ func (s *Service) retryTaskExecutionsRoutine(ctx context.Context) error {
91
91
// old hooks
92
92
if e .ProcessingTimestamp == 0 && e .Timestamp < time .Now ().Add (- 2 * time .Minute ).UnixNano () {
93
93
if e .UUID == "" {
94
- log .Warning (ctx , "Hooks> retryTaskExecutionsRoutine > Very old hook without UUID %d/%d type:%s status:%s timestamp:%d err:%v" , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , e .Timestamp , e .LastError )
94
+ log .Warning (ctx , "retryTaskExecutionsRoutine > Very old hook without UUID %d/%d type:%s status:%s timestamp:%d err:%v" , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , e .Timestamp , e .LastError )
95
95
continue
96
96
}
97
97
e .Status = TaskExecutionEnqueued
98
98
if err := s .Dao .SaveTaskExecution (& e ); err != nil {
99
- log .Warning (ctx , "Hooks> retryTaskExecutionsRoutine> unable to save task execution for old hook %s: %v" , e .UUID , err )
99
+ log .Warning (ctx , "retryTaskExecutionsRoutine> unable to save task execution for old hook %s: %v" , e .UUID , err )
100
100
continue
101
101
}
102
- log .Warning (ctx , "Hooks> retryTaskExecutionsRoutine > Enqueing very old hooks %s %d/%d type:%s status:%s timestamp:%d err:%v" , e .UUID , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , e .Timestamp , e .LastError )
102
+ log .Warning (ctx , "retryTaskExecutionsRoutine > Enqueing very old hooks %s %d/%d type:%s status:%s timestamp:%d err:%v" , e .UUID , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , e .Timestamp , e .LastError )
103
103
if err := s .Dao .EnqueueTaskExecution (ctx , & e ); err != nil {
104
- log .Error (ctx , "Hooks> retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v" , err )
104
+ log .Error (ctx , "retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v" , err )
105
105
}
106
106
}
107
107
if e .NbErrors < s .Cfg .RetryError && e .LastError != "" {
108
108
// avoid re-enqueue if the lastError is about a git branch not found
109
109
// the branch was deleted from git repository, it will never work
110
110
if strings .Contains (e .LastError , "branchName parameter must be provided" ) {
111
- log .Warning (ctx , "Hooks> retryTaskExecutionsRoutine > Do not re-enqueue this taskExecution with lastError %s %d/%d type:%s status:%s len:%d err:%s" , e .UUID , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , len (e .LastError ), e .LastError )
111
+ log .Warning (ctx , "retryTaskExecutionsRoutine > Do not re-enqueue this taskExecution with lastError %s %d/%d type:%s status:%s len:%d err:%s" , e .UUID , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , len (e .LastError ), e .LastError )
112
112
if err := s .Dao .DeleteTaskExecution (& e ); err != nil {
113
- log .Error (ctx , "Hooks> retryTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
113
+ log .Error (ctx , "retryTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
114
114
}
115
115
continue
116
116
}
117
117
e .Status = TaskExecutionEnqueued
118
118
if err := s .Dao .SaveTaskExecution (& e ); err != nil {
119
- log .Warning (ctx , "Hooks> retryTaskExecutionsRoutine> unable to save task execution for %s: %v" , e .UUID , err )
119
+ log .Warning (ctx , "retryTaskExecutionsRoutine> unable to save task execution for %s: %v" , e .UUID , err )
120
120
continue
121
121
}
122
- log .Warning (ctx , "Hooks> retryTaskExecutionsRoutine > Enqueing with lastError %s %d/%d type:%s status:%s len:%d err:%s" , e .UUID , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , len (e .LastError ), e .LastError )
122
+ log .Warning (ctx , "retryTaskExecutionsRoutine > Enqueing with lastError %s %d/%d type:%s status:%s len:%d err:%s" , e .UUID , e .NbErrors , s .Cfg .RetryError , e .Type , e .Status , len (e .LastError ), e .LastError )
123
123
if err := s .Dao .EnqueueTaskExecution (ctx , & e ); err != nil {
124
- log .Error (ctx , "Hooks> retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v" , err )
124
+ log .Error (ctx , "retryTaskExecutionsRoutine > error on EnqueueTaskExecution: %v" , err )
125
125
}
126
126
continue
127
127
}
@@ -138,17 +138,18 @@ func (s *Service) enqueueScheduledTaskExecutionsRoutine(ctx context.Context) err
138
138
for {
139
139
select {
140
140
case <- ctx .Done ():
141
+ log .Error (ctx , "enqueueScheduledTaskExecutionsRoutine > exiting goroutine: %v" , ctx .Err ())
141
142
return ctx .Err ()
142
143
case <- tick .C :
143
144
tasks , err := s .Dao .FindAllTasks (ctx )
144
145
if err != nil {
145
- log .Error (ctx , "Hooks> enqueueScheduledTaskExecutionsRoutine > Unable to find all tasks: %v" , err )
146
+ log .Error (ctx , "enqueueScheduledTaskExecutionsRoutine > Unable to find all tasks: %v" , err )
146
147
continue
147
148
}
148
149
for _ , t := range tasks {
149
150
execs , err := s .Dao .FindAllTaskExecutions (ctx , & t )
150
151
if err != nil {
151
- log .Error (ctx , "Hooks> enqueueScheduledTaskExecutionsRoutine > Unable to find all task executions (%s): %v" , t .UUID , err )
152
+ log .Error (ctx , "enqueueScheduledTaskExecutionsRoutine > Unable to find all task executions (%s): %v" , t .UUID , err )
152
153
continue
153
154
}
154
155
alreadyEnqueued := false
@@ -157,16 +158,16 @@ func (s *Service) enqueueScheduledTaskExecutionsRoutine(ctx context.Context) err
157
158
// update status before enqueue
158
159
// this will avoid to re-enqueue the same scheduled task execution if the dequeue take more than 30s (ticker of this goroutine)
159
160
if alreadyEnqueued {
160
- log .Info (ctx , "Hooks> enqueueScheduledTaskExecutionsRoutine > task execution already enqueued for this task %s of type %s- delete it" , e .UUID , e .Type )
161
+ log .Info (ctx , "enqueueScheduledTaskExecutionsRoutine > task execution already enqueued for this task %s of type %s- delete it" , e .UUID , e .Type )
161
162
if err := s .Dao .DeleteTaskExecution (& e ); err != nil {
162
- log .Error (ctx , "Hooks> enqueueScheduledTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
163
+ log .Error (ctx , "enqueueScheduledTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
163
164
}
164
165
} else {
165
166
e .Status = TaskExecutionEnqueued
166
167
s .Dao .SaveTaskExecution (& e )
167
- log .Info (ctx , "Hooks> enqueueScheduledTaskExecutionsRoutine > Enqueing %s task %s:%d" , e .Type , e .UUID , e .Timestamp )
168
+ log .Info (ctx , "enqueueScheduledTaskExecutionsRoutine > Enqueing %s task %s:%d" , e .Type , e .UUID , e .Timestamp )
168
169
if err := s .Dao .EnqueueTaskExecution (ctx , & e ); err != nil {
169
- log .Error (ctx , "Hooks> enqueueScheduledTaskExecutionsRoutine > error on EnqueueTaskExecution: %v" , err )
170
+ log .Error (ctx , "enqueueScheduledTaskExecutionsRoutine > error on EnqueueTaskExecution: %v" , err )
170
171
}
171
172
// this will avoid to re-enqueue the same scheduled task execution if the dequeue take more than 30s (ticker of this goroutine)
172
173
if e .Type == TypeRepoPoller || e .Type == TypeScheduler {
@@ -193,14 +194,14 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
193
194
case <- tick .C :
194
195
tasks , err := s .Dao .FindAllTasks (ctx )
195
196
if err != nil {
196
- log .Error (ctx , "Hooks> deleteTaskExecutionsRoutine > Unable to find all tasks: %v" , err )
197
+ log .Error (ctx , "deleteTaskExecutionsRoutine > Unable to find all tasks: %v" , err )
197
198
continue
198
199
}
199
200
for _ , t := range tasks {
200
201
taskToDelete := false
201
202
execs , err := s .Dao .FindAllTaskExecutions (ctx , & t )
202
203
if err != nil {
203
- log .Error (ctx , "Hooks> deleteTaskExecutionsRoutine > Unable to find all task executions (%s): %v" , t .UUID , err )
204
+ log .Error (ctx , "deleteTaskExecutionsRoutine > Unable to find all task executions (%s): %v" , t .UUID , err )
204
205
continue
205
206
}
206
207
sort .Slice (execs , func (i , j int ) bool {
@@ -213,14 +214,14 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
213
214
case TypeBranchDeletion :
214
215
if e .Status == TaskExecutionDone && e .ProcessingTimestamp != 0 {
215
216
if err := s .Dao .DeleteTaskExecution (& e ); err != nil {
216
- log .Error (ctx , "Hooks> deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
217
+ log .Error (ctx , "deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
217
218
}
218
219
taskToDelete = true
219
220
}
220
221
default :
221
222
if i >= s .Cfg .ExecutionHistory && e .ProcessingTimestamp != 0 {
222
223
if err := s .Dao .DeleteTaskExecution (& e ); err != nil {
223
- log .Error (ctx , "Hooks> deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
224
+ log .Error (ctx , "deleteTaskExecutionsRoutine > error on DeleteTaskExecution: %v" , err )
224
225
}
225
226
}
226
227
}
@@ -229,7 +230,7 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
229
230
230
231
if taskToDelete {
231
232
if err := s .deleteTask (ctx , & t ); err != nil {
232
- log .Error (ctx , "Hooks> deleteTaskExecutionsRoutine > Unable to deleteTask (%s): %v" , t .UUID , err )
233
+ log .Error (ctx , "deleteTaskExecutionsRoutine > Unable to deleteTask (%s): %v" , t .UUID , err )
233
234
}
234
235
}
235
236
}
@@ -241,14 +242,15 @@ func (s *Service) deleteTaskExecutionsRoutine(ctx context.Context) error {
241
242
func (s * Service ) dequeueTaskExecutions (ctx context.Context ) error {
242
243
for {
243
244
if ctx .Err () != nil {
245
+ log .Error (ctx , "dequeueTaskExecutions> exiting go routine: %v" , ctx .Err ())
244
246
return ctx .Err ()
245
247
}
246
248
size , err := s .Dao .QueueLen ()
247
249
if err != nil {
248
- log .Error (ctx , "Hooks> dequeueTaskExecutions > Unable to get queueLen: %v" , err )
250
+ log .Error (ctx , "dequeueTaskExecutions > Unable to get queueLen: %v" , err )
249
251
continue
250
252
}
251
- log .Debug ("Hooks> dequeueTaskExecutions> current queue size: %d" , size )
253
+ log .Debug ("dequeueTaskExecutions> current queue size: %d" , size )
252
254
253
255
if s .Maintenance {
254
256
log .Info (ctx , "Maintenance enable, wait 1 minute. Queue %d" , size )
@@ -259,13 +261,15 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
259
261
// Dequeuing context
260
262
var taskKey string
261
263
if ctx .Err () != nil {
264
+ log .Error (ctx , "dequeueTaskExecutions> exiting go routine: %v" , ctx .Err ())
262
265
return ctx .Err ()
263
266
}
264
267
if err := s .Cache .DequeueWithContext (ctx , schedulerQueueKey , & taskKey ); err != nil {
265
- log .Error (ctx , "Hooks> dequeueTaskExecutions> store.DequeueWithContext err: %v" , err )
268
+ log .Error (ctx , "dequeueTaskExecutions> store.DequeueWithContext err: %v" , err )
266
269
continue
267
270
}
268
- log .Debug ("Hooks> dequeueTaskExecutions> work on taskKey: %s" , taskKey )
271
+ s .Dao .dequeuedIncr ()
272
+ log .Info (ctx , "dequeueTaskExecutions> work on taskKey: %s" , taskKey )
269
273
270
274
// Load the task execution
271
275
var t = sdk.TaskExecution {}
@@ -286,18 +290,18 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
286
290
287
291
task := s .Dao .FindTask (ctx , t .UUID )
288
292
if task == nil {
289
- log .Error (ctx , "Hooks> dequeueTaskExecutions failed: Task %s not found - deleting this task execution" , t .UUID )
293
+ log .Error (ctx , "dequeueTaskExecutions failed: Task %s not found - deleting this task execution" , t .UUID )
290
294
t .LastError = "Internal Error: Task not found"
291
295
t .NbErrors ++
292
296
if err := s .Dao .DeleteTaskExecution (& t ); err != nil {
293
- log .Error (ctx , "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v" , err )
297
+ log .Error (ctx , "dequeueTaskExecutions > error on DeleteTaskExecution: %v" , err )
294
298
}
295
299
continue
296
300
297
301
} else if t .NbErrors >= s .Cfg .RetryError {
298
- log .Info (ctx , "Hooks> dequeueTaskExecutions> Deleting task execution %s cause: to many errors:%d lastError:%s" , t .UUID , t .NbErrors , t .LastError )
302
+ log .Info (ctx , "dequeueTaskExecutions> Deleting task execution %s cause: to many errors:%d lastError:%s" , t .UUID , t .NbErrors , t .LastError )
299
303
if err := s .Dao .DeleteTaskExecution (& t ); err != nil {
300
- log .Error (ctx , "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v" , err )
304
+ log .Error (ctx , "dequeueTaskExecutions > error on DeleteTaskExecution: %v" , err )
301
305
}
302
306
continue
303
307
@@ -307,19 +311,19 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
307
311
saveTaskExecution = true
308
312
} else {
309
313
saveTaskExecution = true
310
- log .Debug ("Hooks> dequeueTaskExecutions> call doTask on taskKey: %s" , taskKey )
314
+ log .Debug ("dequeueTaskExecutions> call doTask on taskKey: %s" , taskKey )
311
315
var err error
312
316
restartTask , err = s .doTask (ctx , task , & t )
313
317
if err != nil {
314
318
if strings .Contains (err .Error (), "Unsupported task type" ) {
315
319
// delete this task execution, as it will never work
316
- log .Info (ctx , "Hooks> dequeueTaskExecutions> Deleting task execution %s as err:%v" , t .UUID , err )
320
+ log .Info (ctx , "dequeueTaskExecutions> Deleting task execution %s as err:%v" , t .UUID , err )
317
321
if err := s .Dao .DeleteTaskExecution (& t ); err != nil {
318
- log .Error (ctx , "Hooks> dequeueTaskExecutions > error on DeleteTaskExecution: %v" , err )
322
+ log .Error (ctx , "dequeueTaskExecutions > error on DeleteTaskExecution: %v" , err )
319
323
}
320
324
continue
321
325
} else {
322
- log .Error (ctx , "Hooks> dequeueTaskExecutions> %s failed err[%d]: %v" , t .UUID , t .NbErrors , err )
326
+ log .Error (ctx , "dequeueTaskExecutions> %s failed err[%d]: %v" , t .UUID , t .NbErrors , err )
323
327
t .LastError = err .Error ()
324
328
t .NbErrors ++
325
329
saveTaskExecution = true
@@ -336,7 +340,10 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
336
340
337
341
//Start (or restart) the task
338
342
if restartTask {
339
- _ , _ = s .startTask (ctx , task )
343
+ _ , err := s .startTask (ctx , task )
344
+ if err != nil {
345
+ log .Error (ctx , "dequeueTaskExecutions> unable to restart the task %+v after execution: %v" , task , err )
346
+ }
340
347
}
341
348
}
342
349
}
0 commit comments