8000 new api-contract for workflow run events by abelanger5 · Pull Request #394 · hatchet-dev/hatchet · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

new api-contract for workflow run events #394

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions api-contracts/dispatcher/dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ service Dispatcher {

rpc SubscribeToWorkflowEvents(SubscribeToWorkflowEventsRequest) returns (stream WorkflowEvent) {}

rpc SubscribeToWorkflowRuns(stream SubscribeToWorkflowRunsRequest) returns (stream WorkflowRunEvent) {}

rpc SendStepActionEvent(StepActionEvent) returns (ActionEventResponse) {}

rpc SendGroupKeyActionEvent(GroupKeyActionEvent) returns (ActionEventResponse) {}
Expand Down Expand Up @@ -190,6 +192,11 @@ message SubscribeToWorkflowEventsRequest {
string workflowRunId = 1;
}

message SubscribeToWorkflowRunsRequest {
// the id of the workflow run
string workflowRunId = 1;
}

enum ResourceType {
RESOURCE_TYPE_UNKNOWN = 0;
RESOURCE_TYPE_STEP_RUN = 1;
Expand Down Expand Up @@ -232,6 +239,33 @@ message WorkflowEvent {
optional int32 retryCount = 9;
}

enum WorkflowRunEventType {
WORKFLOW_RUN_EVENT_TYPE_FINISHED = 0;
}

message WorkflowRunEvent {
// the id of the workflow run
string workflowRunId = 1;

WorkflowRunEventType eventType = 2;

google.protobuf.Timestamp eventTimestamp = 3;

repeated StepRunResult results = 4;
}

message StepRunResult {
string stepRunId = 1;

string stepReadableId = 2;

string jobRunId = 3;

optional string error = 4;

optional string output = 5;
}

message OverridesData {
// the step run id
string stepRunId = 1;
Expand Down
2 changes: 1 addition & 1 deletion api/v1/server/authn/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (a *AuthN) handleBearerAuth(c echo.Context) error {
}

// Validate the token.
tenantId, err := a.config.Auth.JWTManager.ValidateTenantToken(token)
tenantId, err := a.config.Auth.JWTManager.ValidateTenantToken(c.Request().Context(), token)

if err != nil {
a.l.Debug().Err(err).Msg("error validating tenant token")
Expand Down
2 changes: 1 addition & 1 deletion api/v1/server/handlers/api-tokens/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (a *APITokenService) ApiTokenCreate(ctx echo.Context, request gen.ApiTokenC
return gen.ApiTokenCreate400JSONResponse(*apiErrors), nil
}

token, err := a.config.Auth.JWTManager.GenerateTenantToken(tenant.ID, request.Body.Name)
token, err := a.config.Auth.JWTManager.GenerateTenantToken(ctx.Request().Context(), tenant.ID, request.Body.Name)

if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion api/v1/server/handlers/events/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (t *EventService) EventUpdateReplay(ctx echo.Context, request gen.EventUpda
eventIds[i] = request.Body.EventIds[i].String()
}

events, err := t.config.EngineRepository.Event().ListEventsByIds(tenant.ID, eventIds)
events, err := t.config.EngineRepository.Event().ListEventsByIds(ctx.Request().Context(), tenant.ID, eventIds)

if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion api/v1/server/handlers/step-runs/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (t *StepRunService) StepRunUpdateCancel(ctx echo.Context, request gen.StepR
), nil
}

engineStepRun, err := t.config.EngineRepository.StepRun().GetStepRunForEngine(tenant.ID, stepRun.ID)
engineStepRun, err := t.config.EngineRepository.StepRun().GetStepRunForEngine(ctx.Request().Context(), tenant.ID, stepRun.ID)

if err != nil {
return nil, fmt.Errorf("could not get step run for engine: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion api/v1/server/handlers/step-runs/rerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu
return nil, err
}

engineStepRun, err := t.config.EngineRepository.StepRun().GetStepRunForEngine(tenant.ID, stepRun.ID)
engineStepRun, err := t.config.EngineRepository.StepRun().GetStepRunForEngine(ctx.Request().Context(), tenant.ID, stepRun.ID)

if err != nil {
return nil, fmt.Errorf("could not get step run for engine: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion api/v1/server/handlers/workflows/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (t *WorkflowService) WorkflowRunCreate(ctx echo.Context, request gen.Workfl
workflowVersionId = versions[0].ID
}

workflowVersion, err := t.config.EngineRepository.Workflow().GetWorkflowVersionById(tenant.ID, workflowVersionId)
workflowVersion, err := t.config.EngineRepository.Workflow().GetWorkflowVersionById(ctx.Request().Context(), tenant.ID, workflowVersionId)

if err != nil {
if errors.Is(err, db.ErrNotFound) {
Expand Down
5 changes: 3 additions & 2 deletions cmd/hatchet-admin/cli/seed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cli

import (
"context"
"errors"
"fmt"
"log"
Expand Down Expand Up @@ -123,14 +124,14 @@ func runSeed(cf *loader.ConfigLoader) error {
}

func seedDev(repo repository.EngineRepository, tenantId string) error {
_, err := repo.Workflow().GetWorkflowByName(tenantId, "test-workflow")
_, err := repo.Workflow().GetWorkflowByName(context.Background(), tenantId, "test-workflow")

if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return err
}

wf, err := repo.Workflow().CreateNewWorkflow(tenantId, &repository.CreateWorkflowVersionOpts{
wf, err := repo.Workflow().CreateNewWorkflow(context.Background(), tenantId, &repository.CreateWorkflowVersionOpts{
Name: "test-workflow",
Description: repository.StringPtr("This is a test workflow."),
Version: repository.StringPtr("v0.1.0"),
Expand Down
3 changes: 2 additions & 1 deletion cmd/hatchet-admin/cli/token.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cli

import (
"context"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -68,7 +69,7 @@ func runCreateAPIToken() error {

defer serverConf.Disconnect() // nolint:errcheck

defaultTok, err := serverConf.Auth.JWTManager.GenerateTenantToken(tokenTenantId, tokenName)
defaultTok, err := serverConf.Auth.JWTManager.GenerateTenantToken(context.Background(), tokenTenantId, tokenName)

if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion examples/manual-trigger/trigger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func run(ch <-chan interface{}, events chan<- string) error {
interruptCtx, cancel := cmdutils.InterruptContextFromChan(ch)
defer cancel()

err = c.Subscribe().On(interruptCtx, workflowRunId, func(event client.RunEvent) error {
err = c.Subscribe().On(interruptCtx, workflowRunId, func(event client.WorkflowEvent) error {
fmt.Println(event.EventPayload)

return nil
Expand Down
44 changes: 38 additions & 6 deletions examples/procedural/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"sync"
"time"

"github.com/joho/godotenv"
Expand Down Expand Up @@ -95,7 +96,8 @@ func run(events chan<- string) (func() error, error) {

eg.SetLimit(NUM_CHILDREN)

childOutputs := make([]int, NUM_CHILDREN)
childOutputs := make([]int, 0)
childOutputsMu := sync.Mutex{}

for i, childWorkflow := range childWorkflows {
eg.Go(func(i int, childWorkflow *worker.ChildWorkflow) func() error {
Expand All @@ -114,7 +116,9 @@ func run(events chan<- string) (func() error, error) {
return err
}

childOutputs[i] = childOutput.Index
childOutputsMu.Lock()
childOutputs = append(childOutputs, childOutput.Index)
childOutputsMu.Unlock()

events <- fmt.Sprintf("child-%d-completed", childOutput.Index)

Expand All @@ -124,10 +128,38 @@ func run(events chan<- string) (func() error, error) {
}(i, childWorkflow))
}

err = eg.Wait()
finishedCh := make(chan struct{})

if err != nil {
return nil, err
go func() {
defer close(finishedCh)
err = eg.Wait()
}()

timer := time.NewTimer(60 * time.Second)

select {
case <-finishedCh:
if err != nil {
return nil, err
}
case <-timer.C:
incomplete := make([]int, 0)
// print non-complete children
for i := range childWorkflows {
completed := false
for _, childOutput := range childOutputs {
if childOutput == i {
completed = true
break
}
}

if !completed {
incomplete = append(incomplete, i)
}
}

return nil, fmt.Errorf("timed out waiting for the following child workflows to complete: %v", incomplete)
}

sum := 0
Expand All @@ -140,7 +172,7 @@ func run(events chan<- string) (func() error, error) {
ChildSum: sum,
}, nil
},
),
).SetTimeout("10m"),
},
},
)
Expand Down
2 changes: 1 addition & 1 deletion frontend/docs/pages/blog/multi-tenant-queues.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ Which gives us much more promising output:
This works — and you can modify this logic to be more distributed by maintaining a lease when a worker starts for a set amount of time — as long as the polling interval is below the query duration time (or more specifically, `pollingTime / numWorkers` is below the query duration time). But what happens when our queue starts to fill up? Let's add 10,000 enqueued tasks and run an `EXPLAIN ANALYZE` for this query to take a look at performance:

```sql
QUERY PLAN
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------
Update on tasks (cost=259.44..514.23 rows=1 width=78) (actual time=132.717..154.337 rows=100 loops=1)
-> Hash Join (cost=259.44..514.23 rows=1 width=78) (actual time=125.423..141.271 rows=100 loops=1)
Expand Down
13 changes: 7 additions & 6 deletions internal/auth/token/token.go
17AE
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package token

import (
"context"
"fmt"
"time"

Expand All @@ -12,8 +13,8 @@ import (
)

type JWTManager interface {
GenerateTenantToken(tenantId, name string) (string, error)
ValidateTenantToken(token string) (string, error)
GenerateTenantToken(ctx context.Context, tenantId, name string) (string, error)
ValidateTenantToken(ctx context.Context, token string) (string, error)
}

type TokenOpts struct {
Expand Down Expand Up @@ -45,7 +46,7 @@ func NewJWTManager(encryptionSvc encryption.EncryptionService, tokenRepo reposit
}, nil
}

func (j *jwtManagerImpl) GenerateTenantToken(tenantId, name string) (string, error) {
func (j *jwtManagerImpl) GenerateTenantToken(ctx context.Context, tenantId, name string) (string, error) {
// Retrieve the JWT Signer primitive from privateKeysetHandle.
signer, err := jwt.NewSigner(j.encryption.GetPrivateJWTHandle())

Expand All @@ -68,7 +69,7 @@ func (j *jwtManagerImpl) GenerateTenantToken(tenantId, name string) (string, err
}

// write the token to the database
_, err = j.tokenRepo.CreateAPIToken(&repository.CreateAPITokenOpts{
_, err = j.tokenRepo.CreateAPIToken(ctx, &repository.CreateAPITokenOpts{
ID: tokenId,
ExpiresAt: expiresAt,
TenantId: &tenantId,
Expand All @@ -82,7 +83,7 @@ func (j *jwtManagerImpl) GenerateTenantToken(tenantId, name string) (string, err
return token, nil
}

func (j *jwtManagerImpl) ValidateTenantToken( F41A token string) (tenantId string, err error) {
func (j *jwtManagerImpl) ValidateTenantToken(ctx context.Context, token string) (tenantId string, err error) {
// Verify the signed token.
audience := j.opts.Audience

Expand Down Expand Up @@ -140,7 +141,7 @@ func (j *jwtManagerImpl) ValidateTenantToken(token string) (tenantId string, err
}

// read the token from the database
dbToken, err := j.tokenRepo.GetAPITokenById(tokenId)
dbToken, err := j.tokenRepo.GetAPITokenById(ctx, tokenId)

if err != nil {
return "", fmt.Errorf("failed to read token from database: %v", err)
Expand Down
17 changes: 9 additions & 8 deletions internal/auth/token/token_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package token_test

import (
"context"
"fmt"
"os"
"testing"
Expand Down Expand Up @@ -40,14 +41,14 @@ func TestCreateTenantToken(t *testing.T) { // make sure no cache is used for tes
t.Fatal(err.Error())
}

token, err := jwtManager.GenerateTenantToken(tenantId, "test token")
token, err := jwtManager.GenerateTenantToken(context.Background(), tenantId, "test token")

if err != nil {
t.Fatal(err.Error())
}

// validate the token
newTenantId, err := jwtManager.ValidateTenantToken(token)
newTenantId, err := jwtManager.ValidateTenantToken(context.Background(), token)

assert.NoError(t, err)
assert.Equal(t, tenantId, newTenantId)
Expand Down Expand Up @@ -81,14 +82,14 @@ func TestRevokeTenantToken(t *testing.T) {
t.Fatal(err.Error())
}

token, err := jwtManager.GenerateTenantToken(tenantId, "test token")
token, err := jwtManager.GenerateTenantToken(context.Background(), tenantId, "test token")

if err != nil {
t.Fatal(err.Error())
}

// validate the token
_, err = jwtManager.ValidateTenantToken(token)
_, err = jwtManager.ValidateTenantToken(context.Background(), token)

assert.NoError(t, err)

Expand All @@ -107,7 +108,7 @@ func TestRevokeTenantToken(t *testing.T) {
}

// validate the token again
_, err = jwtManager.ValidateTenantToken(token)
_, err = jwtManager.ValidateTenantToken(context.Background(), token)

// error as the token was revoked
assert.Error(t, err)
Expand Down Expand Up @@ -141,14 +142,14 @@ func TestRevokeTenantTokenCache(t *testing.T) {
t.Fatal(err.Error())
}

token, err := jwtManager.GenerateTenantToken(tenantId, "test token")
token, err := jwtManager.GenerateTenantToken(context.Background(), tenantId, "test token")

if err != nil {
t.Fatal(err.Error())
}

// validate the token
_, err = jwtManager.ValidateTenantToken(token)
_, err = jwtManager.ValidateTenantToken(context.Background(), token)

assert.NoError(t, err)

Expand All @@ -167,7 +168,7 @@ func TestRevokeTenantTokenCache(t *testing.T) {
}

// validate the token again
_, err = jwtManager.ValidateTenantToken(token)
_, err = jwtManager.ValidateTenantToken(context.Background(), token)

// no error as it is cached
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion internal/config/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func GetServerConfigFromConfigfile(dc *database.Config, cf *server.ServerConfigF
}

// generate a token for the internal client
token, err := auth.JWTManager.GenerateTenantToken(internalTenant.ID, fmt.Sprintf("internal-%s", tokenSuffix))
token, err := auth.JWTManager.GenerateTenantToken(context.Background(), internalTenant.ID, fmt.Sprintf("internal-%s", tokenSuffix))

if err != nil {
return nil, nil, fmt.Errorf("could not generate internal token: %w", err)
Expand Down
Loading
0