8000 feat: worker semaphore v2 by grutt · Pull Request #540 · hatchet-dev/hatchet · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: worker semaphore v2 #540

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 34 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
c7d5a25
wip: semaphore slot table
grutt May 29, 2024
8cdd5ae
wip: acquire and release semaphore
grutt May 29, 2024
c0d39cb
Fix deadlock/pool exhaustion
abelanger5 May 29, 2024
c347190
fix: update skip locked cte
grutt May 29, 2024
0775f4d
feat: release resolver
grutt May 30, 2024
c6526aa
wip: migration
grutt May 30, 2024
c8a240c
chore: migrate running steps to semaphore
grutt May 30, 2024
4b485ea
feat: add next alert column
grutt May 31, 2024
0d356dc
feat: add notification templates
grutt May 31, 2024
b0aebaf
feat: add poll token ticker
grutt May 31, 2024
f2eca2e
fix: expiring 7 days
grutt May 31, 2024
96f65ed
fix: no expired tokens
grutt May 31, 2024
cb4de56
fix: subject string
grutt May 31, 2024
98219be
fix: message string
grutt May 31, 2024
415e812
fix: slack format
grutt May 31, 2024
1d98384
feat: add config columns
grutt May 31, 2024
ae6bb9b
feat: expose api
grutt May 31, 2024
43fb2d6
feat: update alerter opts
grutt May 31, 2024
cdad1d1
feat: update ui
grutt May 31, 2024
76a78a8
chore: ui tweaks
grutt May 31, 2024
5f69845
fix: ui tweaks
grutt May 31, 2024
8931830
Merge branch 'main' into feat--default-email-alert-group
grutt May 31, 2024
160c3b9
Merge branch 'main' into feat--worker-semaphore-v2
grutt May 31, 2024
1d0c3f8
Merge branch 'feat--default-email-alert-group' into feat--worker-sema…
grutt May 31, 2024
5e3857c
fix: migration with loop
grutt May 31, 2024
a47d16a
chore: generate
grutt May 31, 2024
b704df7
Merge branch 'main' into feat--worker-semaphore-v2
grutt Jun 3, 2024
6123175
chore: remove unused fn
grutt Jun 4, 2024
c7ff2ce
feat: add worker id index on slot
grutt Jun 4, 2024
c8aedd1
chore: rm unused fn
grutt Jun 4, 2024
c437cd9
fix: typo
grutt Jun 4, 2024
1150b09
feat: optimized query
grutt Jun 4, 2024
72afbdc
chore: generate all
grutt Jun 4, 2024
3eed805
chore: collapse migration
grutt Jun 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions api/v1/server/oas/transformers/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ func ToWorker(worker *db.WorkerModel) *gen.Worker {
dispatcherUuid = uuid.MustParse(id)
}

var maxRuns int

if runs, ok := worker.MaxRuns(); ok {
maxRuns = runs
}

status := gen.ACTIVE

