fix: worker stream state by grutt · Pull Request #514 · hatchet-dev/hatchet · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix: worker stream state #514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits on
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/repository/prisma/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/repository/prisma/dbsqlc/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ CREATE TABLE "Worker" (
"name" TEXT NOT NULL,
"dispatcherId" UUID,
"maxRuns" INTEGER,
"isActive" BOOLEAN NOT NULL DEFAULT false,

CONSTRAINT "Worker_pkey" PRIMARY KEY ("id")
);
Expand Down
11 changes: 9 additions & 2 deletions internal/repository/prisma/dbsqlc/step_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ WITH valid_workers AS (
WHERE
w."tenantId" = @tenantId::uuid
AND w."lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
AND w."isActive" = true
GROUP BY
w."id"
),
Expand Down Expand Up @@ -344,11 +345,14 @@ step_runs AS (
AND w."lastHeartbeatAt" < NOW() - INTERVAL '30 seconds'
) OR (
sr."status" = 'ASSIGNED'
AND w."lastHeartbeatAt" < NOW() - INTERVAL '30 seconds'
-- reassign if the run is stuck in assigned
AND (
sr."updatedAt" < NOW() - INTERVAL '30 seconds'
OR w."lastHeartbeatAt" < NOW() - INTERVAL '30 seconds'
)
))
AND jr."status" = 'RUNNING'
AND sr."input" IS NOT NULL
-- Step run cannot have a failed parent
AND NOT EXISTS (
SELECT 1
FROM "_StepRunOrder" AS order_table
Expand Down Expand Up @@ -395,6 +399,7 @@ WITH valid_workers AS (
WHERE
w."tenantId" = @tenantId::uuid
AND w."lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
AND w."isActive" = true
GROUP BY
w."id"
),
Expand Down Expand Up @@ -466,6 +471,7 @@ WITH valid_workers AS (
w."tenantId" = @tenantId::uuid
AND w."dispatcherId" IS NOT NULL
AND w."lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
AND w."isActive" = true
AND w."id" IN (
SELECT "_ActionToWorker"."B"
FROM "_ActionToWorker"
Expand Down Expand Up @@ -500,6 +506,7 @@ WITH valid_workers AS (
w."tenantId" = @tenantId::uuid
AND w."dispatcherId" IS NOT NULL
AND w."lastHeartbeatAt" > NOW() - INTERVAL '5 seconds'
AND w."isActive" = true
AND w."id" IN (
SELECT "_ActionToWorker"."B"
FROM "_ActionToWorker"
Expand Down
11 changes: 9 additions & 2 deletions internal/repository/prisma/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions internal/repository/prisma/dbsqlc/workers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ GROUP BY
SELECT
w."id" AS "id",
w."tenantId" AS "tenantId",
w."dispatcherId" AS "dispatcherId"
w."dispatcherId" AS "dispatcherId",
w."isActive" AS "isActive"
FROM
"Worker" w
WHERE
Expand Down Expand Up @@ -83,7 +84,8 @@ SET
"updatedAt" = CURRENT_TIMESTAMP,
"dispatcherId" = coalesce(sqlc.narg('dispatcherId')::uuid, "dispatcherId"),
"maxRuns" = coalesce(sqlc.narg('maxRuns')::int, "maxRuns"),
"lastHeartbeatAt" = coalesce(sqlc.narg('lastHeartbeatAt')::timestamp, "lastHeartbeatAt")
"lastHeartbeatAt" = coalesce(sqlc.narg('lastHeartbeatAt')::timestamp, "lastHeartbeatAt"),
"isActive" = coalesce(sqlc.narg('isActive')::boolean, "isActive")
WHERE
"id" = @id::uuid
RETURNING *;
Expand Down
30 changes: 22 additions & 8 deletions internal/repository/prisma/dbsqlc/workers.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions internal/repository/prisma/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,13 @@ func (w *workerEngineRepository) UpdateWorker(ctx context.Context, tenantId, wor
updateParams.DispatcherId = sqlchelpers.UUIDFromStr(*opts.DispatcherId)
}

if opts.IsActive != nil {
updateParams.IsActive = pgtype.Bool{
Bool: *opts.IsActive,
Valid: true,
}
}

worker, err := w.queries.UpdateWorker(ctx, tx, updateParams)

if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions internal/repository/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type UpdateWorkerOpts struct {
// When the last worker heartbeat was
LastHeartbeatAt *time.Time

// If the worker is active and accepting new runs
IsActive *bool

// A list of actions this worker can run
Actions []string `validate:"dive,actionId"`
}
Expand Down
47 changes: 46 additions & 1 deletion internal/services/dispatcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,17 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream
}
}

// set the worker as active
isActive := true
_, err = s.repo.Worker().UpdateWorker(ctx, tenantId, request.WorkerId, &repository.UpdateWorkerOpts{
IsActive: &isActive,
})

if err != nil {
s.l.Error().Err(err).Msgf("could not update worker %s active status", request.WorkerId)
return err
}

fin := make(chan bool)

s.workers.Add(request.WorkerId, sessionId, &subscribedWorker{stream: stream, finished: fin})
Expand All @@ -294,6 +305,8 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream
default:
}

fmt.Println("deleting worker", request.WorkerId, sessionId)

s.workers.DeleteForSession(request.WorkerId, sessionId)
}()

Expand All @@ -302,9 +315,31 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream
select {
case <-fin:
s.l.Debug().Msgf("closing stream for worker id: %s", request.WorkerId)

isActive := false
_, err = s.repo.Worker().UpdateWorker(ctx, tenantId, request.WorkerId, &repository.UpdateWorkerOpts{
IsActive: &isActive,
})

if err != nil {
s.l.Error().Err(err).Msgf("could not update worker %s active status", request.WorkerId)
return err
}

return nil
case <-ctx.Done():
s.l.Debug().Msgf("worker id %s has disconnected", request.WorkerId)

isActive := false
_, err = s.repo.Worker().UpdateWorker(context.Background(), tenantId, request.WorkerId, &repository.UpdateWorkerOpts{
IsActive: &isActive,
})

if err != nil {
s.l.Error().Err(err).Msgf("could not update worker %s active status", request.WorkerId)
return err
}

return nil
}
}
Expand All @@ -326,7 +361,17 @@ func (s *DispatcherImpl) Heartbeat(ctx context.Context, req *contracts.Heartbeat
s.l.Warn().Msgf("heartbeat time is greater than expected heartbeat interval")
}

_, err := s.repo.Worker().UpdateWorker(ctx, tenantId, req.WorkerId, &repository.UpdateWorkerOpts{
worker, err := s.repo.Worker().GetWorkerForEngine(ctx, tenantId, req.WorkerId)

if err != nil {
return nil, err
}

if !worker.IsActive {
return nil, fmt.Errorf("Heartbeat rejected, worker stream is not active")
}

_, err = s.repo.Worker().UpdateWorker(ctx, tenantId, req.WorkerId, &repository.UpdateWorkerOpts{
// use the system time for heartbeat
LastHeartbeatAt: &heartbeatAt,
})
Expand Down
2 changes: 2 additions & 0 deletions prisma/migrations/20240520152239_v0_28_2/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "Worker" ADD COLUMN "isActive" BOOLEAN NOT NULL DEFAULT false;
3 changes: 3 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,9 @@ model Worker {
// the last heartbeat time
lastHeartbeatAt DateTime?

// whether this worker is active or not
isActive Boolean @default(false)

// the worker name
name String

Expand Down
Loading
0