feat(sdk): restart goroutines (#5821) · ovh/cds@f8d2b30 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit f8d2b30

Browse files
authored
feat(sdk): restart goroutines (#5821)
Signed-off-by: francois samin <francois.samin@corp.ovh.com>
1 parent 2f40a2e commit f8d2b30

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

45 files changed

+199
-127
lines changed

cli/cdsctl/events.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ func eventsListenRun(v cli.Values) error {
6060
chanMessageToSend := make(chan []sdk.WebsocketFilter)
6161
chanErrorReceived := make(chan error)
6262

63-
sdk.NewGoRoutines().Run(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) {
64-
client.WebsocketEventsListen(ctx, sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
63+
sdk.NewGoRoutines(ctx).Run(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) {
64+
client.WebsocketEventsListen(ctx, sdk.NewGoRoutines(ctx), chanMessageToSend, chanMessageReceived, chanErrorReceived)
6565
})
6666

6767
switch {

cli/cdsctl/workflow_log.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ func workflowLogStreamRun(v cli.Values) error {
437437
chanMsgReceived := make(chan json.RawMessage)
438438
chanErrorReceived := make(chan error)
439439

440-
goRoutines := sdk.NewGoRoutines()
440+
goRoutines := sdk.NewGoRoutines(ctx)
441441
goRoutines.Exec(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) {
442442
for ctx.Err() == nil {
443443
if err := client.RequestWebsocket(ctx, goRoutines, fmt.Sprintf("%s/item/stream", link.CDNURL), chanMessageToSend, chanMsgReceived, chanErrorReceived); err != nil {

cli/cdsctl/workflow_transform_as_code.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ func workflowTransformAsCodeRun(v cli.Values) (interface{}, error) {
5353
chanMessageToSend := make(chan []sdk.WebsocketFilter)
5454
chanErrorReceived := make(chan error)
5555

56-
sdk.NewGoRoutines().Run(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) {
57-
client.WebsocketEventsListen(ctx, sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
56+
sdk.NewGoRoutines(ctx).Run(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) {
57+
client.WebsocketEventsListen(ctx, sdk.NewGoRoutines(ctx), chanMessageToSend, chanMessageReceived, chanErrorReceived)
5858
})
5959

6060
ope, err := client.WorkflowTransformAsCode(projectKey, v.GetString(_WorkflowName), branch, message)

engine/api/api.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ func (a *API) Serve(ctx context.Context) error {
545545
}
546546

547547
log.Info(ctx, "Initializing HTTP router")
548-
a.GoRoutines = sdk.NewGoRoutines()
548+
a.GoRoutines = sdk.NewGoRoutines(ctx)
549549
a.Router = &Router{
550550
Mux: mux.NewRouter(),
551551
Background: ctx,
@@ -652,12 +652,12 @@ func (a *API) Serve(ctx context.Context) error {
652652
log.Error(ctx, "error while initializing event system: %s", err)
653653
}
654654

655-
a.GoRoutines.Run(ctx, "event.dequeue", func(ctx context.Context) {
655+
a.GoRoutines.RunWithRestart(ctx, "event.dequeue", func(ctx context.Context) {
656656
event.DequeueEvent(ctx, a.mustDB())
657657
})
658658

659659
log.Info(ctx, "Initializing internal routines...")
660-
a.GoRoutines.Run(ctx, "maintenance.Subscribe", func(ctx context.Context) {
660+
a.GoRoutines.RunWithRestart(ctx, "maintenance.Subscribe", func(ctx context.Context) {
661661
if err := a.listenMaintenance(ctx); err != nil {
662662
log.Error(ctx, "error while initializing listen maintenance routine: %s", err)
663663
}
@@ -668,7 +668,7 @@ func (a *API) Serve(ctx context.Context) error {
668668
log.Error(ctx, "error while initializing worker models routine: %s", err)
669669
}
670670
})
671-
a.GoRoutines.Run(ctx, "worker.Initialize", func(ctx context.Context) {
671+
a.GoRoutines.RunWithRestart(ctx, "worker.Initialize", func(ctx context.Context) {
672672
if err := worker.Initialize(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper), a.Cache); err != nil {
673673
log.Error(ctx, "error while initializing workers routine: %s", err)
674674
}
@@ -684,25 +684,25 @@ func (a *API) Serve(ctx context.Context) error {
684684
a.GoRoutines.Run(ctx, "audit.ComputeWorkflowAudit", func(ctx context.Context) {
685685
audit.ComputeWorkflowAudit(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper))
686686
})
687-
a.GoRoutines.Run(ctx, "auditCleanerRoutine(ctx", func(ctx context.Context) {
687+
a.GoRoutines.Run(ctx, "auditCleanerRoutine", func(ctx context.Context) {
688688
auditCleanerRoutine(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper))
689689
})
690-
a.GoRoutines.Run(ctx, "repositoriesmanager.ReceiveEvents", func(ctx context.Context) {
690+
a.GoRoutines.RunWithRestart(ctx, "repositoriesmanager.ReceiveEvents", func(ctx context.Context) {
691691
repositoriesmanager.ReceiveEvents(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper), a.Cache)
692692
})
693-
a.GoRoutines.Run(ctx, "services.KillDeadServices", func(ctx context.Context) {
693+
a.GoRoutines.RunWithRestart(ctx, "services.KillDeadServices", func(ctx context.Context) {
694694
services.KillDeadServices(ctx, a.mustDB)
695695
})
696696
a.GoRoutines.Run(ctx, "broadcast.Initialize", func(ctx context.Context) {
697697
broadcast.Initialize(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper))
698698
})
699-
a.GoRoutines.Run(ctx, "api.serviceAPIHeartbeat", func(ctx context.Context) {
699+
a.GoRoutines.RunWithRestart(ctx, "api.serviceAPIHeartbeat", func(ctx context.Context) {
700700
a.serviceAPIHeartbeat(ctx)
701701
})
702-
a.GoRoutines.Run(ctx, "authentication.SessionCleaner", func(ctx context.Context) {
702+
a.GoRoutines.RunWithRestart(ctx, "authentication.SessionCleaner", func(ctx context.Context) {
703703
authentication.SessionCleaner(ctx, a.mustDB, 10*time.Second)
704704
})
705-
a.GoRoutines.Run(ctx, "api.WorkflowRunCraft", func(ctx context.Context) {
705+
a.GoRoutines.RunWithRestart(ctx, "api.WorkflowRunCraft", func(ctx context.Context) {
706706
a.WorkflowRunCraft(ctx, 100*time.Millisecond)
707707
})
708708

engine/api/api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func newTestAPI(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *test.Fak
4242
api.AuthenticationDrivers[sdk.ConsumerBuiltin] = builtin.NewDriver()
4343
api.AuthenticationDrivers[sdk.ConsumerTest] = authdrivertest.NewDriver(t)
4444
api.AuthenticationDrivers[sdk.ConsumerTest2] = authdrivertest.NewDriver(t)
45-
api.GoRoutines = sdk.NewGoRoutines()
45+
api.GoRoutines = sdk.NewGoRoutines(context.TODO())
4646

4747
api.InitRouter()
4848

@@ -90,7 +90,7 @@ func newTestServer(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *test.
9090
api.AuthenticationDrivers = make(map[sdk.AuthConsumerType]sdk.AuthDriver)
9191
api.AuthenticationDrivers[sdk.ConsumerLocal] = local.NewDriver(context.TODO(), false, "http://localhost:8080", "")
9292
api.AuthenticationDrivers[sdk.ConsumerBuiltin] = builtin.NewDriver()
93-
api.GoRoutines = sdk.NewGoRoutines()
93+
api.GoRoutines = sdk.NewGoRoutines(context.TODO())
9494

9595
api.InitRouter()
9696
ts := httptest.NewServer(router.Mux)

engine/api/application_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func TestUpdateAsCodeApplicationHandler(t *testing.T) {
274274
chanMessageReceived := make(chan sdk.WebsocketEvent)
275275
chanMessageToSend := make(chan []sdk.WebsocketFilter)
276276
chanErrorReceived := make(chan error)
277-
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
277+
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
278278
chanMessageToSend <- []sdk.WebsocketFilter{{
279279
Type: sdk.WebsocketFilterTypeAscodeEvent,
280280
ProjectKey: proj.Key,

engine/api/environment_ascode_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func TestUpdateAsCodeEnvironmentHandler(t *testing.T) {
233233
chanMessageReceived := make(chan sdk.WebsocketEvent)
234234
chanMessageToSend := make(chan []sdk.WebsocketFilter)
235235
chanErrorReceived := make(chan error)
236-
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
236+
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
237237
chanMessageToSend <- []sdk.WebsocketFilter{{
238238
Type: sdk.WebsocketFilterTypeAscodeEvent,
239239
ProjectKey: proj.Key,

engine/api/migrate/migration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func Run(ctx context.Context, db gorp.SqlExecutor) {
4646
wg.Add(1)
4747
}
4848

49-
sdk.NewGoRoutines().Run(ctx, "migrate_"+currentMigration.Name, func(contex context.Context) {
49+
sdk.NewGoRoutines(ctx).Run(ctx, "migrate_"+currentMigration.Name, func(contex context.Context) {
5050
defer func() {
5151
if currentMigration.Blocker {
5252
wg.Done()

engine/api/pipeline_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func TestUpdateAsCodePipelineHandler(t *testing.T) {
205205
chanMessageToSend := make(chan []sdk.WebsocketFilter)
206206
chanErrorReceived := make(chan error)
207207

208-
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
208+
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
209209
chanMessageToSend <- []sdk.WebsocketFilter{{
210210
Type: sdk.WebsocketFilterTypeAscodeEvent,
211211
ProjectKey: proj.Key,

engine/api/websocket_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func Test_websocketWrongFilters(t *testing.T) {
4545
InsecureSkipVerifyTLS: true,
4646
BuitinConsumerAuthenticationToken: jws,
4747
})
48-
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
48+
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
4949

5050
// Subscribe to project without project key
5151
chanMessageToSend <- []sdk.WebsocketFilter{{
@@ -126,7 +126,7 @@ func Test_websocketGetWorkflowEvent(t *testing.T) {
126126
InsecureSkipVerifyTLS: true,
127127
SessionToken: jwt,
128128
})
129-
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived)
129+
go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived)
130130
var lastResponse *sdk.WebsocketEvent
131131
go func() {
132132
for e := range chanMessageReceived {
@@ -268,7 +268,7 @@ func TestWebsocketNoEventLoose(t *testing.T) {
268268
InsecureSkipVerifyTLS: true,
269269
SessionToken: jwt,
270270
})
271-
go client1.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chan1MessageToSend, chan1MessageReceived, chan1ErrorReceived)
271+
go client1.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(ctx), chan1MessageToSend, chan1MessageReceived, chan1ErrorReceived)
272272
var client1EventCount int64
273273
go func() {
274274
for {
@@ -299,7 +299,7 @@ func TestWebsocketNoEventLoose(t *testing.T) {
299299
SessionToken: jwt,
300300
})
301301
var client2EventCount int64
302-
go client2.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chan2MessageToSend, chan2MessageReceived, chan2ErrorReceived)
302+
go client2.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(ctx), chan2MessageToSend, chan2MessageReceived, chan2ErrorReceived)
303303
go func() {
304304
for {
305305
select {

0 commit comments

Comments
 (0)
0