8000 feat(sdk): run goroutine with pprof labels (#3440) · ovh/cds@64909bd · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit 64909bd

Browse files
fsaminyesnault
authored andcommitted
feat(sdk): run goroutine with pprof labels (#3440)
1 parent b8020d7 commit 64909bd

File tree

15 files changed

+122
-187
lines changed

15 files changed

+122
-187
lines changed

engine/api/api.go

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -635,21 +635,48 @@ func (a *API) Serve(ctx context.Context) error {
635635
event.Subscribe(a.warnChan)
636636

637637
log.Info("Initializing internal routines...")
638-
sdk.GoRoutine("workflow.ComputeAudit", func() { workflow.ComputeAudit(ctx, a.DBConnectionFactory.GetDBMap) })
639-
sdk.GoRoutine("warning.Start", func() { warning.Start(ctx, a.DBConnectionFactory.GetDBMap, a.warnChan) })
640-
sdk.GoRoutine("queue.Pipelines", func() { queue.Pipelines(ctx, a.Cache, a.DBConnectionFactory.GetDBMap) })
641-
sdk.GoRoutine("pipeline.AWOLPipelineKiller", func() { pipeline.AWOLPipelineKiller(ctx, a.DBConnectionFactory.GetDBMap, a.Cache) })
642-
sdk.GoRoutine("auditCleanerRoutine(ctx", func() { auditCleanerRoutine(ctx, a.DBConnectionFactory.GetDBMap) })
643-
sdk.GoRoutine("metrics.Initialize", func() { metrics.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Config.Name) })
644-
sdk.GoRoutine("repositoriesmanager.ReceiveEvents", func() { repositoriesmanager.ReceiveEvents(ctx, a.DBConnectionFactory.GetDBMap, a.Cache) })
645-
sdk.GoRoutine("action.RequirementsCacheLoader", func() { action.RequirementsCacheLoader(ctx, 5*time.Second, a.DBConnectionFactory.GetDBMap, a.Cache) })
646-
sdk.GoRoutine("hookRecoverer(ctx", func() { hookRecoverer(ctx, a.DBConnectionFactory.GetDBMap, a.Cache) })
647-
sdk.GoRoutine("services.KillDeadServices", func() { services.KillDeadServices(ctx, a.mustDB) })
648-
sdk.GoRoutine("migrate.CleanOldWorkflow", func() { migrate.CleanOldWorkflow(ctx, a.Cache, a.DBConnectionFactory.GetDBMap, a.Config.URL.API) })
649-
sdk.GoRoutine("migrate.KeyMigration", func() { migrate.KeyMigration(a.Cache, a.DBConnectionFactory.GetDBMap, &sdk.User{Admin: true}) })
650-
sdk.GoRoutine("broadcast.Initialize", func() { broadcast.Initialize(ctx, a.DBConnectionFactory.GetDBMap) })
651-
//sdk.GoRoutine("workflow.RestartAwolJobs", func() { workflow.RestartAwolJobs(ctx, a.Cache, a.DBConnectionFactory.GetDBMap) })
652-
sdk.GoRoutine("a.serviceAPIHeartbeat(ctx", func() { a.serviceAPIHeartbeat(ctx) })
638+
sdk.GoRoutine(ctx, "workflow.ComputeAudit", func(ctx context.Context) {
639+
workflow.ComputeAudit(ctx, a.DBConnectionFactory.GetDBMap)
640+
})
641+
sdk.GoRoutine(ctx, "warning.Start", func(ctx context.Context) {
642+
warning.Start(ctx, a.DBConnectionFactory.GetDBMap, a.warnChan)
643+
})
644+
sdk.GoRoutine(ctx, "queue.Pipelines", func(ctx context.Context) {
645+
queue.Pipelines(ctx, a.Cache, a.D 9E81 BConnectionFactory.GetDBMap)
646+
})
647+
sdk.GoRoutine(ctx, "pipeline.AWOLPipelineKiller", func(ctx context.Context) {
648+
pipeline.AWOLPipelineKiller(ctx, a.DBConnectionFactory.GetDBMap, a.Cache)
649+
})
650+
sdk.GoRoutine(ctx, "auditCleanerRoutine(ctx", func(ctx context.Context) {
651+
auditCleanerRoutine(ctx, a.DBConnectionFactory.GetDBMap)
652+
})
653+
sdk.GoRoutine(ctx, "metrics.Initialize", func(ctx context.Context) {
654+
metrics.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Config.Name)
655+
})
656+
sdk.GoRoutine(ctx, "repositoriesmanager.ReceiveEvents", func(ctx context.Context) {
657+
repositoriesmanager.ReceiveEvents(ctx, a.DBConnectionFactory.GetDBMap, a.Cache)
658+
})
659+
sdk.GoRoutine(ctx, "action.RequirementsCacheLoader", func(ctx context.Context) {
660+
action.RequirementsCacheLoader(ctx, 5*time.Second, a.DBConnectionFactory.GetDBMap, a.Cache)
661+
})
662+
sdk.GoRoutine(ctx, "hookRecoverer(ctx", func(ctx context.Context) {
663+
hookRecoverer(ctx, a.DBConnectionFactory.GetDBMap, a.Cache)
664+
})
665+
sdk.GoRoutine(ctx, "services.KillDeadServices", func(ctx context.Context) {
666+
services.KillDeadServices(ctx, a.mustDB)
667+
})
668+
sdk.GoRoutine(ctx, "migrate.CleanOldWorkflow", func(ctx context.Context) {
669+
migrate.CleanOldWorkflow(ctx, a.Cache, a.DBConnectionFactory.GetDBMap, a.Config.URL.API)
670+
})
671+
sdk.GoRoutine(ctx, "migrate.KeyMigration", func(ctx context.Context) {
672+
migrate.KeyMigration(a.Cache, a.DBConnectionFactory.GetDBMap, &sdk.User{Admin: true})
673+
})
674+
sdk.GoRoutine(ctx, "broadcast.Initialize", func(ctx context.Context) {
675+
broadcast.Initialize(ctx, a.DBConnectionFactory.GetDBMap)
676+
})
677+
sdk.GoRoutine(ctx, "api.serviceAPIHeartbeat", func(ctx context.Context) {
678+
a.serviceAPIHeartbeat(ctx)
679+
})
653680

654681
//Temporary migration code
655682
go migrate.WorkflowNodeRunArtifacts(a.Cache, a.DBConnectionFactory.GetDBMap)
@@ -683,7 +710,7 @@ func (a *API) Serve(ctx context.Context) error {
683710
if err := services.InitExternal(a.mustDB, a.Cache, externalServices); err != nil {
684711
return fmt.Errorf("unable to init external service: %v", err)
685712
}
686-
sdk.GoRoutine("pings-external-services", func() { services.Pings(ctx, a.mustDB, externalServices) })
713+
sdk.GoRoutine(ctx, "pings-external-services", func(ctx context.Context) { services.Pings(ctx, a.mustDB, externalServices) })
687714

688715
// TODO: to delete after migration
689716
if os.Getenv("CDS_MIGRATE_GIT_CLONE") == "true" {
@@ -700,12 +727,12 @@ func (a *API) Serve(ctx context.Context) error {
700727
log.Warning("⚠ Cron Scheduler is disabled")
701728
}
702729

703-
sdk.GoRoutine("workflow.Initialize", func() {
730+
sdk.GoRoutine(ctx, "workflow.Initialize", func(ctx context.Context) {
704731
workflow.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch)
705732
})
706-
sdk.GoRoutine("PushInElasticSearch", func() { event.PushInElasticSearch(ctx, a.mustDB(), a.Cache) })
733+
sdk.GoRoutine(ctx, "PushInElasticSearch", func(ctx context.Context) { event.PushInElasticSearch(ctx, a.mustDB(), a.Cache) })
707734
metrics.Init(ctx, a.DBConnectionFactory.GetDBMap)
708-
sdk.GoRoutine("Purge", func() { purge.Initialize(ctx, a.Cache, a.DBConnectionFactory.GetDBMap) })
735+
sdk.GoRoutine(ctx, "Purge", func(ctx context.Context) { purge.Initialize(ctx, a.Cache, a.DBConnectionFactory.GetDBMap) })
709736

710737
s := &http.Server{
711738
Addr: fmt.Sprintf("%s:%d", a.Config.HTTP.Addr, a.Config.HTTP.Port),

engine/api/api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ func newTestAPI(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *gorp.DbM
3636
api.warnChan = make(chan sdk.Event)
3737
event.Subscribe(api.warnChan)
3838

39-
sdk.GoRoutine("workflow.ComputeAudit", func() { workflow.ComputeAudit(context.Background(), api.DBConnectionFactory.GetDBMap) })
40-
sdk.GoRoutine("warning.Start", func() { warning.Start(context.Background(), api.DBConnectionFactory.GetDBMap, api.warnChan) })
39+
sdk.GoRoutine(context.TODO(), "workflow.ComputeAudit", func(ctx context.Context) { workflow.ComputeAudit(ctx, api.DBConnectionFactory.GetDBMap) })
40+
sdk.GoRoutine(context.TODO(), "warning.Start", func(ctx context.Context) { warning.Start(ctx, api.DBConnectionFactory.GetDBMap, api.warnChan) })
4141

4242
return api, db, router
4343
}

engine/api/events.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,15 @@ func (b *eventsBroker) disconnectClient(ctx context.Context, uuid string) {
7272
}
7373

7474
//Init the eventsBroker
75-
func (b *eventsBroker) Init(c context.Context) {
75+
func (b *eventsBroker) Init(ctx context.Context) {
7676
// Start cache Subscription
77-
subscribeFunc := func() {
78-
b.cacheSubscribe(c, b.messages, b.cache)
79-
}
80-
sdk.GoRoutine("eventsBroker.Init.CacheSubscribe", subscribeFunc)
77+
sdk.GoRoutine(ctx, "eventsBroker.Init.CacheSubscribe", func(ctx context.Context) {
78+
b.cacheSubscribe(ctx, b.messages, b.cache)
79+
})
8180

82-
startFunc := func() {
83-
b.Start(c)
84-
}
85-
sdk.GoRoutine("eventsBroker.Init.Start", startFunc)
81+
sdk.GoRoutine(ctx, "eventsBroker.Init.Start", func(ctx context.Context) {
82+
b.Start(ctx)
83+
})
8684
}
8785

8886
func (b *eventsBroker) cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cache.Store) {

engine/api/metrics/elasticsearch.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ import (
1616

1717
var metricsChan chan sdk.Metric
1818

19-
func Init(c context.Context, DBFunc func() *gorp.DbMap) {
19+
// Init the metrics package which push to elasticSearch service
20+
func Init(ctx context.Context, DBFunc func() *gorp.DbMap) {
2021
metricsChan = make(chan sdk.Metric, 50)
21-
sdk.GoRoutine("metrics.PushInElasticSearch", func() { pushInElasticSearch(c, DBFunc) })
22+
sdk.GoRoutine(ctx, "metrics.PushInElasticSearch", func(c context.Context) { pushInElasticSearch(c, DBFunc) })
2223
}
2324

2425
func pushInElasticSearch(c context.Context, DBFunc func() *gorp.DbMap) {

engine/api/workflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,8 @@ func (api *API) deleteWorkflowHandler() service.Handler {
447447

448448
event.PublishWorkflowDelete(key, *oldW, getUser(ctx))
449449

450-
sdk.GoRoutine("deleteWorkflowHandler",
451-
func() {
450+
sdk.GoRoutine(ctx, "deleteWorkflowHandler",
451+
func(ctx context.Context) {
452452
txg, errT := api.mustDB().Begin()
453453
if errT != nil {
454454
log.Error("deleteWorkflowHandler> Cannot start transaction: %v", errT)

engine/api/workflow/awol.go

Lines changed: 0 additions & 89 deletions
This file was deleted.

engine/api/workflow_run.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -721,13 +721,11 @@ func (api *API) postWorkflowRunHandler() service.Handler {
721721
go workflow.SendEvent(api.mustDB(), p.Key, report)
722722

723723
// Purge workflow run
724-
sdk.GoRoutine(
725-
"workflow.PurgeWorkflowRun",
726-
func() {
727-
if err := workflow.PurgeWorkflowRun(api.mustDB(), *wf); err != nil {
728-
log.Error("workflow.PurgeWorkflowRun> error %v", err)
729-
}
730-
})
724+
sdk.GoRoutine(ctx, "workflow.PurgeWorkflowRun", func(ctx context.Context) {
725+
if err := workflow.PurgeWorkflowRun(api.mustDB(), *wf); err != nil {
726+
log.Error("workflow.PurgeWorkflowRun> error %v", err)
727+
}
728+
})
731729

732730
var wr *sdk.WorkflowRun
733731
if len(report.WorkflowRuns()) > 0 {

engine/hatchery/kubernetes/kubernetes.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ func New() *HatcheryKubernetes {
3939

4040
// Init register local hatchery with its worker model
4141
func (h *HatcheryKubernetes) Init() error {
42-
sdk.GoRoutine("hatchery kubernetes routines", func() {
43-
h.routines(context.Background())
42+
sdk.GoRoutine(context.Background(), "hatchery kubernetes routines", func(ctx context.Context) {
43+
h.routines(ctx)
4444
})
4545
return nil
4646
}
@@ -471,13 +471,13 @@ func (h *HatcheryKubernetes) routines(ctx context.Context) {
471471
for {
472472
select {
473473
case <-ticker.C:
474-
sdk.GoRoutine("getServicesLogs", func() {
474+
sdk.GoRoutine(ctx, "getServicesLogs", func(ctx context.Context) {
475475
if err := h.getServicesLogs(); err != nil {
476476
log.Error("Hatchery> Kubernetes> Cannot get service logs : %v", err)
477477
}
478478
})
479479

480-
sdk.GoRoutine("killAwolWorker", func() {
480+
sdk.GoRoutine(ctx, "killAwolWorker", func(ctx context.Context) {
481481
_ = h.killAwolWorkers()
482482
})
483483
case <-ctx.Done():

engine/hatchery/local/local.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ func (h *HatcheryLocal) WorkersStartedByModel(model *sdk.Model) int {
354354
// Init register local hatchery with its worker model
355355
func (h *HatcheryLocal) Init() error {
356356
h.workers = make(map[string]workerCmd)
357-
sdk.GoRoutine("startKillAwolWorkerRoutine", h.startKillAwolWorkerRoutine)
357+
sdk.GoRoutine(context.Background(), "startKillAwolWorkerRoutine", h.startKillAwolWorkerRoutine)
358358
return nil
359359
}
360360

@@ -375,7 +375,7 @@ func (h *HatcheryLocal) localWorkerIndexCleanup() {
375375
}
376376
}
377377

378-
func (h *HatcheryLocal) startKillAwolWorkerRoutine() {
378+
func (h *HatcheryLocal) startKillAwolWorkerRoutine(ctx context.Context) {
379379
t := time.NewTicker(5 * time.Second)
380380
for range t.C {
381381
if err := h.killAwolWorkers(); err != nil {

engine/hatchery/swarm/swarm.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (h *HatcherySwarm) Init() error {
158158
}
159159
}
160160

161-
sdk.GoRoutine("swarm", func() { h.routines(context.Background()) })
161+
sdk.GoRoutine(context.Background(), "swarm", func(ctx context.Context) { h.routines(ctx) })
162162

163163
return nil
164164
}
@@ -613,13 +613,13 @@ func (h *HatcherySwarm) routines(ctx context.Context) {
613613
for {
614614
select {
615615
case <-ticker.C:
616-
sdk.GoRoutine("getServicesLogs", func() {
616+
sdk.GoRoutine(ctx, "getServicesLogs", func(ctx context.Context) {
617617
if err := h.getServicesLogs(); err != nil {
618618
log.Error("Hatchery> swarm> Cannot get service logs : %v", err)
619619
}
620620
})
621621

622-
sdk.GoRoutine("killAwolWorker", func() {
622+
sdk.GoRoutine(ctx, "killAwolWorker", func(ctx context.Context) {
623623
_ = h.killAwolWorker()
624624
})
625625
case <-ctx.Done():

0 commit comments

Comments
 (0)
0