if lastHeartbeat, ok := worker.LastHeartbeatAt(); ok && lastHeartbeat.Add(4*time.Second).Before(time.Now()) {
Expand All @@ -35,7 +29,7 @@ func ToWorker(worker *db.WorkerModel) *gen.Worker {
Name: worker.Name,
DispatcherId: &dispatcherUuid,
Status: &status,
MaxRuns: &maxRuns,
MaxRuns: &worker.MaxRuns,
}

if lastHeartbeatAt, ok := worker.LastHeartbeatAt(); ok {
Expand All @@ -46,9 +40,13 @@ func ToWorker(worker *db.WorkerModel) *gen.Worker {
res.LastListenerEstablished = &lastListenerEstablished
}

if semaphore, ok := worker.Semaphore(); ok {
res.AvailableRuns = &semaphore.Slots
numSlots := 0
for _, slot := range worker.Slots() {
if _, ok := slot.StepRunID(); !ok {
numSlots++
}
}
res.AvailableRuns = &numSlots

if worker.RelationsWorker.Actions != nil {
if actions := worker.Actions(); actions != nil {
Expand All @@ -69,21 +67,23 @@ func ToWorkerSqlc(worker *dbsqlc.Worker, stepCount *int64, slots *int) *gen.Work

dispatcherId := uuid.MustParse(pgUUIDToStr(worker.DispatcherId))

maxRuns := int(worker.MaxRuns.Int32)
maxRuns := int(worker.MaxRuns)

status := gen.ACTIVE

if worker.LastHeartbeatAt.Time.Add(5 * time.Second).Before(time.Now()) {
status = gen.INACTIVE
}

availableRuns := maxRuns - *slots

res := &gen.Worker{
Metadata: *toAPIMetadata(pgUUIDToStr(worker.ID), worker.CreatedAt.Time, worker.UpdatedAt.Time),
Name: worker.Name,
Status: &status,
DispatcherId: &dispatcherId,
MaxRuns: &maxRuns,
AvailableRuns: slots,
AvailableRuns: &availableRuns,
}

if !worker.LastHeartbeatAt.Time.IsZero() {
Expand Down
9 changes: 5 additions & 4 deletions internal/repository/prisma/dbsqlc/models.go
8000

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 17 additions & 5 deletions internal/repository/prisma/dbsqlc/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -556,17 +556,20 @@ CREATE TABLE "Worker" (
"lastHeartbeatAt" TIMESTAMP(3),
"name" TEXT NOT NULL,
"dispatcherId" UUID,
"maxRuns" INTEGER,
"maxRuns" INTEGER NOT NULL DEFAULT 100,
"isActive" BOOLEAN NOT NULL DEFAULT false,
"lastListenerEstablished" TIMESTAMP(3),

CONSTRAINT "Worker_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "WorkerSemaphore" (
CREATE TABLE "WorkerSemaphoreSlot" (
"id" UUID NOT NULL,
"workerId" UUID N 8000 OT NULL,
"slots" INTEGER NOT NULL
"stepRunId" UUID,

CONSTRAINT "WorkerSemaphoreSlot_pkey" PRIMARY KEY ("id")
);

-- CreateTable
Expand Down Expand Up @@ -953,7 +956,13 @@ CREATE UNIQUE INDEX "UserSession_id_key" ON "UserSession"("id" ASC);
CREATE UNIQUE INDEX "Worker_id_key" ON "Worker"("id" ASC);

-- CreateIndex
CREATE UNIQUE INDEX "WorkerSemaphore_workerId_key" ON "WorkerSemaphore"("workerId" ASC);
CREATE UNIQUE INDEX "WorkerSemaphoreSlot_id_key" ON "WorkerSemaphoreSlot"("id" ASC);

-- CreateIndex
CREATE UNIQUE INDEX "WorkerSemaphoreSlot_stepRunId_key" ON "WorkerSemaphoreSlot"("stepRunId" ASC);

-- CreateIndex
CREATE INDEX "WorkerSemaphoreSlot_workerId_idx" ON "WorkerSemaphoreSlot"("workerId" ASC);

-- CreateIndex
CREATE UNIQUE INDEX "Workflow_id_key" ON "Workflow"("id" ASC);
Expand Down Expand Up @@ -1235,7 +1244,10 @@ ALTER TABLE "Worker" ADD CONSTRAINT "Worker_dispatcherId_fkey" FOREIGN KEY ("dis
ALTER TABLE "Worker" ADD CONSTRAINT "Worker_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "WorkerSemaphore" ADD CONSTRAINT "WorkerSemaphore_workerId_fkey" FOREIGN KEY ("workerId") REFERENCES "Worker"("id") ON DELETE CASCADE ON UPDATE CASCADE;
ALTER TABLE "WorkerSemaphoreSlot" ADD CONSTRAINT "WorkerSemaphoreSlot_stepRunId_fkey" FOREIGN KEY ("stepRunId") REFERENCES "StepRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "WorkerSemaphoreSlot" ADD CONSTRAINT "WorkerSemaphoreSlot_workerId_fkey" FOREIGN KEY ("workerId") REFERENCES "Worker"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "Workflow" ADD CONSTRAINT "Workflow_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
Expand Down
Loading
Loading
0