From 77bb42e9ab09bb9a311b6565918b81b6910e4b4b Mon Sep 17 00:00:00 2001 From: joshvanl Date: Fri, 10 Jan 2025 10:53:20 +0000 Subject: [PATCH] Adds WatchOrchestrationRuntimeStatus to backend Adds `WatchOrchestrationRuntimeStatus` to backend which streams Orchestration Runtime Status changes to the durabletask engine. This func is used when waiting for state changes to a workflow. This is much better than the existing implementation which spams "GetOrchestrationRuntimeState" which needlessly spins wheels and slows the execution of workflows by locking up both the backend implementation and db. Signed-off-by: joshvanl --- backend/backend.go | 6 ++++ backend/client.go | 52 +++++++++++++++---------------- backend/executor.go | 59 ++++++++++++++++-------------------- backend/sqlite/sqlite.go | 38 +++++++++++++++++++++++ go.mod | 28 +++++++++-------- go.sum | 61 ++++++++++++++++++++----------------- tests/mocks/Backend.go | 64 ++++++++++++++++++++++++++++++++++----- tests/mocks/Executor.go | 48 ++++++++++++++++++++++++++++- tests/mocks/TaskWorker.go | 2 +- 9 files changed, 249 insertions(+), 109 deletions(-) diff --git a/backend/backend.go b/backend/backend.go index 2b2fb0c..61692a9 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -26,6 +26,7 @@ type ( CreateWorkflowInstanceRequest = protos.CreateWorkflowInstanceRequest ActivityRequest = protos.ActivityRequest OrchestrationMetadata = protos.OrchestrationMetadata + OrchestrationStatus = protos.OrchestrationStatus WorkflowStateMetadata = protos.WorkflowStateMetadata DurableTimer = protos.DurableTimer ) @@ -74,6 +75,11 @@ type Backend interface { // GetOrchestrationRuntimeState gets the runtime state of an orchestration instance. GetOrchestrationRuntimeState(context.Context, *OrchestrationWorkItem) (*OrchestrationRuntimeState, error) + // WatchOrchestrationRuntimeStatus is a streaming API to watch for changes to + // the OrchestrationMetadata, receiving events as and when the state changes. 
+ // Used over polling the metadata. + WatchOrchestrationRuntimeStatus(ctx context.Context, id api.InstanceID, ch chan<- *OrchestrationMetadata) error + // GetOrchestrationMetadata gets the metadata associated with the given orchestration instance ID. // // Returns [api.ErrInstanceNotFound] if the orchestration instance doesn't exist. diff --git a/backend/client.go b/backend/client.go index 024df72..684c0e3 100644 --- a/backend/client.go +++ b/backend/client.go @@ -2,10 +2,10 @@ package backend import ( "context" + "errors" "fmt" "time" - "github.com/cenkalti/backoff/v4" "github.com/google/uuid" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -15,6 +15,7 @@ import ( "github.com/dapr/durabletask-go/api" "github.com/dapr/durabletask-go/api/helpers" "github.com/dapr/durabletask-go/api/protos" + "github.com/dapr/kit/concurrency" ) type TaskHubClient interface { @@ -114,34 +115,31 @@ func (c *backendClient) WaitForOrchestrationCompletion(ctx context.Context, id a } func (c *backendClient) waitForOrchestrationCondition(ctx context.Context, id api.InstanceID, condition func(metadata *OrchestrationMetadata) bool) (*OrchestrationMetadata, error) { - b := backoff.ExponentialBackOff{ - InitialInterval: 100 * time.Millisecond, - MaxInterval: 10 * time.Second, - Multiplier: 1.5, - RandomizationFactor: 0.05, - Stop: backoff.Stop, - Clock: backoff.SystemClock, - } - b.Reset() - - for { - t := time.NewTimer(b.NextBackOff()) - select { - case <-ctx.Done(): - if !t.Stop() { - <-t.C - } - return nil, ctx.Err() - case <-t.C: - metadata, err := c.FetchOrchestrationMetadata(ctx, id) - if err != nil { - return nil, err - } - if metadata != nil && condition(metadata) { - return metadata, nil + ch := make(chan *protos.OrchestrationMetadata) + var metadata *protos.OrchestrationMetadata + err := concurrency.NewRunnerManager( + func(ctx context.Context) error { + return c.be.WatchOrchestrationRuntimeStatus(ctx, id, ch) + }, + func(ctx context.Context) error { + for { + 
select { + case <-ctx.Done(): + return ctx.Err() + case metadata = <-ch: + if condition(metadata) { + return nil + } + } } - } + }, + ).Run(ctx) + + if err != nil || ctx.Err() != nil { + return nil, errors.Join(err, ctx.Err()) } + + return metadata, nil } // TerminateOrchestration enqueues a message to terminate a running orchestration, causing it to stop receiving new events and diff --git a/backend/executor.go b/backend/executor.go index 1a9522d..df2138d 100644 --- a/backend/executor.go +++ b/backend/executor.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/cenkalti/backoff/v4" "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -22,6 +21,7 @@ import ( "github.com/dapr/durabletask-go/api" "github.com/dapr/durabletask-go/api/helpers" "github.com/dapr/durabletask-go/api/protos" + "github.com/dapr/kit/concurrency" ) var emptyCompleteTaskResponse = &protos.CompleteTaskResponse{} @@ -555,42 +555,35 @@ func (g *grpcExecutor) WaitForInstanceStart(ctx context.Context, req *protos.Get func (g *grpcExecutor) waitForInstance(ctx context.Context, req *protos.GetInstanceRequest, condition func(*OrchestrationMetadata) bool) (*protos.GetInstanceResponse, error) { iid := api.InstanceID(req.InstanceId) - var b backoff.BackOff = &backoff.ExponentialBackOff{ - InitialInterval: 1 * time.Millisecond, - MaxInterval: 3 * time.Second, - Multiplier: 1.5, - RandomizationFactor: 0.5, - Stop: backoff.Stop, - Clock: backoff.SystemClock, - } - b = backoff.WithContext(b, ctx) - b.Reset() - -loop: - for { - t := time.NewTimer(b.NextBackOff()) - select { - case <-ctx.Done(): - if !t.Stop() { - <-t.C + ch := make(chan *protos.OrchestrationMetadata) + var metadata *protos.OrchestrationMetadata + err := concurrency.NewRunnerManager( + func(ctx context.Context) error { + return g.backend.WatchOrchestrationRuntimeStatus(ctx, iid, ch) + }, + func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case metadata = <-ch: + if 
condition(metadata) { + return nil + } + } } - break loop + }, + ).Run(ctx) - case <-t.C: - metadata, err := g.backend.GetOrchestrationMetadata(ctx, iid) - if err != nil { - return nil, err - } - if metadata == nil { - return &protos.GetInstanceResponse{Exists: false}, nil - } - if condition(metadata) { - return createGetInstanceResponse(req, metadata), nil - } - } + if err != nil || ctx.Err() != nil { + return nil, errors.Join(err, ctx.Err()) + } + + if metadata == nil { + return &protos.GetInstanceResponse{Exists: false}, nil } - return nil, status.Errorf(codes.Canceled, "instance hasn't completed") + return createGetInstanceResponse(req, metadata), nil } // mustEmbedUnimplementedTaskHubSidecarServiceServer implements protos.TaskHubSidecarServiceServer diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 1113152..962c5b6 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/cenkalti/backoff/v4" "github.com/dapr/durabletask-go/api" "github.com/dapr/durabletask-go/api/helpers" "github.com/dapr/durabletask-go/api/protos" @@ -635,6 +636,43 @@ func (be *sqliteBackend) AddNewOrchestrationEvent(ctx context.Context, iid api.I return nil } +func (be *sqliteBackend) WatchOrchestrationRuntimeStatus(ctx context.Context, id api.InstanceID, ch chan<- *backend.OrchestrationMetadata) error { + b := backoff.ExponentialBackOff{ + InitialInterval: 100 * time.Millisecond, + MaxInterval: 10 * time.Second, + Multiplier: 1.5, + RandomizationFactor: 0.05, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } + b.Reset() + + for { + t := time.NewTimer(b.NextBackOff()) + + select { + case <-ctx.Done(): + if !t.Stop() { + <-t.C + } + return ctx.Err() + case <-t.C: + meta, err := be.GetOrchestrationMetadata(ctx, id) + if err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- meta: + } + } + } + + return nil +} + // GetOrchestrationMetadata implements 
backend.Backend func (be *sqliteBackend) GetOrchestrationMetadata(ctx context.Context, iid api.InstanceID) (*backend.OrchestrationMetadata, error) { if err := be.ensureDB(); err != nil { diff --git a/go.mod b/go.mod index 24b39cd..9ae78bb 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,17 @@ module github.com/dapr/durabletask-go go 1.19 require ( - github.com/cenkalti/backoff/v4 v4.1.3 - github.com/golang/protobuf v1.5.3 - github.com/google/uuid v1.3.0 + github.com/cenkalti/backoff/v4 v4.2.1 + github.com/dapr/kit v0.13.2 + github.com/google/uuid v1.3.1 github.com/marusama/semaphore/v2 v2.5.0 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 go.opentelemetry.io/otel v1.18.0 go.opentelemetry.io/otel/exporters/zipkin v1.11.1 go.opentelemetry.io/otel/sdk v1.11.1 go.opentelemetry.io/otel/trace v1.18.0 - google.golang.org/grpc v1.56.3 + google.golang.org/grpc v1.60.1 google.golang.org/protobuf v1.33.0 modernc.org/sqlite v1.22.1 ) @@ -24,22 +24,26 @@ require ( github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/openzipkin/zipkin-go v0.4.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - github.com/stretchr/objx v0.5.0 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/otel/metric v1.18.0 // indirect - golang.org/x/mod v0.8.0 // indirect - golang.org/x/net v0.23.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.6.0 
// indirect - google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect + golang.org/x/mod v0.17.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.15.0 // indirect + golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect lukechampine.com/uint128 v1.2.0 // indirect modernc.org/cc/v3 v3.40.0 // indirect modernc.org/ccgo/v3 v3.16.13 // indirect diff --git a/go.sum b/go.sum index 88b2464..9cb5eef 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ -github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= -github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/dapr/kit v0.13.2 h1:Ne+cWKn+LzEPZHvpsmHCAx5zieMqsC5WBJe8EuEpaBg= +github.com/dapr/kit v0.13.2/go.mod h1:YI7Pof1HAmMws7D3UCg0xoKwk1F8NmGNJqx5lhB+538= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -16,10 +18,10 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= 
-github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -39,14 +41,14 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= 
-github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 h1:KfYpVmrjI7JuToy5k8XV3nkapjWx48k4E4JOtVstzQI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0/go.mod h1:SeQhzAEccGVZVEy7aH87Nh0km+utSpo1pTv6eMMop48= go.opentelemetry.io/otel v1.18.0 h1:TgVozPGZ01nHyDZxK5WGPFB9QexeTMXEH7+tIClWfzs= @@ -59,23 +61,26 @@ go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZp go.opentelemetry.io/otel/sdk v1.11.1/go.mod h1:/l3FE4SupHJ12TduVjUkZtlfFqDCQJlOlithYrdktys= go.opentelemetry.io/otel/trace v1.18.0 h1:NY+czwbHbmndxojTEKiSMHkG2ClNH2PwmcHrdo0JY10= go.opentelemetry.io/otel/trace v1.18.0/go.mod h1:T2+SGJGuYZY3bjj5rgh/hN7KIrlpWC5nS8Mjvzckz+0= -golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= -golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod 
h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= -google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= -google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc= 
-google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= +google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= @@ -86,6 +91,8 @@ gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= +k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI= lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= modernc.org/cc/v3 v3.40.0 h1:P3g79IUS/93SYhtoeaHW+kRCIrYaxJ27MFPv+7kaTOw= diff --git a/tests/mocks/Backend.go b/tests/mocks/Backend.go index 90cde7a..78ef150 100644 --- a/tests/mocks/Backend.go +++ b/tests/mocks/Backend.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.38.0. DO NOT EDIT. +// Code generated by mockery v2.45.1. DO NOT EDIT. 
package mocks @@ -475,23 +475,23 @@ func (_c *Backend_GetActivityWorkItem_Call) RunAndReturn(run func(context.Contex } // GetOrchestrationMetadata provides a mock function with given fields: _a0, _a1 -func (_m *Backend) GetOrchestrationMetadata(_a0 context.Context, _a1 api.InstanceID) (*backend.OrchestrationMetadata, error) { +func (_m *Backend) GetOrchestrationMetadata(_a0 context.Context, _a1 api.InstanceID) (*protos.OrchestrationMetadata, error) { ret := _m.Called(_a0, _a1) if len(ret) == 0 { panic("no return value specified for GetOrchestrationMetadata") } - var r0 *backend.OrchestrationMetadata + var r0 *protos.OrchestrationMetadata var r1 error - if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID) (*backend.OrchestrationMetadata, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID) (*protos.OrchestrationMetadata, error)); ok { return rf(_a0, _a1) } - if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID) *backend.OrchestrationMetadata); ok { + if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID) *protos.OrchestrationMetadata); ok { r0 = rf(_a0, _a1) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*backend.OrchestrationMetadata) + r0 = ret.Get(0).(*protos.OrchestrationMetadata) } } @@ -523,12 +523,12 @@ func (_c *Backend_GetOrchestrationMetadata_Call) Run(run func(_a0 context.Contex return _c } -func (_c *Backend_GetOrchestrationMetadata_Call) Return(_a0 *backend.OrchestrationMetadata, _a1 error) *Backend_GetOrchestrationMetadata_Call { +func (_c *Backend_GetOrchestrationMetadata_Call) Return(_a0 *protos.OrchestrationMetadata, _a1 error) *Backend_GetOrchestrationMetadata_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *Backend_GetOrchestrationMetadata_Call) RunAndReturn(run func(context.Context, api.InstanceID) (*backend.OrchestrationMetadata, error)) *Backend_GetOrchestrationMetadata_Call { +func (_c *Backend_GetOrchestrationMetadata_Call) RunAndReturn(run func(context.Context, api.InstanceID) 
(*protos.OrchestrationMetadata, error)) *Backend_GetOrchestrationMetadata_Call { _c.Call.Return(run) return _c } @@ -789,6 +789,54 @@ func (_c *Backend_Stop_Call) RunAndReturn(run func(context.Context) error) *Back return _c } +// WatchOrchestrationRuntimeStatus provides a mock function with given fields: ctx, id, ch +func (_m *Backend) WatchOrchestrationRuntimeStatus(ctx context.Context, id api.InstanceID, ch chan<- *protos.OrchestrationMetadata) error { + ret := _m.Called(ctx, id, ch) + + if len(ret) == 0 { + panic("no return value specified for WatchOrchestrationRuntimeStatus") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, api.InstanceID, chan<- *protos.OrchestrationMetadata) error); ok { + r0 = rf(ctx, id, ch) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Backend_WatchOrchestrationRuntimeStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WatchOrchestrationRuntimeStatus' +type Backend_WatchOrchestrationRuntimeStatus_Call struct { + *mock.Call +} + +// WatchOrchestrationRuntimeStatus is a helper method to define mock.On call +// - ctx context.Context +// - id api.InstanceID +// - ch chan<- *protos.OrchestrationMetadata +func (_e *Backend_Expecter) WatchOrchestrationRuntimeStatus(ctx interface{}, id interface{}, ch interface{}) *Backend_WatchOrchestrationRuntimeStatus_Call { + return &Backend_WatchOrchestrationRuntimeStatus_Call{Call: _e.mock.On("WatchOrchestrationRuntimeStatus", ctx, id, ch)} +} + +func (_c *Backend_WatchOrchestrationRuntimeStatus_Call) Run(run func(ctx context.Context, id api.InstanceID, ch chan<- *protos.OrchestrationMetadata)) *Backend_WatchOrchestrationRuntimeStatus_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(api.InstanceID), args[2].(chan<- *protos.OrchestrationMetadata)) + }) + return _c +} + +func (_c *Backend_WatchOrchestrationRuntimeStatus_Call) Return(_a0 error) *Backend_WatchOrchestrationRuntimeStatus_Call 
{ + _c.Call.Return(_a0) + return _c +} + +func (_c *Backend_WatchOrchestrationRuntimeStatus_Call) RunAndReturn(run func(context.Context, api.InstanceID, chan<- *protos.OrchestrationMetadata) error) *Backend_WatchOrchestrationRuntimeStatus_Call { + _c.Call.Return(run) + return _c +} + // NewBackend creates a new instance of Backend. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewBackend(t interface { diff --git a/tests/mocks/Executor.go b/tests/mocks/Executor.go index a2fa9b0..0cfe3c1 100644 --- a/tests/mocks/Executor.go +++ b/tests/mocks/Executor.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.38.0. DO NOT EDIT. +// Code generated by mockery v2.45.1. DO NOT EDIT. package mocks @@ -147,6 +147,52 @@ func (_c *Executor_ExecuteOrchestrator_Call) RunAndReturn(run func(context.Conte return _c } +// Shutdown provides a mock function with given fields: ctx +func (_m *Executor) Shutdown(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Shutdown") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Executor_Shutdown_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Shutdown' +type Executor_Shutdown_Call struct { + *mock.Call +} + +// Shutdown is a helper method to define mock.On call +// - ctx context.Context +func (_e *Executor_Expecter) Shutdown(ctx interface{}) *Executor_Shutdown_Call { + return &Executor_Shutdown_Call{Call: _e.mock.On("Shutdown", ctx)} +} + +func (_c *Executor_Shutdown_Call) Run(run func(ctx context.Context)) *Executor_Shutdown_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *Executor_Shutdown_Call) Return(_a0 error) *Executor_Shutdown_Call { + 
_c.Call.Return(_a0) + return _c +} + +func (_c *Executor_Shutdown_Call) RunAndReturn(run func(context.Context) error) *Executor_Shutdown_Call { + _c.Call.Return(run) + return _c +} + // NewExecutor creates a new instance of Executor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewExecutor(t interface { diff --git a/tests/mocks/TaskWorker.go b/tests/mocks/TaskWorker.go index 7ee0f2c..4fb7bea 100644 --- a/tests/mocks/TaskWorker.go +++ b/tests/mocks/TaskWorker.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.38.0. DO NOT EDIT. +// Code generated by mockery v2.45.1. DO NOT EDIT. package mocks