From 523858d39cecef45ed6d39fc9e050131c6262cea Mon Sep 17 00:00:00 2001 From: "francois.samin" Date: Fri, 1 Dec 2023 10:37:39 +0100 Subject: [PATCH] fix(api): skip booked jobs in queue Signed-off-by: francois.samin --- engine/api/workflow/dao_node_run_job.go | 25 +++++++++++++++---------- engine/api/workflow_queue.go | 3 +++ 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/engine/api/workflow/dao_node_run_job.go b/engine/api/workflow/dao_node_run_job.go index 0b2ff2925c..5e2827b8ca 100644 --- a/engine/api/workflow/dao_node_run_job.go +++ b/engine/api/workflow/dao_node_run_job.go @@ -19,13 +19,14 @@ import ( // QueueFilter contains all criteria used to fetch queue type QueueFilter struct { - ModelType []string - Rights int - Since *time.Time - Until *time.Time - Limit *int - Statuses []string - Regions []string + ModelType []string + Rights int + Since *time.Time + Until *time.Time + Limit *int + Statuses []string + Regions []string + SkipBooked bool } func NewQueueFilter() QueueFilter { @@ -101,7 +102,7 @@ func LoadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.S pq.StringArray(filter.Regions), // $5 ) - return loadNodeJobRunQueue(ctx, db, store, query, filter.Limit) + return loadNodeJobRunQueue(ctx, db, store, query, filter.Limit, filter.SkipBooked) } // LoadNodeJobRunQueueByGroupIDs load all workflow_node_run_job accessible @@ -179,10 +180,10 @@ func LoadNodeJobRunQueueByGroupIDs(ctx context.Context, db gorp.SqlExecutor, sto filter.Rights, // $7 pq.StringArray(filter.Regions), // $8 ) - return loadNodeJobRunQueue(ctx, db, store, query, filter.Limit) + return loadNodeJobRunQueue(ctx, db, store, query, filter.Limit, filter.SkipBooked) } -func loadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.Store, query gorpmapping.Query, limit *int) ([]sdk.WorkflowNodeJobRun, error) { +func loadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.Store, query gorpmapping.Query, limit *int, skipBooked bool) ([]sdk.WorkflowNodeJobRun, error) { ctx, end := telemetry.Span(ctx, "workflow.loadNodeJobRunQueue") defer end() @@ -199,6 +200,10 @@ func loadNodeJobRunQueue(ctx context.Context, db gorp.SqlExecutor, store cache.S jobs := make([]sdk.WorkflowNodeJobRun, 0, len(sqlJobs)) for i := range sqlJobs { getHatcheryInfo(ctx, store, &sqlJobs[i]) + if skipBooked && sqlJobs[i].BookedBy.ID != 0 { + log.Info(ctx, "LoadNodeJobRunQueue> WorkflowNodeRunJob %d skipped (booked by %q)", sqlJobs[i].ID, sqlJobs[i].BookedBy.Name) + continue + } jr, err := sqlJobs[i].WorkflowNodeRunJob() if err != nil { log.Error(ctx, "LoadNodeJobRunQueue> WorkflowNodeRunJob error: %v", err) diff --git a/engine/api/workflow_queue.go b/engine/api/workflow_queue.go index b72d357428..8810fa65cd 100644 --- a/engine/api/workflow_queue.go +++ b/engine/api/workflow_queue.go @@ -848,6 +848,9 @@ func (api *API) getWorkflowJobQueueHandler() service.Handler { if modelType != "" { filter.ModelType = []string{modelType} } + if ok, _ := isHatchery(ctx); ok { + filter.SkipBooked = true + } // If the consumer is a hatchery or a non maintainer user, filter the job by its groups if isS || !isMaintainer(ctx) {