From 9b51bf07fdf2fb7aea73e694e2a060e5b877ea77 Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Fri, 23 Aug 2024 13:47:36 -0400 Subject: [PATCH 1/3] fix: required affinity --- pkg/scheduling/affinity_test.go | 7 +++ pkg/scheduling/fixtures/affinity_hard.json | 21 +++++++++ .../fixtures/affinity_hard_output.json | 45 +++++++++++++++++++ ..._output.json => affinity_soft_output.json} | 0 pkg/scheduling/scheduling_test.go | 4 +- pkg/scheduling/worker_state.go | 3 ++ pkg/scheduling/worker_state_manager.go | 8 ++-- 7 files changed, 82 insertions(+), 6 deletions(-) create mode 100644 pkg/scheduling/fixtures/affinity_hard_output.json rename pkg/scheduling/fixtures/{affinity_output.json => affinity_soft_output.json} (100%) diff --git a/pkg/scheduling/affinity_test.go b/pkg/scheduling/affinity_test.go index 740fdcf6a3..e28de44617 100644 --- a/pkg/scheduling/affinity_test.go +++ b/pkg/scheduling/affinity_test.go @@ -75,6 +75,13 @@ func TestComputeWeight(t *testing.T) { { name: "No match for a required returns -1", desiredLabels: []*dbsqlc.GetDesiredLabelsRow{ + { + Key: "memory", + Comparator: dbsqlc.WorkerLabelComparatorEQUAL, + StrValue: sqlchelpers.TextFromStr("100"), + Weight: 15, + Required: false, + }, { Key: "region", Comparator: dbsqlc.WorkerLabelComparatorEQUAL, diff --git a/pkg/scheduling/fixtures/affinity_hard.json b/pkg/scheduling/fixtures/affinity_hard.json index ee41e3349f..2d37d2f60d 100644 --- a/pkg/scheduling/fixtures/affinity_hard.json +++ b/pkg/scheduling/fixtures/affinity_hard.json @@ -52,6 +52,18 @@ "tenantId": "707d0855-80ab-4e1f-a156-f1c4546cbf52", "queue": "child:process2", "Order": 0 + }, + { + "id": 152260, + "stepRunId": "064e787a-6cfd-4f82-8b6d-d8031459fdef", + "stepId": "e2c744b8-bebb-4b05-8292-d2ce36f27929", + "actionId": "child:process2", + "scheduleTimeoutAt": "2024-08-07T21:12:49.56Z", + "stepTimeout": "60s", + "isQueued": true, + "tenantId": "707d0855-80ab-4e1f-a156-f1c4546cbf52", + "queue": "child:process2", + "Order": 0 } ], "WorkerLabels": { @@ -86,6 +98,15 @@ "Required": true, "Weight": 1 } + ], + "e2c744b8-bebb-4b05-8292-d2ce36f27929": [ + { + "Key": "MODEL", + "Comparator": "EQUAL", + "StrValue": "C", + "Required": true, + "Weight": 1 + } ] } } diff --git a/pkg/scheduling/fixtures/affinity_hard_output.json b/pkg/scheduling/fixtures/affinity_hard_output.json new file mode 100644 index 0000000000..9f5d1f45da --- /dev/null +++ b/pkg/scheduling/fixtures/affinity_hard_output.json @@ -0,0 +1,45 @@ +{ + "StepRunIds": [ + "16fa711e-c03e-435c-88d9-62adf1591d98", + "064e787a-6cfd-4f82-8b6d-d8031459fdee" + ], + "StepRunTimeouts": [ + "60s", + "60s" + ], + "SlotIds": [ + "8cf68f09-b914-4f31-9777-8082b751a2d4", + "aed946be-43db-4b5f-876a-c6c71f4d52aa" + ], + "WorkerIds": [ + "aaaaaaaa-43db-4b5f-876a-c6c71f4d52aa", + "bbbbbbbb-43db-4b5f-876a-c6c71f4d52aa" + ], + "UnassignedStepRunIds": [ + "064e787a-6cfd-4f82-8b6d-d8031459fdef" + ], + "QueuedStepRuns": [ + { + "StepRunId": "16fa711e-c03e-435c-88d9-62adf1591d98", + "WorkerId": "aaaaaaaa-43db-4b5f-876a-c6c71f4d52aa", + "DispatcherId": "9994a9eb-430d-46da-934d-d9dd953cfd21" + }, + { + "StepRunId": "064e787a-6cfd-4f82-8b6d-d8031459fdee", + "WorkerId": "bbbbbbbb-43db-4b5f-876a-c6c71f4d52aa", + "DispatcherId": "9994a9eb-430d-46da-934d-d9dd953cfd21" + } + ], + "TimedOutStepRuns": [], + "RateLimitedStepRuns": [], + "RateLimitedQueues": {}, + "QueuedItems": [ + 137295, + 152259 + ], + "ShouldContinue": false, + "MinQueuedIds": { + "child:process2": 0 + }, + "RateLimitUnitsConsumed": {} +} diff --git a/pkg/scheduling/fixtures/affinity_output.json b/pkg/scheduling/fixtures/affinity_soft_output.json similarity index 100% rename from pkg/scheduling/fixtures/affinity_output.json rename to pkg/scheduling/fixtures/affinity_soft_output.json diff --git a/pkg/scheduling/scheduling_test.go b/pkg/scheduling/scheduling_test.go index c994ec3d23..2a60cf1c0f 100644 --- a/pkg/scheduling/scheduling_test.go +++ b/pkg/scheduling/scheduling_test.go @@ -156,7 +156,7 @@ func TestGeneratePlan(t *testing.T) { name: "GeneratePlan_Affinity_Soft", args: args{ fixtureArgs: "./fixtures/affinity_soft.json", - fixtureResult: "./fixtures/affinity_output.json", + fixtureResult: "./fixtures/affinity_soft_output.json", noTimeout: true, }, want: func(s SchedulePlan, fixtureResult string) bool { @@ -175,7 +175,7 @@ func TestGeneratePlan(t *testing.T) { name: "GeneratePlan_Affinity_Hard", args: args{ fixtureArgs: "./fixtures/affinity_hard.json", - fixtureResult: "./fixtures/affinity_output.json", + fixtureResult: "./fixtures/affinity_hard_output.json", noTimeout: true, }, want: func(s SchedulePlan, fixtureResult string) bool { diff --git a/pkg/scheduling/worker_state.go b/pkg/scheduling/worker_state.go index 2a21943087..7505decc0c 100644 --- a/pkg/scheduling/worker_state.go +++ b/pkg/scheduling/worker_state.go @@ -1,6 +1,8 @@ package scheduling import ( + "fmt" + "github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers" ) @@ -42,6 +44,7 @@ func (w *WorkerState) CanAssign(action string, stepId *string) bool { } if weight, ok := w.stepWeights[*stepId]; ok { + fmt.Println("weight", weight) return weight >= 0 } diff --git a/pkg/scheduling/worker_state_manager.go b/pkg/scheduling/worker_state_manager.go index ff0864f52e..6edc4b2b41 100644 --- a/pkg/scheduling/worker_state_manager.go +++ b/pkg/scheduling/worker_state_manager.go @@ -38,10 +38,10 @@ func NewWorkerStateManager( for workerId, worker := range workers { weight := ComputeWeight(desired, worker.labels) - // skip workers that are not a match (i.e. required) - if weight < 0 { - continue - } + // // skip workers that are not a match (i.e. required) + // if weight < 0 { + // continue + // } // cache the weight on the worker workers[workerId].AddStepWeight(stepId, weight) From 6298d480ca9fadd38311b65a8781ffceac199ce0 Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Fri, 23 Aug 2024 14:41:57 -0400 Subject: [PATCH 2/3] chore: rm dead code --- pkg/scheduling/worker_state_manager.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/scheduling/worker_state_manager.go b/pkg/scheduling/worker_state_manager.go index 6edc4b2b41..229de861cd 100644 --- a/pkg/scheduling/worker_state_manager.go +++ b/pkg/scheduling/worker_state_manager.go @@ -38,11 +38,6 @@ func NewWorkerStateManager( for workerId, worker := range workers { weight := ComputeWeight(desired, worker.labels) - // // skip workers that are not a match (i.e. required) - // if weight < 0 { - // continue - // } - // cache the weight on the worker workers[workerId].AddStepWeight(stepId, weight) From 37be843a43898b186179852e877a85aacd2562e0 Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Fri, 23 Aug 2024 14:42:51 -0400 Subject: [PATCH 3/3] chore: rm comment --- pkg/scheduling/worker_state.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/scheduling/worker_state.go b/pkg/scheduling/worker_state.go index 7505decc0c..2a21943087 100644 --- a/pkg/scheduling/worker_state.go +++ b/pkg/scheduling/worker_state.go @@ -1,8 +1,6 @@ package scheduling import ( - "fmt" - "github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers" ) @@ -44,7 +42,6 @@ func (w *WorkerState) CanAssign(action string, stepId *string) bool { } if weight, ok := w.stepWeights[*stepId]; ok { - fmt.Println("weight", weight) return weight >= 0 }