From 4ce3eaf5e9558b86041a26ad6e926de10df62c64 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Fri, 25 Apr 2025 11:11:49 +0530 Subject: [PATCH 01/23] Adding circuit breaker middleware --- .../circuitbreakererror/error.go | 27 ++++ .../circuitbreakerfx/middleware/config.go | 12 ++ .../circuitbreakerfx/middleware/metrics.go | 25 +++ .../circuitbreakerfx/middleware/middleware.go | 153 ++++++++++++++++++ 4 files changed, 217 insertions(+) create mode 100644 src/dbnode/circuitbreakerfx/circuitbreakererror/error.go create mode 100644 src/dbnode/circuitbreakerfx/middleware/config.go create mode 100644 src/dbnode/circuitbreakerfx/middleware/metrics.go create mode 100644 src/dbnode/circuitbreakerfx/middleware/middleware.go diff --git a/src/dbnode/circuitbreakerfx/circuitbreakererror/error.go b/src/dbnode/circuitbreakerfx/circuitbreakererror/error.go new file mode 100644 index 0000000000..816cde61bb --- /dev/null +++ b/src/dbnode/circuitbreakerfx/circuitbreakererror/error.go @@ -0,0 +1,27 @@ +package circuitbreakererror + +import ( + xerrors "github.com/m3db/m3/src/x/errors" + "strings" +) + +// circuitBreakerError is an error type that indicates a circuit breaker error. +type circuitBreakerError struct { + host string +} + +var _ error = (*circuitBreakerError)(nil) + +// New creates a new circuit breaker error with the given host. +func New(host string) error { + err := circuitBreakerError{host: host} + return xerrors.NewNonRetryableError(err) +} + +// Error returns the error message for the circuit breaker error. +func (e circuitBreakerError) Error() string { + var b strings.Builder + b.WriteString("request rejected by circuit breaker of outbound-service: ") + b.WriteString(e.host) + return b.String() +} diff --git a/src/dbnode/circuitbreakerfx/middleware/config.go b/src/dbnode/circuitbreakerfx/middleware/config.go new file mode 100644 index 0000000000..54d881364c --- /dev/null +++ b/src/dbnode/circuitbreakerfx/middleware/config.go @@ -0,0 +1,12 @@ +package middleware + +import ( + "github.com/m3db/m3/src/dbnode/circuitbreakerfx/circuitbreaker" +) + +// Config represents the configuration for the circuit breaker middleware. +type Config struct { + Enabled bool `yaml:"enabled"` + ShadowMode bool `yaml:"ShadowMode"` + CircuitBreakerConfig circuitbreaker.Config `yaml:"circuitBreakerConfig"` +} diff --git a/src/dbnode/circuitbreakerfx/middleware/metrics.go b/src/dbnode/circuitbreakerfx/middleware/metrics.go new file mode 100644 index 0000000000..12b95434a3 --- /dev/null +++ b/src/dbnode/circuitbreakerfx/middleware/metrics.go @@ -0,0 +1,25 @@ +package middleware + +import ( + "github.com/uber-go/tally" +) + +type circuitBreakerMetrics struct { + successes tally.Counter // counter for successful requests + failures tally.Counter // counter for failed requests + rejects tally.Counter // counter for rejected requests +} + +func newMetrics(scope tally.Scope, host string) *circuitBreakerMetrics { + taggedScope := scope.Tagged(map[string]string{ + "component": _packageName, + "host": host, + }) + + metrics := &circuitBreakerMetrics{ + successes: taggedScope.Counter("circuit_breaker_successes"), + failures: taggedScope.Counter("circuit_breaker_failures"), + rejects: taggedScope.Counter("circuit_breaker_rejects"), + } + return metrics +} diff --git a/src/dbnode/circuitbreakerfx/middleware/middleware.go b/src/dbnode/circuitbreakerfx/middleware/middleware.go new file mode 100644 index 0000000000..32b9e87af0 --- /dev/null +++ b/src/dbnode/circuitbreakerfx/middleware/middleware.go @@ -0,0 +1,153 @@ +package middleware + +import ( + "github.com/m3db/m3/src/dbnode/circuitbreakerfx/circuitbreaker" + "github.com/m3db/m3/src/dbnode/circuitbreakerfx/circuitbreakererror" + "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/uber-go/tally" + tchannel "github.com/uber/tchannel-go" + "go.uber.org/zap" + "sync" + "sync/atomic" +) + +const ( + _packageName = "circuit_breaker" +) + +// circuitBreakerClient is a client that wraps a TChannel client with a circuit breaker. +type circuitBreakerClient struct { + enabled bool + shadowMode bool + logger *zap.Logger + cb *atomic.Value // *circuitbreaker.Circuit + metrics *circuitBreakerMetrics + host string + next rpc.TChanNode +} + +// M3dbMiddleware is a function that takes a TChannel client and returns a circuit breaker client. +type M3DBMiddleware func(rpc.TChanNode) *circuitBreakerClient + +var ( + cbInitOnce sync.Map // map[string]*sync.Once + cbMap sync.Map // map[string]*atomic.Value + metricsMap sync.Map // map[string]*circuitBreakerMetrics +) + +// NewCircuitBreakerMiddleware creates a new circuit breaker middleware. +func NewCircuitBreakerMiddleware(config Config, logger *zap.Logger, scope tally.Scope, host string) M3DBMiddleware { + initializeCircuitBreaker(config, logger, scope, host) + + return func(next rpc.TChanNode) *circuitBreakerClient { + return createCircuitBreakerClient(config, logger, host, next) + } +} + +// initializeCircuitBreaker initializes the circuit breaker for the given host. +func initializeCircuitBreaker(config Config, logger *zap.Logger, scope tally.Scope, host string) { + onceIface, _ := cbInitOnce.LoadOrStore(host, new(sync.Once)) + once := onceIface.(*sync.Once) + + once.Do(func() { + logger.Info("creating circuit breaker middleware", zap.String("host", host)) + metrics := newMetrics(scope, host) + metricsMap.Store(host, metrics) + + cb, err := circuitbreaker.NewCircuit(config.CircuitBreakerConfig) + if err != nil { + logger.Warn("failed to create circuit breaker", zap.Error(err)) + return + } + + cbVal := &atomic.Value{} + cbVal.Store(cb) + cbMap.Store(host, cbVal) + }) +} + +// createCircuitBreakerClient creates a new circuit breaker client. +func createCircuitBreakerClient(config Config, logger *zap.Logger, host string, next rpc.TChanNode) *circuitBreakerClient { + cbIface, _ := cbMap.Load(host) + metricsIface, _ := metricsMap.Load(host) + + return &circuitBreakerClient{ + enabled: config.Enabled, + shadowMode: config.ShadowMode, + next: next, + logger: logger, + host: host, + metrics: metricsIface.(*circuitBreakerMetrics), + cb: cbIface.(*atomic.Value), + } +} + +// withBreaker executes the given call with a circuit breaker if enabled. +func withBreaker[T any](c *circuitBreakerClient, ctx tchannel.ContextWithHeaders, call func() error) error { + if !c.enabled { + return c.executeWithoutBreaker(call) + } + + cb := c.getCircuit() + if cb == nil || !cb.IsRequestAllowed() { + return c.handleRejectedRequest() + } + + return c.executeWithBreaker(cb, call) +} + +// executeWithoutBreaker executes the given call without a circuit breaker. +func (c *circuitBreakerClient) executeWithoutBreaker(call func() error) error { + c.logger.Info("circuit breaker disabled, calling next", zap.String("host", c.host)) + return call() +} + +// getCircuit retrieves the circuit breaker from the atomic value. +func (c *circuitBreakerClient) getCircuit() *circuitbreaker.Circuit { + cb, _ := c.cb.Load().(*circuitbreaker.Circuit) + return cb +} + +// handleRejectedRequest handles a rejected request by the circuit breaker. +func (c *circuitBreakerClient) handleRejectedRequest() error { + c.metrics.rejects.Inc(1) + c.logger.Info("circuit breaker request rejected", zap.String("host", c.host)) + if !c.shadowMode { + return circuitbreakererror.New(c.host) + } + return nil +} + +// executeWithBreaker executes the given call with a circuit breaker and handles success or failure. +func (c *circuitBreakerClient) executeWithBreaker(cb *circuitbreaker.Circuit, call func() error) error { + err := call() + if err == nil { + c.handleSuccess(cb) + } else { + c.handleFailure(cb) + } + c.logger.Info("circuit breaker call done", zap.String("host", c.host)) + return err +} + +// handleSuccess handles a successful request by the circuit breaker. +func (c *circuitBreakerClient) handleSuccess(cb *circuitbreaker.Circuit) { + cb.ReportRequestStatus(true) + c.logger.Info("circuit breaker call success", zap.String("host", c.host)) + c.metrics.successes.Inc(1) +} + +// handleFailure handles a failed request by the circuit breaker. +func (c *circuitBreakerClient) handleFailure(cb *circuitbreaker.Circuit) { + cb.ReportRequestStatus(false) + c.logger.Info("circuit breaker call failed", zap.String("host", c.host)) + c.metrics.failures.Inc(1) +} + +// WriteBatchRaw is a method that writes a batch of raw data. +func (c *circuitBreakerClient) WriteBatchRaw(ctx tchannel.ContextWithHeaders, req *rpc.WriteBatchRawRequest) error { + return withBreaker[*rpc.WriteBatchRawRequest](c, ctx, func() error { + return c.next.WriteBatchRaw(ctx, req) + }) +} From cca71b73f0aa7166db5161c05555d8b454f52f96 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Tue, 29 Apr 2025 18:22:50 +0530 Subject: [PATCH 02/23] review changes --- .../{ => circuitbreaker}/circuit.go | 0 .../{ => circuitbreaker}/circuit_test.go | 0 .../{ => circuitbreaker}/clock.go | 0 .../{ => circuitbreaker}/config.go | 0 .../{ => circuitbreaker}/config_test.go | 0 .../{ => circuitbreaker}/counters.go | 0 .../{ => circuitbreaker}/counters_test.go | 0 .../{ => circuitbreaker}/doc.go | 0 .../{ => circuitbreaker}/state.go | 0 .../{ => circuitbreaker}/state_test.go | 0 .../{ => circuitbreaker}/status.go | 0 .../{ => circuitbreaker}/status_test.go | 0 .../{ => circuitbreaker}/window.go | 0 .../{ => circuitbreaker}/window_test.go | 0 .../circuitbreakererror/error.go | 0 .../circuitbreaker}/middleware/config.go | 2 +- .../circuitbreaker}/middleware/metrics.go | 0 .../circuitbreaker}/middleware/middleware.go | 37 +++++++++++-------- 18 files changed, 23 insertions(+), 16 deletions(-) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/circuit.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/circuit_test.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/clock.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/config.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/config_test.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/counters.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/counters_test.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/doc.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/state.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/state_test.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/status.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/status_test.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/window.go (100%) rename src/dbnode/client/circuitbreaker/{ => circuitbreaker}/window_test.go (100%) rename src/dbnode/{circuitbreakerfx => client/circuitbreaker}/circuitbreakererror/error.go (100%) rename src/dbnode/{circuitbreakerfx => client/circuitbreaker}/middleware/config.go (82%) rename src/dbnode/{circuitbreakerfx => client/circuitbreaker}/middleware/metrics.go (100%) rename src/dbnode/{circuitbreakerfx => client/circuitbreaker}/middleware/middleware.go (80%) diff --git a/src/dbnode/client/circuitbreaker/circuit.go b/src/dbnode/client/circuitbreaker/circuitbreaker/circuit.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuit.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/circuit.go diff --git a/src/dbnode/client/circuitbreaker/circuit_test.go b/src/dbnode/client/circuitbreaker/circuitbreaker/circuit_test.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuit_test.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/circuit_test.go diff --git a/src/dbnode/client/circuitbreaker/clock.go b/src/dbnode/client/circuitbreaker/circuitbreaker/clock.go similarity index 100% rename from src/dbnode/client/circuitbreaker/clock.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/clock.go diff --git a/src/dbnode/client/circuitbreaker/config.go b/src/dbnode/client/circuitbreaker/circuitbreaker/config.go similarity index 100% rename from src/dbnode/client/circuitbreaker/config.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/config.go diff --git a/src/dbnode/client/circuitbreaker/config_test.go b/src/dbnode/client/circuitbreaker/circuitbreaker/config_test.go similarity index 100% rename from src/dbnode/client/circuitbreaker/config_test.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/config_test.go diff --git a/src/dbnode/client/circuitbreaker/counters.go b/src/dbnode/client/circuitbreaker/circuitbreaker/counters.go similarity index 100% rename from src/dbnode/client/circuitbreaker/counters.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/counters.go diff --git a/src/dbnode/client/circuitbreaker/counters_test.go b/src/dbnode/client/circuitbreaker/circuitbreaker/counters_test.go similarity index 100% rename from src/dbnode/client/circuitbreaker/counters_test.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/counters_test.go diff --git a/src/dbnode/client/circuitbreaker/doc.go b/src/dbnode/client/circuitbreaker/circuitbreaker/doc.go similarity index 100% rename from src/dbnode/client/circuitbreaker/doc.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/doc.go diff --git a/src/dbnode/client/circuitbreaker/state.go b/src/dbnode/client/circuitbreaker/circuitbreaker/state.go similarity index 100% rename from src/dbnode/client/circuitbreaker/state.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/state.go diff --git a/src/dbnode/client/circuitbreaker/state_test.go b/src/dbnode/client/circuitbreaker/circuitbreaker/state_test.go similarity index 100% rename from src/dbnode/client/circuitbreaker/state_test.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/state_test.go diff --git a/src/dbnode/client/circuitbreaker/status.go b/src/dbnode/client/circuitbreaker/circuitbreaker/status.go similarity index 100% rename from src/dbnode/client/circuitbreaker/status.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/status.go diff --git a/src/dbnode/client/circuitbreaker/status_test.go b/src/dbnode/client/circuitbreaker/circuitbreaker/status_test.go similarity index 100% rename from src/dbnode/client/circuitbreaker/status_test.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/status_test.go diff --git a/src/dbnode/client/circuitbreaker/window.go b/src/dbnode/client/circuitbreaker/circuitbreaker/window.go similarity index 100% rename from src/dbnode/client/circuitbreaker/window.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/window.go diff --git a/src/dbnode/client/circuitbreaker/window_test.go b/src/dbnode/client/circuitbreaker/circuitbreaker/window_test.go similarity index 100% rename from src/dbnode/client/circuitbreaker/window_test.go rename to src/dbnode/client/circuitbreaker/circuitbreaker/window_test.go diff --git a/src/dbnode/circuitbreakerfx/circuitbreakererror/error.go b/src/dbnode/client/circuitbreaker/circuitbreakererror/error.go similarity index 100% rename from src/dbnode/circuitbreakerfx/circuitbreakererror/error.go rename to src/dbnode/client/circuitbreaker/circuitbreakererror/error.go diff --git a/src/dbnode/circuitbreakerfx/middleware/config.go b/src/dbnode/client/circuitbreaker/middleware/config.go similarity index 82% rename from src/dbnode/circuitbreakerfx/middleware/config.go rename to src/dbnode/client/circuitbreaker/middleware/config.go index 54d881364c..878c0e4480 100644 --- a/src/dbnode/circuitbreakerfx/middleware/config.go +++ b/src/dbnode/client/circuitbreaker/middleware/config.go @@ -1,7 +1,7 @@ package middleware import ( - "github.com/m3db/m3/src/dbnode/circuitbreakerfx/circuitbreaker" + "github.com/m3db/m3/src/dbnode/client/circuitbreaker/circuitbreaker" ) // Config represents the configuration for the circuit breaker middleware. diff --git a/src/dbnode/circuitbreakerfx/middleware/metrics.go b/src/dbnode/client/circuitbreaker/middleware/metrics.go similarity index 100% rename from src/dbnode/circuitbreakerfx/middleware/metrics.go rename to src/dbnode/client/circuitbreaker/middleware/metrics.go diff --git a/src/dbnode/circuitbreakerfx/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go similarity index 80% rename from src/dbnode/circuitbreakerfx/middleware/middleware.go rename to src/dbnode/client/circuitbreaker/middleware/middleware.go index 32b9e87af0..48c8007482 100644 --- a/src/dbnode/circuitbreakerfx/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -1,8 +1,8 @@ package middleware import ( - "github.com/m3db/m3/src/dbnode/circuitbreakerfx/circuitbreaker" - "github.com/m3db/m3/src/dbnode/circuitbreakerfx/circuitbreakererror" + "github.com/m3db/m3/src/dbnode/client/circuitbreaker/circuitbreaker" + "github.com/m3db/m3/src/dbnode/client/circuitbreaker/circuitbreakererror" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" xerrors "github.com/m3db/m3/src/x/errors" "github.com/uber-go/tally" @@ -27,33 +27,40 @@ type circuitBreakerClient struct { next rpc.TChanNode } -// M3dbMiddleware is a function that takes a TChannel client and returns a circuit breaker client. -type M3DBMiddleware func(rpc.TChanNode) *circuitBreakerClient +// M3dbMiddleware is a function that takes a TChannel client and returns a circuit breaker client interface. +type M3DBMiddleware func(rpc.TChanNode) CircuitBreakerClient -var ( +// CircuitBreakerClient defines the interface for a circuit breaker client. +type CircuitBreakerClient interface { + WriteBatchRaw(ctx tchannel.ContextWithHeaders, req *rpc.WriteBatchRawRequest) error +} + +// circuitBreakerState holds the shared state for circuit breakers +type circuitBreakerState struct { cbInitOnce sync.Map // map[string]*sync.Once cbMap sync.Map // map[string]*atomic.Value metricsMap sync.Map // map[string]*circuitBreakerMetrics -) +} // NewCircuitBreakerMiddleware creates a new circuit breaker middleware. func NewCircuitBreakerMiddleware(config Config, logger *zap.Logger, scope tally.Scope, host string) M3DBMiddleware { - initializeCircuitBreaker(config, logger, scope, host) + state := &circuitBreakerState{} + initializeCircuitBreaker(config, logger, scope, host, state) return func(next rpc.TChanNode) *circuitBreakerClient { - return createCircuitBreakerClient(config, logger, host, next) + return createCircuitBreakerClient(config, logger, host, next, state) } } // initializeCircuitBreaker initializes the circuit breaker for the given host. -func initializeCircuitBreaker(config Config, logger *zap.Logger, scope tally.Scope, host string) { - onceIface, _ := cbInitOnce.LoadOrStore(host, new(sync.Once)) +func initializeCircuitBreaker(config Config, logger *zap.Logger, scope tally.Scope, host string, state *circuitBreakerState) { + onceIface, _ := state.cbInitOnce.LoadOrStore(host, new(sync.Once)) once := onceIface.(*sync.Once) once.Do(func() { logger.Info("creating circuit breaker middleware", zap.String("host", host)) metrics := newMetrics(scope, host) - metricsMap.Store(host, metrics) + state.metricsMap.Store(host, metrics) cb, err := circuitbreaker.NewCircuit(config.CircuitBreakerConfig) if err != nil { @@ -63,14 +70,14 @@ func initializeCircuitBreaker(config Config, logger *zap.Logger, scope tally.Sco cbVal := &atomic.Value{} cbVal.Store(cb) - cbMap.Store(host, cbVal) + state.cbMap.Store(host, cbVal) }) } // createCircuitBreakerClient creates a new circuit breaker client. -func createCircuitBreakerClient(config Config, logger *zap.Logger, host string, next rpc.TChanNode) *circuitBreakerClient { - cbIface, _ := cbMap.Load(host) - metricsIface, _ := metricsMap.Load(host) +func createCircuitBreakerClient(config Config, logger *zap.Logger, host string, next rpc.TChanNode, state *circuitBreakerState) *circuitBreakerClient { + cbIface, _ := state.cbMap.Load(host) + metricsIface, _ := state.metricsMap.Load(host) return &circuitBreakerClient{ enabled: config.Enabled, From a33d8bb23dd8772f1dbeb43c08317cd68795e819 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Wed, 30 Apr 2025 07:14:15 +0530 Subject: [PATCH 03/23] review change --- .../circuitbreaker/{ => internal}/circuitbreaker/circuit.go | 0 .../{ => internal}/circuitbreaker/circuit_test.go | 0 .../circuitbreaker/{ => internal}/circuitbreaker/clock.go | 0 .../circuitbreaker/{ => internal}/circuitbreaker/config.go | 0 .../circuitbreaker/{ => internal}/circuitbreaker/config_test.go | 0 .../circuitbreaker/{ => internal}/circuitbreaker/counters.go | 0 .../{ => internal}/circuitbreaker/counters_test.go | 0 .../client/circuitbreaker/{ => internal}/circuitbreaker/doc.go | 0 .../circuitbreaker/{ => internal}/circuitbreaker/state.go | 0 .../circuitbreaker/{ => internal}/circuitbreaker/state_test.go | 0 .../circuitbreaker/{ => internal}/circuitbreaker/status.go | 0 .../circuitbreaker/{ => internal}/circuitbreaker/status_test.go | 0 .../circuitbreaker/{ => internal}/circuitbreaker/window.go | 0 .../circuitbreaker/{ => internal}/circuitbreaker/window_test.go | 0 src/dbnode/client/circuitbreaker/middleware/config.go | 2 +- src/dbnode/client/circuitbreaker/middleware/middleware.go | 2 +- 16 files changed, 2 insertions(+), 2 deletions(-) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/circuit.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/circuit_test.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/clock.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/config.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/config_test.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/counters.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/counters_test.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/doc.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/state.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/state_test.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/status.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/status_test.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/window.go (100%) rename src/dbnode/client/circuitbreaker/{ => internal}/circuitbreaker/window_test.go (100%) diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/circuit.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/circuit.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/circuit.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/circuit.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/circuit_test.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/circuit_test.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/circuit_test.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/circuit_test.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/clock.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/clock.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/clock.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/clock.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/config.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/config.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/config.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/config.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/config_test.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/config_test.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/config_test.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/config_test.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/counters.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/counters.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/counters.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/counters.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/counters_test.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/counters_test.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/counters_test.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/counters_test.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/doc.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/doc.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/doc.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/doc.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/state.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/state.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/state.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/state.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/state_test.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/state_test.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/state_test.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/state_test.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/status.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/status.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/status.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/status.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/status_test.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/status_test.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/status_test.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/status_test.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/window.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/window.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/window.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/window.go diff --git a/src/dbnode/client/circuitbreaker/circuitbreaker/window_test.go b/src/dbnode/client/circuitbreaker/internal/circuitbreaker/window_test.go similarity index 100% rename from src/dbnode/client/circuitbreaker/circuitbreaker/window_test.go rename to src/dbnode/client/circuitbreaker/internal/circuitbreaker/window_test.go diff --git a/src/dbnode/client/circuitbreaker/middleware/config.go b/src/dbnode/client/circuitbreaker/middleware/config.go index 878c0e4480..b7a48777a8 100644 --- a/src/dbnode/client/circuitbreaker/middleware/config.go +++ b/src/dbnode/client/circuitbreaker/middleware/config.go @@ -1,7 +1,7 @@ package middleware import ( - "github.com/m3db/m3/src/dbnode/client/circuitbreaker/circuitbreaker" + "github.com/m3db/m3/src/dbnode/client/circuitbreaker/internal/circuitbreaker" ) // Config represents the configuration for the circuit breaker middleware. diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index 48c8007482..5474159c50 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -1,8 +1,8 @@ package middleware import ( - "github.com/m3db/m3/src/dbnode/client/circuitbreaker/circuitbreaker" "github.com/m3db/m3/src/dbnode/client/circuitbreaker/circuitbreakererror" + "github.com/m3db/m3/src/dbnode/client/circuitbreaker/internal/circuitbreaker" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" xerrors "github.com/m3db/m3/src/x/errors" "github.com/uber-go/tally" From feacf9e2acaa5aded546e2ea99eeeacf1fbe928d Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Wed, 30 Apr 2025 15:32:32 +0530 Subject: [PATCH 04/23] review change --- src/dbnode/client/circuitbreaker/middleware/metrics.go | 4 ++++ src/dbnode/client/circuitbreaker/middleware/middleware.go | 4 ---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/metrics.go b/src/dbnode/client/circuitbreaker/middleware/metrics.go index 12b95434a3..e677c9b568 100644 --- a/src/dbnode/client/circuitbreaker/middleware/metrics.go +++ b/src/dbnode/client/circuitbreaker/middleware/metrics.go @@ -4,6 +4,10 @@ import ( "github.com/uber-go/tally" ) +const ( + _packageName = "circuit_breaker" +) + type circuitBreakerMetrics struct { successes tally.Counter // counter for successful requests failures tally.Counter // counter for failed requests diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index 5474159c50..c4dbf51a7b 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -12,10 +12,6 @@ import ( "sync/atomic" ) -const ( - _packageName = "circuit_breaker" -) - // circuitBreakerClient is a client that wraps a TChannel client with a circuit breaker. type circuitBreakerClient struct { enabled bool From 96bbc2e9c56729d19cddd03994436dda0ed8f47d Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Wed, 30 Apr 2025 18:38:24 +0530 Subject: [PATCH 05/23] review change --- .../circuitbreaker/middleware/middleware.go | 201 +++++++++++++----- 1 file changed, 147 insertions(+), 54 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index c4dbf51a7b..5811996ed1 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -4,11 +4,10 @@ import ( "github.com/m3db/m3/src/dbnode/client/circuitbreaker/circuitbreakererror" "github.com/m3db/m3/src/dbnode/client/circuitbreaker/internal/circuitbreaker" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" - xerrors "github.com/m3db/m3/src/x/errors" "github.com/uber-go/tally" tchannel "github.com/uber/tchannel-go" + "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" - "sync" "sync/atomic" ) @@ -23,66 +22,35 @@ type circuitBreakerClient struct { next rpc.TChanNode } -// M3dbMiddleware is a function that takes a TChannel client and returns a circuit breaker client interface. -type M3DBMiddleware func(rpc.TChanNode) CircuitBreakerClient +// m3dbMiddleware is a function that takes a TChannel client and returns a circuit breaker client interface. +type m3dbMiddleware func(rpc.TChanNode) CircuitBreakerClient // CircuitBreakerClient defines the interface for a circuit breaker client. type CircuitBreakerClient interface { - WriteBatchRaw(ctx tchannel.ContextWithHeaders, req *rpc.WriteBatchRawRequest) error -} - -// circuitBreakerState holds the shared state for circuit breakers -type circuitBreakerState struct { - cbInitOnce sync.Map // map[string]*sync.Once - cbMap sync.Map // map[string]*atomic.Value - metricsMap sync.Map // map[string]*circuitBreakerMetrics + rpc.TChanNode } // NewCircuitBreakerMiddleware creates a new circuit breaker middleware. -func NewCircuitBreakerMiddleware(config Config, logger *zap.Logger, scope tally.Scope, host string) M3DBMiddleware { - state := &circuitBreakerState{} - initializeCircuitBreaker(config, logger, scope, host, state) +func NewCircuitBreakerMiddleware(config Config, logger *zap.Logger, scope tally.Scope, host string) m3dbMiddleware { + logger.Info("creating circuit breaker middleware", zap.String("host", host)) - return func(next rpc.TChanNode) *circuitBreakerClient { - return createCircuitBreakerClient(config, logger, host, next, state) + c, err := circuitbreaker.NewCircuit(config.CircuitBreakerConfig) + if err != nil { + logger.Warn("failed to create circuit breaker", zap.Error(err)) } -} - -// initializeCircuitBreaker initializes the circuit breaker for the given host. -func initializeCircuitBreaker(config Config, logger *zap.Logger, scope tally.Scope, host string, state *circuitBreakerState) { - onceIface, _ := state.cbInitOnce.LoadOrStore(host, new(sync.Once)) - once := onceIface.(*sync.Once) - - once.Do(func() { - logger.Info("creating circuit breaker middleware", zap.String("host", host)) - metrics := newMetrics(scope, host) - state.metricsMap.Store(host, metrics) - - cb, err := circuitbreaker.NewCircuit(config.CircuitBreakerConfig) - if err != nil { - logger.Warn("failed to create circuit breaker", zap.Error(err)) - return + cb := &atomic.Value{} + cb.Store(c) + + return func(next rpc.TChanNode) CircuitBreakerClient { + return &circuitBreakerClient{ + enabled: config.Enabled, + shadowMode: config.ShadowMode, + next: next, + logger: logger, + host: host, + metrics: newMetrics(scope, host), + cb: cb, } - - cbVal := &atomic.Value{} - cbVal.Store(cb) - state.cbMap.Store(host, cbVal) - }) -} - -// createCircuitBreakerClient creates a new circuit breaker client. -func createCircuitBreakerClient(config Config, logger *zap.Logger, host string, next rpc.TChanNode, state *circuitBreakerState) *circuitBreakerClient { - cbIface, _ := state.cbMap.Load(host) - metricsIface, _ := state.metricsMap.Load(host) - - return &circuitBreakerClient{ - enabled: config.Enabled, - shadowMode: config.ShadowMode, - next: next, - logger: logger, - host: host, - metrics: metricsIface.(*circuitBreakerMetrics), - cb: cbIface.(*atomic.Value), } } @@ -149,8 +117,133 @@ func (c *circuitBreakerClient) handleFailure(cb *circuitbreaker.Circuit) { } // WriteBatchRaw is a method that writes a batch of raw data. -func (c *circuitBreakerClient) WriteBatchRaw(ctx tchannel.ContextWithHeaders, req *rpc.WriteBatchRawRequest) error { +func (c *circuitBreakerClient) WriteBatchRaw(ctx thrift.Context, req *rpc.WriteBatchRawRequest) error { return withBreaker[*rpc.WriteBatchRawRequest](c, ctx, func() error { return c.next.WriteBatchRaw(ctx, req) }) } + +// Forward all other TChanNode methods to the underlying client +func (c *circuitBreakerClient) Aggregate(ctx thrift.Context, req *rpc.AggregateQueryRequest) (*rpc.AggregateQueryResult_, error) { + return c.next.Aggregate(ctx, req) +} + +func (c *circuitBreakerClient) AggregateRaw(ctx thrift.Context, req *rpc.AggregateQueryRawRequest) (*rpc.AggregateQueryRawResult_, error) { + return c.next.AggregateRaw(ctx, req) +} + +func (c *circuitBreakerClient) AggregateTiles(ctx thrift.Context, req *rpc.AggregateTilesRequest) (*rpc.AggregateTilesResult_, error) { + return c.next.AggregateTiles(ctx, req) +} + +func (c *circuitBreakerClient) Bootstrapped(ctx thrift.Context) (*rpc.NodeBootstrappedResult_, error) { + return c.next.Bootstrapped(ctx) +} + +func (c *circuitBreakerClient) BootstrappedInPlacementOrNoPlacement(ctx thrift.Context) (*rpc.NodeBootstrappedInPlacementOrNoPlacementResult_, error) { + return c.next.BootstrappedInPlacementOrNoPlacement(ctx) +} + +func (c *circuitBreakerClient) DebugIndexMemorySegments(ctx thrift.Context, req *rpc.DebugIndexMemorySegmentsRequest) (*rpc.DebugIndexMemorySegmentsResult_, error) { + return c.next.DebugIndexMemorySegments(ctx, req) +} + +func (c *circuitBreakerClient) DebugProfileStart(ctx thrift.Context, req *rpc.DebugProfileStartRequest) (*rpc.DebugProfileStartResult_, error) { + return c.next.DebugProfileStart(ctx, req) +} + +func (c *circuitBreakerClient) DebugProfileStop(ctx thrift.Context, req *rpc.DebugProfileStopRequest) (*rpc.DebugProfileStopResult_, error) { + return c.next.DebugProfileStop(ctx, req) +} + +func (c *circuitBreakerClient) Fetch(ctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchResult_, error) { + return c.next.Fetch(ctx, req) +} + +func (c *circuitBreakerClient) FetchBatchRaw(ctx thrift.Context, req *rpc.FetchBatchRawRequest) (*rpc.FetchBatchRawResult_, error) { + return c.next.FetchBatchRaw(ctx, req) +} + +func (c *circuitBreakerClient) FetchBatchRawV2(ctx thrift.Context, req *rpc.FetchBatchRawV2Request) (*rpc.FetchBatchRawResult_, error) { + return c.next.FetchBatchRawV2(ctx, req) +} + +func (c *circuitBreakerClient) FetchBlocksMetadataRawV2(ctx thrift.Context, req *rpc.FetchBlocksMetadataRawV2Request) (*rpc.FetchBlocksMetadataRawV2Result_, error) { + return c.next.FetchBlocksMetadataRawV2(ctx, req) +} + +func (c *circuitBreakerClient) FetchBlocksRaw(ctx thrift.Context, req *rpc.FetchBlocksRawRequest) (*rpc.FetchBlocksRawResult_, error) { + return c.next.FetchBlocksRaw(ctx, req) +} + +func (c *circuitBreakerClient) FetchTagged(ctx thrift.Context, req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { + return c.next.FetchTagged(ctx, req) +} + +func (c *circuitBreakerClient) GetPersistRateLimit(ctx thrift.Context) (*rpc.NodePersistRateLimitResult_, error) { + return c.next.GetPersistRateLimit(ctx) +} + +func (c *circuitBreakerClient) GetWriteNewSeriesAsync(ctx thrift.Context) (*rpc.NodeWriteNewSeriesAsyncResult_, error) { + return c.next.GetWriteNewSeriesAsync(ctx) +} + +func (c *circuitBreakerClient) GetWriteNewSeriesBackoffDuration(ctx thrift.Context) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) { + return c.next.GetWriteNewSeriesBackoffDuration(ctx) +} + +func (c *circuitBreakerClient) GetWriteNewSeriesLimitPerShardPerSecond(ctx thrift.Context) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) { + return c.next.GetWriteNewSeriesLimitPerShardPerSecond(ctx) +} + +func (c *circuitBreakerClient) Health(ctx thrift.Context) (*rpc.NodeHealthResult_, error) { + return c.next.Health(ctx) +} + +func (c *circuitBreakerClient) Query(ctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryResult_, error) { + return c.next.Query(ctx, req) +} + +func (c *circuitBreakerClient) Repair(ctx thrift.Context) error { + return c.next.Repair(ctx) +} + +func (c *circuitBreakerClient) SetPersistRateLimit(ctx thrift.Context, req *rpc.NodeSetPersistRateLimitRequest) (*rpc.NodePersistRateLimitResult_, error) { + return c.next.SetPersistRateLimit(ctx, req) +} + +func (c *circuitBreakerClient) SetWriteNewSeriesAsync(ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesAsyncRequest) (*rpc.NodeWriteNewSeriesAsyncResult_, error) { + return c.next.SetWriteNewSeriesAsync(ctx, req) +} + +func (c *circuitBreakerClient) SetWriteNewSeriesBackoffDuration(ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesBackoffDurationRequest) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) { + return c.next.SetWriteNewSeriesBackoffDuration(ctx, req) +} + +func (c *circuitBreakerClient) SetWriteNewSeriesLimitPerShardPerSecond(ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesLimitPerShardPerSecondRequest) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) { + return c.next.SetWriteNewSeriesLimitPerShardPerSecond(ctx, req) +} + +func (c *circuitBreakerClient) Truncate(ctx thrift.Context, req *rpc.TruncateRequest) (*rpc.TruncateResult_, error) { + return c.next.Truncate(ctx, req) +} + +func (c *circuitBreakerClient) Write(ctx thrift.Context, req *rpc.WriteRequest) error { + return c.next.Write(ctx, req) +} + +func (c *circuitBreakerClient) WriteBatchRawV2(ctx thrift.Context, req *rpc.WriteBatchRawV2Request) error { + return c.next.WriteBatchRawV2(ctx, req) +} + +func (c *circuitBreakerClient) WriteTagged(ctx thrift.Context, req *rpc.WriteTaggedRequest) error { + return c.next.WriteTagged(ctx, req) +} + +func (c *circuitBreakerClient) WriteTaggedBatchRaw(ctx thrift.Context, req *rpc.WriteTaggedBatchRawRequest) error { + return c.next.WriteTaggedBatchRaw(ctx, req) +} + +func (c *circuitBreakerClient) WriteTaggedBatchRawV2(ctx thrift.Context, req *rpc.WriteTaggedBatchRawV2Request) error { + return c.next.WriteTaggedBatchRawV2(ctx, req) +} From af5b6736bb731d5eeeea882b9c80f0f183b5e5d0 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Wed, 30 Apr 2025 18:45:15 +0530 Subject: [PATCH 06/23] review change --- .../client/circuitbreaker/middleware/middleware.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index 5811996ed1..b7094c1438 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -32,8 +32,6 @@ type CircuitBreakerClient interface { // NewCircuitBreakerMiddleware creates a new circuit breaker middleware. func NewCircuitBreakerMiddleware(config Config, logger *zap.Logger, scope tally.Scope, host string) m3dbMiddleware { - logger.Info("creating circuit breaker middleware", zap.String("host", host)) - c, err := circuitbreaker.NewCircuit(config.CircuitBreakerConfig) if err != nil { logger.Warn("failed to create circuit breaker", zap.Error(err)) @@ -57,7 +55,7 @@ func NewCircuitBreakerMiddleware(config Config, logger *zap.Logger, scope tally. // withBreaker executes the given call with a circuit breaker if enabled. func withBreaker[T any](c *circuitBreakerClient, ctx tchannel.ContextWithHeaders, call func() error) error { if !c.enabled { - return c.executeWithoutBreaker(call) + return call() } cb := c.getCircuit() @@ -70,7 +68,7 @@ func withBreaker[T any](c *circuitBreakerClient, ctx tchannel.ContextWithHeaders // executeWithoutBreaker executes the given call without a circuit breaker. func (c *circuitBreakerClient) executeWithoutBreaker(call func() error) error { - c.logger.Info("circuit breaker disabled, calling next", zap.String("host", c.host)) + c.logger.Debug("circuit breaker disabled, calling next", zap.String("host", c.host)) return call() } @@ -83,7 +81,7 @@ func (c *circuitBreakerClient) getCircuit() *circuitbreaker.Circuit { // handleRejectedRequest handles a rejected request by the circuit breaker. func (c *circuitBreakerClient) handleRejectedRequest() error { c.metrics.rejects.Inc(1) - c.logger.Info("circuit breaker request rejected", zap.String("host", c.host)) + c.logger.Debug("circuit breaker request rejected", zap.String("host", c.host)) if !c.shadowMode { return circuitbreakererror.New(c.host) } @@ -98,21 +96,19 @@ func (c *circuitBreakerClient) executeWithBreaker(cb *circuitbreaker.Circuit, ca } else { c.handleFailure(cb) } - c.logger.Info("circuit breaker call done", zap.String("host", c.host)) return err } // handleSuccess handles a successful request by the circuit breaker. func (c *circuitBreakerClient) handleSuccess(cb *circuitbreaker.Circuit) { cb.ReportRequestStatus(true) - c.logger.Info("circuit breaker call success", zap.String("host", c.host)) c.metrics.successes.Inc(1) } // handleFailure handles a failed request by the circuit breaker. func (c *circuitBreakerClient) handleFailure(cb *circuitbreaker.Circuit) { cb.ReportRequestStatus(false) - c.logger.Info("circuit breaker call failed", zap.String("host", c.host)) + c.logger.Debug("circuit breaker call failed", zap.String("host", c.host)) c.metrics.failures.Inc(1) } From ffb66bc86c8fc64ee17ca0bb45f327a06284afb4 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Wed, 30 Apr 2025 19:08:22 +0530 Subject: [PATCH 07/23] review change --- src/dbnode/client/circuitbreaker/middleware/middleware.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index b7094c1438..505d34f18d 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -55,7 +55,7 @@ func NewCircuitBreakerMiddleware(config Config, logger *zap.Logger, scope tally. // withBreaker executes the given call with a circuit breaker if enabled. func withBreaker[T any](c *circuitBreakerClient, ctx tchannel.ContextWithHeaders, call func() error) error { if !c.enabled { - return call() + return c.executeWithoutBreaker(call) } cb := c.getCircuit() From d35095cdcde7625a77375b0e2860dcdfbc413860 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Thu, 1 May 2025 10:01:21 +0530 Subject: [PATCH 08/23] review change --- .../circuitbreaker/middleware/config.go | 2 +- .../circuitbreaker/middleware/middleware.go | 133 ++++++++---------- 2 files changed, 62 insertions(+), 73 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/config.go b/src/dbnode/client/circuitbreaker/middleware/config.go index b7a48777a8..1549164c97 100644 --- a/src/dbnode/client/circuitbreaker/middleware/config.go +++ b/src/dbnode/client/circuitbreaker/middleware/config.go @@ -7,6 +7,6 @@ import ( // Config represents the configuration for the circuit breaker middleware. type Config struct { Enabled bool `yaml:"enabled"` - ShadowMode bool `yaml:"ShadowMode"` + ShadowMode bool `yaml:"shadowMode"` CircuitBreakerConfig circuitbreaker.Config `yaml:"circuitBreakerConfig"` } diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index 505d34f18d..d10c5f127a 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -8,238 +8,227 @@ import ( tchannel "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" - "sync/atomic" ) -// circuitBreakerClient is a client that wraps a TChannel client with a circuit breaker. -type circuitBreakerClient struct { +// client is a client that wraps a TChannel client with a circuit breaker. +type client struct { enabled bool shadowMode bool logger *zap.Logger - cb *atomic.Value // *circuitbreaker.Circuit + circuit *circuitbreaker.Circuit metrics *circuitBreakerMetrics host string next rpc.TChanNode } // m3dbMiddleware is a function that takes a TChannel client and returns a circuit breaker client interface. -type m3dbMiddleware func(rpc.TChanNode) CircuitBreakerClient +type m3dbMiddleware func(rpc.TChanNode) Client -// CircuitBreakerClient defines the interface for a circuit breaker client. -type CircuitBreakerClient interface { +// Client defines the interface for a circuit breaker client. +type Client interface { rpc.TChanNode } -// NewCircuitBreakerMiddleware creates a new circuit breaker middleware. -func NewCircuitBreakerMiddleware(config Config, logger *zap.Logger, scope tally.Scope, host string) m3dbMiddleware { +// New creates a new circuit breaker middleware. +func New(config Config, logger *zap.Logger, scope tally.Scope, host string) (m3dbMiddleware, error) { c, err := circuitbreaker.NewCircuit(config.CircuitBreakerConfig) if err != nil { logger.Warn("failed to create circuit breaker", zap.Error(err)) + return nil, err } - cb := &atomic.Value{} - cb.Store(c) - return func(next rpc.TChanNode) CircuitBreakerClient { - return &circuitBreakerClient{ + return func(next rpc.TChanNode) (Client, error) { + return &client{ enabled: config.Enabled, shadowMode: config.ShadowMode, next: next, logger: logger, host: host, metrics: newMetrics(scope, host), - cb: cb, - } - } + circuit: c, + }, nil + }, nil } // withBreaker executes the given call with a circuit breaker if enabled. -func withBreaker[T any](c *circuitBreakerClient, ctx tchannel.ContextWithHeaders, call func() error) error { +func withBreaker[T any](c *client, ctx tchannel.ContextWithHeaders, req T, call func(tchannel.ContextWithHeaders, T) error) error { if !c.enabled { return c.executeWithoutBreaker(call) } - cb := c.getCircuit() - if cb == nil || !cb.IsRequestAllowed() { - return c.handleRejectedRequest() + if c.circuit == nil || !c.circuit.IsRequestAllowed() { + return c.handleRejectedRequest(call) } - return c.executeWithBreaker(cb, call) + return c.executeWithBreaker(call) } // executeWithoutBreaker executes the given call without a circuit breaker. -func (c *circuitBreakerClient) executeWithoutBreaker(call func() error) error { +func (c *client) executeWithoutBreaker(call func() error) error { c.logger.Debug("circuit breaker disabled, calling next", zap.String("host", c.host)) return call() } -// getCircuit retrieves the circuit breaker from the atomic value. -func (c *circuitBreakerClient) getCircuit() *circuitbreaker.Circuit { - cb, _ := c.cb.Load().(*circuitbreaker.Circuit) - return cb -} - // handleRejectedRequest handles a rejected request by the circuit breaker. -func (c *circuitBreakerClient) handleRejectedRequest() error { +func (c *client) handleRejectedRequest(call func() error) error { c.metrics.rejects.Inc(1) c.logger.Debug("circuit breaker request rejected", zap.String("host", c.host)) if !c.shadowMode { return circuitbreakererror.New(c.host) } - return nil + return call() } // executeWithBreaker executes the given call with a circuit breaker and handles success or failure. -func (c *circuitBreakerClient) executeWithBreaker(cb *circuitbreaker.Circuit, call func() error) error { +func (c *client) executeWithBreaker(call func() error) error { err := call() if err == nil { - c.handleSuccess(cb) + c.handleSuccess() } else { - c.handleFailure(cb) + c.handleFailure() } return err } // handleSuccess handles a successful request by the circuit breaker. -func (c *circuitBreakerClient) handleSuccess(cb *circuitbreaker.Circuit) { - cb.ReportRequestStatus(true) +func (c *client) handleSuccess() { + c.circuit.ReportRequestStatus(true) c.metrics.successes.Inc(1) } // handleFailure handles a failed request by the circuit breaker. -func (c *circuitBreakerClient) handleFailure(cb *circuitbreaker.Circuit) { - cb.ReportRequestStatus(false) +func (c *client) handleFailure() { + c.circuit.ReportRequestStatus(false) c.logger.Debug("circuit breaker call failed", zap.String("host", c.host)) c.metrics.failures.Inc(1) } // WriteBatchRaw is a method that writes a batch of raw data. -func (c *circuitBreakerClient) WriteBatchRaw(ctx thrift.Context, req *rpc.WriteBatchRawRequest) error { - return withBreaker[*rpc.WriteBatchRawRequest](c, ctx, func() error { - return c.next.WriteBatchRaw(ctx, req) - }) +func (c *client) WriteBatchRaw(ctx thrift.Context, req *rpc.WriteBatchRawRequest) error { + return withBreaker(c, ctx, req, c.next.WriteBatchRaw) } // Forward all other TChanNode methods to the underlying client -func (c *circuitBreakerClient) Aggregate(ctx thrift.Context, req *rpc.AggregateQueryRequest) (*rpc.AggregateQueryResult_, error) { +func (c *client) Aggregate(ctx thrift.Context, req *rpc.AggregateQueryRequest) (*rpc.AggregateQueryResult_, error) { return c.next.Aggregate(ctx, req) } -func (c *circuitBreakerClient) AggregateRaw(ctx thrift.Context, req *rpc.AggregateQueryRawRequest) (*rpc.AggregateQueryRawResult_, error) { +func (c *client) AggregateRaw(ctx thrift.Context, req *rpc.AggregateQueryRawRequest) (*rpc.AggregateQueryRawResult_, error) { return c.next.AggregateRaw(ctx, req) } -func (c *circuitBreakerClient) AggregateTiles(ctx thrift.Context, req *rpc.AggregateTilesRequest) (*rpc.AggregateTilesResult_, error) { +func (c *client) AggregateTiles(ctx thrift.Context, req *rpc.AggregateTilesRequest) (*rpc.AggregateTilesResult_, error) { return c.next.AggregateTiles(ctx, req) } -func (c *circuitBreakerClient) Bootstrapped(ctx thrift.Context) (*rpc.NodeBootstrappedResult_, error) { +func (c *client) Bootstrapped(ctx thrift.Context) (*rpc.NodeBootstrappedResult_, error) { return c.next.Bootstrapped(ctx) } -func (c *circuitBreakerClient) BootstrappedInPlacementOrNoPlacement(ctx thrift.Context) (*rpc.NodeBootstrappedInPlacementOrNoPlacementResult_, error) { +func (c *client) BootstrappedInPlacementOrNoPlacement(ctx thrift.Context) (*rpc.NodeBootstrappedInPlacementOrNoPlacementResult_, error) { return c.next.BootstrappedInPlacementOrNoPlacement(ctx) } -func (c *circuitBreakerClient) DebugIndexMemorySegments(ctx thrift.Context, req *rpc.DebugIndexMemorySegmentsRequest) (*rpc.DebugIndexMemorySegmentsResult_, error) { +func (c *client) DebugIndexMemorySegments(ctx thrift.Context, req *rpc.DebugIndexMemorySegmentsRequest) (*rpc.DebugIndexMemorySegmentsResult_, error) { return c.next.DebugIndexMemorySegments(ctx, req) } -func (c *circuitBreakerClient) DebugProfileStart(ctx thrift.Context, req *rpc.DebugProfileStartRequest) (*rpc.DebugProfileStartResult_, error) { +func (c *client) DebugProfileStart(ctx thrift.Context, req *rpc.DebugProfileStartRequest) (*rpc.DebugProfileStartResult_, error) { return c.next.DebugProfileStart(ctx, req) } -func (c *circuitBreakerClient) DebugProfileStop(ctx thrift.Context, req *rpc.DebugProfileStopRequest) (*rpc.DebugProfileStopResult_, error) { +func (c *client) DebugProfileStop(ctx thrift.Context, req *rpc.DebugProfileStopRequest) (*rpc.DebugProfileStopResult_, error) { return c.next.DebugProfileStop(ctx, req) } -func (c *circuitBreakerClient) Fetch(ctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchResult_, error) { +func (c *client) Fetch(ctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchResult_, error) { return c.next.Fetch(ctx, req) } -func (c *circuitBreakerClient) FetchBatchRaw(ctx thrift.Context, req *rpc.FetchBatchRawRequest) (*rpc.FetchBatchRawResult_, error) { +func (c *client) FetchBatchRaw(ctx thrift.Context, req *rpc.FetchBatchRawRequest) (*rpc.FetchBatchRawResult_, error) { return c.next.FetchBatchRaw(ctx, req) } -func (c *circuitBreakerClient) FetchBatchRawV2(ctx thrift.Context, req *rpc.FetchBatchRawV2Request) (*rpc.FetchBatchRawResult_, error) { +func (c *client) FetchBatchRawV2(ctx thrift.Context, req *rpc.FetchBatchRawV2Request) (*rpc.FetchBatchRawResult_, error) { return c.next.FetchBatchRawV2(ctx, req) } -func (c *circuitBreakerClient) FetchBlocksMetadataRawV2(ctx thrift.Context, req *rpc.FetchBlocksMetadataRawV2Request) (*rpc.FetchBlocksMetadataRawV2Result_, error) { +func (c *client) FetchBlocksMetadataRawV2(ctx thrift.Context, req *rpc.FetchBlocksMetadataRawV2Request) (*rpc.FetchBlocksMetadataRawV2Result_, error) { return c.next.FetchBlocksMetadataRawV2(ctx, req) } -func (c *circuitBreakerClient) FetchBlocksRaw(ctx thrift.Context, req *rpc.FetchBlocksRawRequest) (*rpc.FetchBlocksRawResult_, error) { +func (c *client) FetchBlocksRaw(ctx thrift.Context, req *rpc.FetchBlocksRawRequest) (*rpc.FetchBlocksRawResult_, error) { return c.next.FetchBlocksRaw(ctx, req) } -func (c *circuitBreakerClient) FetchTagged(ctx thrift.Context, req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { +func (c *client) FetchTagged(ctx thrift.Context, req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { return c.next.FetchTagged(ctx, req) } -func (c *circuitBreakerClient) GetPersistRateLimit(ctx thrift.Context) (*rpc.NodePersistRateLimitResult_, error) { +func (c *client) GetPersistRateLimit(ctx thrift.Context) (*rpc.NodePersistRateLimitResult_, error) { return c.next.GetPersistRateLimit(ctx) } -func (c *circuitBreakerClient) GetWriteNewSeriesAsync(ctx thrift.Context) (*rpc.NodeWriteNewSeriesAsyncResult_, error) { +func (c *client) GetWriteNewSeriesAsync(ctx thrift.Context) (*rpc.NodeWriteNewSeriesAsyncResult_, error) { return c.next.GetWriteNewSeriesAsync(ctx) } -func (c *circuitBreakerClient) GetWriteNewSeriesBackoffDuration(ctx thrift.Context) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) { +func (c *client) GetWriteNewSeriesBackoffDuration(ctx thrift.Context) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) { return c.next.GetWriteNewSeriesBackoffDuration(ctx) } -func (c *circuitBreakerClient) GetWriteNewSeriesLimitPerShardPerSecond(ctx thrift.Context) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) { +func (c *client) GetWriteNewSeriesLimitPerShardPerSecond(ctx thrift.Context) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) { return c.next.GetWriteNewSeriesLimitPerShardPerSecond(ctx) } -func (c *circuitBreakerClient) Health(ctx thrift.Context) (*rpc.NodeHealthResult_, error) { +func (c *client) Health(ctx thrift.Context) (*rpc.NodeHealthResult_, error) { return c.next.Health(ctx) } -func (c *circuitBreakerClient) Query(ctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryResult_, error) { +func (c *client) Query(ctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryResult_, error) { return c.next.Query(ctx, req) } -func (c *circuitBreakerClient) Repair(ctx thrift.Context) error { +func (c *client) Repair(ctx thrift.Context) error { return c.next.Repair(ctx) } -func (c *circuitBreakerClient) SetPersistRateLimit(ctx thrift.Context, req *rpc.NodeSetPersistRateLimitRequest) (*rpc.NodePersistRateLimitResult_, error) { +func (c *client) SetPersistRateLimit(ctx thrift.Context, req *rpc.NodeSetPersistRateLimitRequest) (*rpc.NodePersistRateLimitResult_, error) { return c.next.SetPersistRateLimit(ctx, req) } -func (c *circuitBreakerClient) SetWriteNewSeriesAsync(ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesAsyncRequest) (*rpc.NodeWriteNewSeriesAsyncResult_, error) { +func (c *client) SetWriteNewSeriesAsync(ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesAsyncRequest) (*rpc.NodeWriteNewSeriesAsyncResult_, error) { return c.next.SetWriteNewSeriesAsync(ctx, req) } -func (c *circuitBreakerClient) SetWriteNewSeriesBackoffDuration(ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesBackoffDurationRequest) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) { +func (c *client) SetWriteNewSeriesBackoffDuration(ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesBackoffDurationRequest) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) { return c.next.SetWriteNewSeriesBackoffDuration(ctx, req) } -func (c *circuitBreakerClient) SetWriteNewSeriesLimitPerShardPerSecond(ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesLimitPerShardPerSecondRequest) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) { +func (c *client) SetWriteNewSeriesLimitPerShardPerSecond(ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesLimitPerShardPerSecondRequest) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) { return c.next.SetWriteNewSeriesLimitPerShardPerSecond(ctx, req) } -func (c *circuitBreakerClient) Truncate(ctx thrift.Context, req *rpc.TruncateRequest) (*rpc.TruncateResult_, error) { +func (c *client) Truncate(ctx thrift.Context, req *rpc.TruncateRequest) (*rpc.TruncateResult_, error) { return c.next.Truncate(ctx, req) } -func (c *circuitBreakerClient) Write(ctx thrift.Context, req *rpc.WriteRequest) error { +func (c *client) Write(ctx thrift.Context, req *rpc.WriteRequest) error { return c.next.Write(ctx, req) } -func (c *circuitBreakerClient) WriteBatchRawV2(ctx thrift.Context, req *rpc.WriteBatchRawV2Request) error { +func (c *client) WriteBatchRawV2(ctx thrift.Context, req *rpc.WriteBatchRawV2Request) error { return c.next.WriteBatchRawV2(ctx, req) } -func (c *circuitBreakerClient) WriteTagged(ctx thrift.Context, req *rpc.WriteTaggedRequest) error { +func (c *client) WriteTagged(ctx thrift.Context, req *rpc.WriteTaggedRequest) error { return c.next.WriteTagged(ctx, req) } -func (c *circuitBreakerClient) WriteTaggedBatchRaw(ctx thrift.Context, req *rpc.WriteTaggedBatchRawRequest) error { +func (c *client) WriteTaggedBatchRaw(ctx thrift.Context, req *rpc.WriteTaggedBatchRawRequest) error { return c.next.WriteTaggedBatchRaw(ctx, req) } -func (c *circuitBreakerClient) WriteTaggedBatchRawV2(ctx thrift.Context, req *rpc.WriteTaggedBatchRawV2Request) error { +func (c *client) WriteTaggedBatchRawV2(ctx thrift.Context, req *rpc.WriteTaggedBatchRawV2Request) error { return c.next.WriteTaggedBatchRawV2(ctx, req) } From e1b9ed12e0a2f076a82ca64a608edf749e287b28 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Thu, 1 May 2025 10:26:11 +0530 Subject: [PATCH 09/23] review change --- .../circuitbreaker/middleware/middleware.go | 58 +++++-------------- 1 file changed, 15 insertions(+), 43 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index d10c5f127a..9aa6e9e15f 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -5,7 +5,6 @@ import ( "github.com/m3db/m3/src/dbnode/client/circuitbreaker/internal/circuitbreaker" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/uber-go/tally" - tchannel "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" ) @@ -37,7 +36,7 @@ func New(config Config, logger *zap.Logger, scope tally.Scope, host string) (m3d return nil, err } - return func(next rpc.TChanNode) (Client, error) { + return func(next rpc.TChanNode) Client { return &client{ enabled: config.Enabled, shadowMode: config.ShadowMode, @@ -46,63 +45,36 @@ func New(config Config, logger *zap.Logger, scope tally.Scope, host string) (m3d host: host, metrics: newMetrics(scope, host), circuit: c, - }, nil + } }, nil } // withBreaker executes the given call with a circuit breaker if enabled. -func withBreaker[T any](c *client, ctx tchannel.ContextWithHeaders, req T, call func(tchannel.ContextWithHeaders, T) error) error { +func withBreaker[T any](c *client, ctx thrift.Context, req T, call func(thrift.Context, T) error) error { if !c.enabled { - return c.executeWithoutBreaker(call) + c.logger.Debug("circuit breaker disabled, calling next", zap.String("host", c.host)) + return call(ctx, req) } if c.circuit == nil || !c.circuit.IsRequestAllowed() { - return c.handleRejectedRequest(call) + c.metrics.rejects.Inc(1) + c.logger.Debug("circuit breaker request rejected", zap.String("host", c.host)) + if !c.shadowMode { + return circuitbreakererror.New(c.host) + } } - return c.executeWithBreaker(call) -} - -// executeWithoutBreaker executes the given call without a circuit breaker. -func (c *client) executeWithoutBreaker(call func() error) error { - c.logger.Debug("circuit breaker disabled, calling next", zap.String("host", c.host)) - return call() -} - -// handleRejectedRequest handles a rejected request by the circuit breaker. -func (c *client) handleRejectedRequest(call func() error) error { - c.metrics.rejects.Inc(1) - c.logger.Debug("circuit breaker request rejected", zap.String("host", c.host)) - if !c.shadowMode { - return circuitbreakererror.New(c.host) - } - return call() -} - -// executeWithBreaker executes the given call with a circuit breaker and handles success or failure. -func (c *client) executeWithBreaker(call func() error) error { - err := call() + err := call(ctx, req) if err == nil { - c.handleSuccess() + c.circuit.ReportRequestStatus(true) + c.metrics.successes.Inc(1) } else { - c.handleFailure() + c.circuit.ReportRequestStatus(false) + c.metrics.failures.Inc(1) } return err } -// handleSuccess handles a successful request by the circuit breaker. -func (c *client) handleSuccess() { - c.circuit.ReportRequestStatus(true) - c.metrics.successes.Inc(1) -} - -// handleFailure handles a failed request by the circuit breaker. -func (c *client) handleFailure() { - c.circuit.ReportRequestStatus(false) - c.logger.Debug("circuit breaker call failed", zap.String("host", c.host)) - c.metrics.failures.Inc(1) -} - // WriteBatchRaw is a method that writes a batch of raw data. func (c *client) WriteBatchRaw(ctx thrift.Context, req *rpc.WriteBatchRawRequest) error { return withBreaker(c, ctx, req, c.next.WriteBatchRaw) From e921354da0b80905e32735808259e398b7c7bdd6 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Thu, 1 May 2025 12:52:59 +0530 Subject: [PATCH 10/23] Adding unit tests --- .../middleware/middleware_test.go | 170 ++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 src/dbnode/client/circuitbreaker/middleware/middleware_test.go diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go new file mode 100644 index 0000000000..5bc4cc2bf2 --- /dev/null +++ b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go @@ -0,0 +1,170 @@ +package middleware + +import ( + "errors" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/m3db/m3/src/dbnode/client/circuitbreaker/internal/circuitbreaker" + "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/uber/tchannel-go/thrift" + "go.uber.org/zap" +) + +func TestNew(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNode := rpc.NewMockTChanNode(ctrl) + logger := zap.NewNop() + scope := tally.NewTestScope("", nil) + host := "test-host" + + t.Run("invalid_config", func(t *testing.T) { + config := Config{ + Enabled: true, + CircuitBreakerConfig: circuitbreaker.Config{ + WindowSize: -1, + }, + } + middleware, err := New(config, logger, scope, host) + require.Error(t, err) + require.Nil(t, middleware) + }) + + t.Run("valid_config", func(t *testing.T) { + config := Config{ + Enabled: true, + CircuitBreakerConfig: circuitbreaker.Config{ + WindowSize: 15, + BucketDuration: time.Second, + FailureRatio: 0.1, + MinimumRequests: 1, + }, + } + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + require.NotNil(t, middleware) + + client := middleware(mockNode) + require.NotNil(t, client) + }) +} + +func TestWriteBatchRaw(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNode := rpc.NewMockTChanNode(ctrl) + logger := zap.NewNop() + scope := tally.NewTestScope("", nil) + host := "test-host" + + t.Run("circuit_breaker_disabled", func(t *testing.T) { + config := Config{ + Enabled: false, + CircuitBreakerConfig: circuitbreaker.Config{ + WindowSize: 15, + BucketDuration: time.Second, + FailureRatio: 0.1, + MinimumRequests: 1, + }, + } + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + ctx, cancel := thrift.NewContext(time.Second) + defer cancel() + req := &rpc.WriteBatchRawRequest{} + + mockNode.EXPECT().WriteBatchRaw(ctx, req).Return(nil) + err = client.WriteBatchRaw(ctx, req) + require.NoError(t, err) + }) + + t.Run("circuit_breaker_rejected_not_in_shadow_mode", func(t *testing.T) { + config := Config{ + Enabled: true, + ShadowMode: false, + CircuitBreakerConfig: circuitbreaker.Config{ + WindowSize: 15, + BucketDuration: time.Second, + FailureRatio: 0.1, + MinimumRequests: 1, + }, + } + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + ctx, cancel := thrift.NewContext(time.Second) + defer cancel() + req := &rpc.WriteBatchRawRequest{} + + // First request should fail and trigger circuit breaker + mockNode.EXPECT().WriteBatchRaw(ctx, req).Return(errors.New("test error")) + err = client.WriteBatchRaw(ctx, req) + require.Error(t, err) + + // Second request should be rejected by circuit breaker + err = client.WriteBatchRaw(ctx, req) + require.Error(t, err) + }) + + t.Run("circuit_breaker_rejected_in_shadow_mode", func(t *testing.T) { + config := Config{ + Enabled: true, + ShadowMode: true, + CircuitBreakerConfig: circuitbreaker.Config{ + WindowSize: 15, + BucketDuration: time.Second, + FailureRatio: 0.1, + MinimumRequests: 1, + }, + } + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + ctx, cancel := thrift.NewContext(time.Second) + defer cancel() + req := &rpc.WriteBatchRawRequest{} + + // First request should fail and trigger circuit breaker + mockNode.EXPECT().WriteBatchRaw(ctx, req).Return(errors.New("test error")) + err = client.WriteBatchRaw(ctx, req) + require.Error(t, err) + + // Second request should still go through in shadow mode + mockNode.EXPECT().WriteBatchRaw(ctx, req).Return(nil) + err = client.WriteBatchRaw(ctx, req) + require.NoError(t, err) + }) + + t.Run("circuit_breaker_success", func(t *testing.T) { + config := Config{ + Enabled: true, + CircuitBreakerConfig: circuitbreaker.Config{ + WindowSize: 15, + BucketDuration: time.Second, + FailureRatio: 0.1, + MinimumRequests: 1, + }, + } + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + ctx, cancel := thrift.NewContext(time.Second) + defer cancel() + req := &rpc.WriteBatchRawRequest{} + + mockNode.EXPECT().WriteBatchRaw(ctx, req).Return(nil) + err = client.WriteBatchRaw(ctx, req) + require.NoError(t, err) + }) +} From e3d468ad3ac0e6b8610ce3f1a4ab56f4e2d4f990 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Thu, 1 May 2025 13:55:44 +0530 Subject: [PATCH 11/23] Adding shadow reject metrics --- .../circuitbreaker/middleware/metrics.go | 22 +++++------ .../circuitbreaker/middleware/middleware.go | 38 +++++++++++++++++-- 2 files changed, 44 insertions(+), 16 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/metrics.go b/src/dbnode/client/circuitbreaker/middleware/metrics.go index e677c9b568..86cb9b24ff 100644 --- a/src/dbnode/client/circuitbreaker/middleware/metrics.go +++ b/src/dbnode/client/circuitbreaker/middleware/metrics.go @@ -9,21 +9,17 @@ const ( ) type circuitBreakerMetrics struct { - successes tally.Counter // counter for successful requests - failures tally.Counter // counter for failed requests - rejects tally.Counter // counter for rejected requests + rejects tally.Counter + shadowRejects tally.Counter + successes tally.Counter + failures tally.Counter } func newMetrics(scope tally.Scope, host string) *circuitBreakerMetrics { - taggedScope := scope.Tagged(map[string]string{ - "component": _packageName, - "host": host, - }) - - metrics := &circuitBreakerMetrics{ - successes: taggedScope.Counter("circuit_breaker_successes"), - failures: taggedScope.Counter("circuit_breaker_failures"), - rejects: taggedScope.Counter("circuit_breaker_rejects"), + return &circuitBreakerMetrics{ + rejects: scope.Tagged(map[string]string{"host": host}).Counter("circuit_breaker_rejects"), + shadowRejects: scope.Tagged(map[string]string{"host": host}).Counter("circuit_breaker_shadow_rejects"), + successes: scope.Tagged(map[string]string{"host": host}).Counter("circuit_breaker_successes"), + failures: scope.Tagged(map[string]string{"host": host}).Counter("circuit_breaker_failures"), } - return metrics } diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index 9aa6e9e15f..9a2ae8a110 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -57,13 +57,14 @@ func withBreaker[T any](c *client, ctx thrift.Context, req T, call func(thrift.C } if c.circuit == nil || !c.circuit.IsRequestAllowed() { - c.metrics.rejects.Inc(1) c.logger.Debug("circuit breaker request rejected", zap.String("host", c.host)) - if !c.shadowMode { + if c.shadowMode { + c.metrics.shadowRejects.Inc(1) + } else { + c.metrics.rejects.Inc(1) return circuitbreakererror.New(c.host) } } - err := call(ctx, req) if err == nil { c.circuit.ReportRequestStatus(true) @@ -75,6 +76,37 @@ func withBreaker[T any](c *client, ctx thrift.Context, req T, call func(thrift.C return err } +// withBreakerWithResult executes the given call with a circuit breaker if enabled and returns both result and error. +func withBreakerWithResult[T any, R any](c *client, ctx thrift.Context, req T, call func(thrift.Context, T) (R, error)) (R, error) { + if !c.enabled { + c.logger.Debug("circuit breaker disabled, calling next", zap.String("host", c.host)) + return call(ctx, req) + } + + if c.circuit == nil || !c.circuit.IsRequestAllowed() { + if c.shadowMode { + c.metrics.shadowRejects.Inc(1) + } else { + c.metrics.rejects.Inc(1) + } + c.logger.Debug("circuit breaker request rejected", zap.String("host", c.host)) + if !c.shadowMode { + var zero R + return zero, circuitbreakererror.New(c.host) + } + } + + result, err := call(ctx, req) + if err == nil { + c.circuit.ReportRequestStatus(true) + c.metrics.successes.Inc(1) + } else { + c.circuit.ReportRequestStatus(false) + c.metrics.failures.Inc(1) + } + return result, err +} + // WriteBatchRaw is a method that writes a batch of raw data. func (c *client) WriteBatchRaw(ctx thrift.Context, req *rpc.WriteBatchRawRequest) error { return withBreaker(c, ctx, req, c.next.WriteBatchRaw) From 2e36aea5e92d09b435058ef42c2a466079e6c168 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Thu, 1 May 2025 15:02:31 +0530 Subject: [PATCH 12/23] updating packages --- src/dbnode/client/circuitbreaker/middleware/config.go | 2 +- src/dbnode/client/circuitbreaker/middleware/middleware.go | 2 +- src/dbnode/client/circuitbreaker/middleware/middleware_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/config.go b/src/dbnode/client/circuitbreaker/middleware/config.go index 1549164c97..d3ee4ef055 100644 --- a/src/dbnode/client/circuitbreaker/middleware/config.go +++ b/src/dbnode/client/circuitbreaker/middleware/config.go @@ -1,7 +1,7 @@ package middleware import ( - "github.com/m3db/m3/src/dbnode/client/circuitbreaker/internal/circuitbreaker" + "github.com/m3db/m3/src/dbnode/client/circuitbreaker" ) // Config represents the configuration for the circuit breaker middleware. diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index 9a2ae8a110..1f612ca27f 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -1,8 +1,8 @@ package middleware import ( + "github.com/m3db/m3/src/dbnode/client/circuitbreaker" "github.com/m3db/m3/src/dbnode/client/circuitbreaker/circuitbreakererror" - "github.com/m3db/m3/src/dbnode/client/circuitbreaker/internal/circuitbreaker" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/uber-go/tally" "github.com/uber/tchannel-go/thrift" diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go index 5bc4cc2bf2..8364fafb6d 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go @@ -6,7 +6,7 @@ import ( "time" "github.com/golang/mock/gomock" - "github.com/m3db/m3/src/dbnode/client/circuitbreaker/internal/circuitbreaker" + "github.com/m3db/m3/src/dbnode/client/circuitbreaker" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/stretchr/testify/require" "github.com/uber-go/tally" From 55d6b4bce3fd860d7f991e4755d6f1a6d3f5a00c Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Fri, 2 May 2025 14:38:31 +0530 Subject: [PATCH 13/23] review change --- .../circuitbreaker/middleware/metrics.go | 4 +- .../circuitbreaker/middleware/middleware.go | 40 +++---------------- 2 files changed, 8 insertions(+), 36 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/metrics.go b/src/dbnode/client/circuitbreaker/middleware/metrics.go index 86cb9b24ff..07d871e6ad 100644 --- a/src/dbnode/client/circuitbreaker/middleware/metrics.go +++ b/src/dbnode/client/circuitbreaker/middleware/metrics.go @@ -17,9 +17,9 @@ type circuitBreakerMetrics struct { func newMetrics(scope tally.Scope, host string) *circuitBreakerMetrics { return &circuitBreakerMetrics{ - rejects: scope.Tagged(map[string]string{"host": host}).Counter("circuit_breaker_rejects"), - shadowRejects: scope.Tagged(map[string]string{"host": host}).Counter("circuit_breaker_shadow_rejects"), successes: scope.Tagged(map[string]string{"host": host}).Counter("circuit_breaker_successes"), failures: scope.Tagged(map[string]string{"host": host}).Counter("circuit_breaker_failures"), + rejects: scope.Tagged(map[string]string{"host": host, "mode": "live"}).Counter("circuit_breaker_rejects"), + shadowRejects: scope.Tagged(map[string]string{"host": host, "mode": "shadow"}).Counter("circuit_breaker_rejects"), } } diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index 1f612ca27f..19246d97ca 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -51,12 +51,13 @@ func New(config Config, logger *zap.Logger, scope tally.Scope, host string) (m3d // withBreaker executes the given call with a circuit breaker if enabled. func withBreaker[T any](c *client, ctx thrift.Context, req T, call func(thrift.Context, T) error) error { - if !c.enabled { - c.logger.Debug("circuit breaker disabled, calling next", zap.String("host", c.host)) + // Early return if circuit breaker is disabled or not initialized + if !c.enabled || c.circuit == nil { return call(ctx, req) } - if c.circuit == nil || !c.circuit.IsRequestAllowed() { + // Check if request is allowed + if !c.circuit.IsRequestAllowed() { c.logger.Debug("circuit breaker request rejected", zap.String("host", c.host)) if c.shadowMode { c.metrics.shadowRejects.Inc(1) @@ -65,6 +66,8 @@ func withBreaker[T any](c *client, ctx thrift.Context, req T, call func(thrift.C return circuitbreakererror.New(c.host) } } + + // Execute the request and update circuit breaker state err := call(ctx, req) if err == nil { c.circuit.ReportRequestStatus(true) @@ -76,37 +79,6 @@ func withBreaker[T any](c *client, ctx thrift.Context, req T, call func(thrift.C return err } -// withBreakerWithResult executes the given call with a circuit breaker if enabled and returns both result and error. -func withBreakerWithResult[T any, R any](c *client, ctx thrift.Context, req T, call func(thrift.Context, T) (R, error)) (R, error) { - if !c.enabled { - c.logger.Debug("circuit breaker disabled, calling next", zap.String("host", c.host)) - return call(ctx, req) - } - - if c.circuit == nil || !c.circuit.IsRequestAllowed() { - if c.shadowMode { - c.metrics.shadowRejects.Inc(1) - } else { - c.metrics.rejects.Inc(1) - } - c.logger.Debug("circuit breaker request rejected", zap.String("host", c.host)) - if !c.shadowMode { - var zero R - return zero, circuitbreakererror.New(c.host) - } - } - - result, err := call(ctx, req) - if err == nil { - c.circuit.ReportRequestStatus(true) - c.metrics.successes.Inc(1) - } else { - c.circuit.ReportRequestStatus(false) - c.metrics.failures.Inc(1) - } - return result, err -} - // WriteBatchRaw is a method that writes a batch of raw data. func (c *client) WriteBatchRaw(ctx thrift.Context, req *rpc.WriteBatchRawRequest) error { return withBreaker(c, ctx, req, c.next.WriteBatchRaw) From e0c3a6a3539d64eca2d9e5b8954faa380c372f99 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Mon, 19 May 2025 18:54:29 +0530 Subject: [PATCH 14/23] lint fix --- .../circuitbreakererror/error.go | 3 +- .../circuitbreaker/middleware/middleware.go | 85 +- .../middleware/middleware_test.go | 725 +++++++++++++++++- 3 files changed, 780 insertions(+), 33 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/circuitbreakererror/error.go b/src/dbnode/client/circuitbreaker/circuitbreakererror/error.go index 816cde61bb..bda302d9f6 100644 --- a/src/dbnode/client/circuitbreaker/circuitbreakererror/error.go +++ b/src/dbnode/client/circuitbreaker/circuitbreakererror/error.go @@ -1,8 +1,9 @@ package circuitbreakererror import ( - xerrors "github.com/m3db/m3/src/x/errors" "strings" + + xerrors "github.com/m3db/m3/src/x/errors" ) // circuitBreakerError is an error type that indicates a circuit breaker error. diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index 19246d97ca..aef3fcf423 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -1,12 +1,13 @@ package middleware import ( - "github.com/m3db/m3/src/dbnode/client/circuitbreaker" - "github.com/m3db/m3/src/dbnode/client/circuitbreaker/circuitbreakererror" - "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/uber-go/tally" "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" + + "github.com/m3db/m3/src/dbnode/client/circuitbreaker" + "github.com/m3db/m3/src/dbnode/client/circuitbreaker/circuitbreakererror" + "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" ) // client is a client that wraps a TChannel client with a circuit breaker. @@ -20,8 +21,8 @@ type client struct { next rpc.TChanNode } -// m3dbMiddleware is a function that takes a TChannel client and returns a circuit breaker client interface. -type m3dbMiddleware func(rpc.TChanNode) Client +// M3DBMiddleware is a function that takes a TChannel client and returns a circuit breaker client interface. +type M3DBMiddleware func(rpc.TChanNode) Client // Client defines the interface for a circuit breaker client. type Client interface { @@ -29,7 +30,7 @@ type Client interface { } // New creates a new circuit breaker middleware. -func New(config Config, logger *zap.Logger, scope tally.Scope, host string) (m3dbMiddleware, error) { +func New(config Config, logger *zap.Logger, scope tally.Scope, host string) (M3DBMiddleware, error) { c, err := circuitbreaker.NewCircuit(config.CircuitBreakerConfig) if err != nil { logger.Warn("failed to create circuit breaker", zap.Error(err)) @@ -89,11 +90,17 @@ func (c *client) Aggregate(ctx thrift.Context, req *rpc.AggregateQueryRequest) ( return c.next.Aggregate(ctx, req) } -func (c *client) AggregateRaw(ctx thrift.Context, req *rpc.AggregateQueryRawRequest) (*rpc.AggregateQueryRawResult_, error) { +func (c *client) AggregateRaw( + ctx thrift.Context, + req *rpc.AggregateQueryRawRequest, +) (*rpc.AggregateQueryRawResult_, error) { return c.next.AggregateRaw(ctx, req) } -func (c *client) AggregateTiles(ctx thrift.Context, req *rpc.AggregateTilesRequest) (*rpc.AggregateTilesResult_, error) { +func (c *client) AggregateTiles( + ctx thrift.Context, + req *rpc.AggregateTilesRequest, +) (*rpc.AggregateTilesResult_, error) { return c.next.AggregateTiles(ctx, req) } @@ -101,19 +108,30 @@ func (c *client) Bootstrapped(ctx thrift.Context) (*rpc.NodeBootstrappedResult_, return c.next.Bootstrapped(ctx) } -func (c *client) BootstrappedInPlacementOrNoPlacement(ctx thrift.Context) (*rpc.NodeBootstrappedInPlacementOrNoPlacementResult_, error) { +func (c *client) BootstrappedInPlacementOrNoPlacement( + ctx thrift.Context, +) (*rpc.NodeBootstrappedInPlacementOrNoPlacementResult_, error) { return c.next.BootstrappedInPlacementOrNoPlacement(ctx) } -func (c *client) DebugIndexMemorySegments(ctx thrift.Context, req *rpc.DebugIndexMemorySegmentsRequest) (*rpc.DebugIndexMemorySegmentsResult_, error) { +func (c *client) DebugIndexMemorySegments( + ctx thrift.Context, + req *rpc.DebugIndexMemorySegmentsRequest, +) (*rpc.DebugIndexMemorySegmentsResult_, error) { return c.next.DebugIndexMemorySegments(ctx, req) } -func (c *client) DebugProfileStart(ctx thrift.Context, req *rpc.DebugProfileStartRequest) (*rpc.DebugProfileStartResult_, error) { +func (c *client) DebugProfileStart( + ctx thrift.Context, + req *rpc.DebugProfileStartRequest, +) (*rpc.DebugProfileStartResult_, error) { return c.next.DebugProfileStart(ctx, req) } -func (c *client) DebugProfileStop(ctx thrift.Context, req *rpc.DebugProfileStopRequest) (*rpc.DebugProfileStopResult_, error) { +func (c *client) DebugProfileStop( + ctx thrift.Context, + req *rpc.DebugProfileStopRequest, +) (*rpc.DebugProfileStopResult_, error) { return c.next.DebugProfileStop(ctx, req) } @@ -125,15 +143,24 @@ func (c *client) FetchBatchRaw(ctx thrift.Context, req *rpc.FetchBatchRawRequest return c.next.FetchBatchRaw(ctx, req) } -func (c *client) FetchBatchRawV2(ctx thrift.Context, req *rpc.FetchBatchRawV2Request) (*rpc.FetchBatchRawResult_, error) { +func (c *client) FetchBatchRawV2( + ctx thrift.Context, + req *rpc.FetchBatchRawV2Request, +) (*rpc.FetchBatchRawResult_, error) { return c.next.FetchBatchRawV2(ctx, req) } -func (c *client) FetchBlocksMetadataRawV2(ctx thrift.Context, req *rpc.FetchBlocksMetadataRawV2Request) (*rpc.FetchBlocksMetadataRawV2Result_, error) { +func (c *client) FetchBlocksMetadataRawV2( + ctx thrift.Context, + req *rpc.FetchBlocksMetadataRawV2Request, +) (*rpc.FetchBlocksMetadataRawV2Result_, error) { return c.next.FetchBlocksMetadataRawV2(ctx, req) } -func (c *client) FetchBlocksRaw(ctx thrift.Context, req *rpc.FetchBlocksRawRequest) (*rpc.FetchBlocksRawResult_, error) { +func (c *client) FetchBlocksRaw( + ctx thrift.Context, + req *rpc.FetchBlocksRawRequest, +) (*rpc.FetchBlocksRawResult_, error) { return c.next.FetchBlocksRaw(ctx, req) } @@ -149,11 +176,15 @@ func (c *client) GetWriteNewSeriesAsync(ctx thrift.Context) (*rpc.NodeWriteNewSe return c.next.GetWriteNewSeriesAsync(ctx) } -func (c *client) GetWriteNewSeriesBackoffDuration(ctx thrift.Context) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) { +func (c *client) GetWriteNewSeriesBackoffDuration( + ctx thrift.Context, +) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) { return c.next.GetWriteNewSeriesBackoffDuration(ctx) } -func (c *client) GetWriteNewSeriesLimitPerShardPerSecond(ctx thrift.Context) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) { +func (c *client) GetWriteNewSeriesLimitPerShardPerSecond( + ctx thrift.Context, +) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) { return c.next.GetWriteNewSeriesLimitPerShardPerSecond(ctx) } @@ -169,19 +200,31 @@ func (c *client) Repair(ctx thrift.Context) error { return c.next.Repair(ctx) } -func (c *client) SetPersistRateLimit(ctx thrift.Context, req *rpc.NodeSetPersistRateLimitRequest) (*rpc.NodePersistRateLimitResult_, error) { +func (c *client) SetPersistRateLimit( + ctx thrift.Context, + req *rpc.NodeSetPersistRateLimitRequest, +) (*rpc.NodePersistRateLimitResult_, error) { return c.next.SetPersistRateLimit(ctx, req) } -func (c *client) SetWriteNewSeriesAsync(ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesAsyncRequest) (*rpc.NodeWriteNewSeriesAsyncResult_, error) { +func (c *client) SetWriteNewSeriesAsync( + ctx thrift.Context, + req *rpc.NodeSetWriteNewSeriesAsyncRequest, +) (*rpc.NodeWriteNewSeriesAsyncResult_, error) { return c.next.SetWriteNewSeriesAsync(ctx, req) } -func (c *client) SetWriteNewSeriesBackoffDuration(ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesBackoffDurationRequest) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) { +func (c *client) SetWriteNewSeriesBackoffDuration( + ctx thrift.Context, + req *rpc.NodeSetWriteNewSeriesBackoffDurationRequest, +) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) { return c.next.SetWriteNewSeriesBackoffDuration(ctx, req) } -func (c *client) SetWriteNewSeriesLimitPerShardPerSecond(ctx thrift.Context, req *rpc.NodeSetWriteNewSeriesLimitPerShardPerSecondRequest) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) { +func (c *client) SetWriteNewSeriesLimitPerShardPerSecond( + ctx thrift.Context, + req *rpc.NodeSetWriteNewSeriesLimitPerShardPerSecondRequest, +) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) { return c.next.SetWriteNewSeriesLimitPerShardPerSecond(ctx, req) } diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go index 8364fafb6d..a9f4a673d9 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go @@ -6,14 +6,29 @@ import ( "time" "github.com/golang/mock/gomock" - "github.com/m3db/m3/src/dbnode/client/circuitbreaker" - "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/stretchr/testify/require" "github.com/uber-go/tally" "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" + + "github.com/m3db/m3/src/dbnode/client/circuitbreaker/internal/circuitbreaker" + "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" ) +// newTestConfig creates a common test configuration for middleware tests +func newTestConfig(enabled bool, shadowMode bool) Config { + return Config{ + Enabled: enabled, + ShadowMode: shadowMode, + CircuitBreakerConfig: circuitbreaker.Config{ + WindowSize: 15, + BucketDuration: time.Second, + FailureRatio: 0.1, + MinimumRequests: 1, + }, + } +} + func TestNew(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -36,15 +51,7 @@ func TestNew(t *testing.T) { }) t.Run("valid_config", func(t *testing.T) { - config := Config{ - Enabled: true, - CircuitBreakerConfig: circuitbreaker.Config{ - WindowSize: 15, - BucketDuration: time.Second, - FailureRatio: 0.1, - MinimumRequests: 1, - }, - } + config := newTestConfig(true, false) middleware, err := New(config, logger, scope, host) require.NoError(t, err) require.NotNil(t, middleware) @@ -168,3 +175,699 @@ func TestWriteBatchRaw(t *testing.T) { require.NoError(t, err) }) } + +func TestFetch(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNode := rpc.NewMockTChanNode(ctrl) + logger := zap.NewNop() + scope := tally.NewTestScope("", nil) + host := "test-host" + + t.Run("disabled", func(t *testing.T) { + config := newTestConfig(false, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.FetchRequest{} + resp := &rpc.FetchResult_{} + + mockNode.EXPECT().Fetch(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.Fetch(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("shadow_mode", func(t *testing.T) { + config := newTestConfig(true, true) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.FetchRequest{} + resp := &rpc.FetchResult_{} + + mockNode.EXPECT().Fetch(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.Fetch(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("success", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.FetchRequest{} + resp := &rpc.FetchResult_{} + + mockNode.EXPECT().Fetch(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.Fetch(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("error", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.FetchRequest{} + expectedErr := errors.New("test error") + + mockNode.EXPECT().Fetch(gomock.Any(), req).Return(nil, expectedErr) + + actualResp, err := client.Fetch(ctx, req) + require.Error(t, err) + require.Equal(t, expectedErr, err) + require.Nil(t, actualResp) + }) +} + +func TestFetchTagged(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNode := rpc.NewMockTChanNode(ctrl) + logger := zap.NewNop() + scope := tally.NewTestScope("", nil) + host := "test-host" + + t.Run("disabled", func(t *testing.T) { + config := newTestConfig(false, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.FetchTaggedRequest{} + resp := &rpc.FetchTaggedResult_{} + + mockNode.EXPECT().FetchTagged(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.FetchTagged(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("shadow_mode", func(t *testing.T) { + config := newTestConfig(true, true) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.FetchTaggedRequest{} + resp := &rpc.FetchTaggedResult_{} + + mockNode.EXPECT().FetchTagged(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.FetchTagged(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("success", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.FetchTaggedRequest{} + resp := &rpc.FetchTaggedResult_{} + + mockNode.EXPECT().FetchTagged(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.FetchTagged(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("error", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.FetchTaggedRequest{} + expectedErr := errors.New("test error") + + mockNode.EXPECT().FetchTagged(gomock.Any(), req).Return(nil, expectedErr) + + actualResp, err := client.FetchTagged(ctx, req) + require.Error(t, err) + require.Equal(t, expectedErr, err) + require.Nil(t, actualResp) + }) +} + +func TestWrite(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNode := rpc.NewMockTChanNode(ctrl) + logger := zap.NewNop() + scope := tally.NewTestScope("", nil) + host := "test-host" + + t.Run("disabled", func(t *testing.T) { + config := newTestConfig(false, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.WriteRequest{} + resp := &rpc.WriteResult_{} + + mockNode.EXPECT().Write(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.Write(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("shadow_mode", func(t *testing.T) { + config := newTestConfig(true, true) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.WriteRequest{} + resp := &rpc.WriteResult_{} + + mockNode.EXPECT().Write(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.Write(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("success", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.WriteRequest{} + resp := &rpc.WriteResult_{} + + mockNode.EXPECT().Write(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.Write(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("error", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.WriteRequest{} + expectedErr := errors.New("test error") + + mockNode.EXPECT().Write(gomock.Any(), req).Return(nil, expectedErr) + + actualResp, err := client.Write(ctx, req) + require.Error(t, err) + require.Equal(t, expectedErr, err) + require.Nil(t, actualResp) + }) +} + +func TestWriteTagged(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNode := rpc.NewMockTChanNode(ctrl) + logger := zap.NewNop() + scope := tally.NewTestScope("", nil) + host := "test-host" + + t.Run("disabled", func(t *testing.T) { + config := newTestConfig(false, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.WriteTaggedRequest{} + resp := &rpc.WriteTaggedResult_{} + + mockNode.EXPECT().WriteTagged(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.WriteTagged(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("shadow_mode", func(t *testing.T) { + config := newTestConfig(true, true) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.WriteTaggedRequest{} + resp := &rpc.WriteTaggedResult_{} + + mockNode.EXPECT().WriteTagged(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.WriteTagged(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("success", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.WriteTaggedRequest{} + resp := &rpc.WriteTaggedResult_{} + + mockNode.EXPECT().WriteTagged(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.WriteTagged(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("error", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.WriteTaggedRequest{} + expectedErr := errors.New("test error") + + mockNode.EXPECT().WriteTagged(gomock.Any(), req).Return(nil, expectedErr) + + actualResp, err := client.WriteTagged(ctx, req) + require.Error(t, err) + require.Equal(t, expectedErr, err) + require.Nil(t, actualResp) + }) +} + +func TestAggregate(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNode := rpc.NewMockTChanNode(ctrl) + logger := zap.NewNop() + scope := tally.NewTestScope("", nil) + host := "test-host" + + t.Run("disabled", func(t *testing.T) { + config := newTestConfig(false, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.AggregateRequest{} + resp := &rpc.AggregateResult_{} + + mockNode.EXPECT().Aggregate(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.Aggregate(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("shadow_mode", func(t *testing.T) { + config := newTestConfig(true, true) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.AggregateRequest{} + resp := &rpc.AggregateResult_{} + + mockNode.EXPECT().Aggregate(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.Aggregate(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("success", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.AggregateRequest{} + resp := &rpc.AggregateResult_{} + + mockNode.EXPECT().Aggregate(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.Aggregate(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("error", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.AggregateRequest{} + expectedErr := errors.New("test error") + + mockNode.EXPECT().Aggregate(gomock.Any(), req).Return(nil, expectedErr) + + actualResp, err := client.Aggregate(ctx, req) + require.Error(t, err) + require.Equal(t, expectedErr, err) + require.Nil(t, actualResp) + }) +} + +func TestAggregateTagged(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNode := rpc.NewMockTChanNode(ctrl) + logger := zap.NewNop() + scope := tally.NewTestScope("", nil) + host := "test-host" + + t.Run("disabled", func(t *testing.T) { + config := newTestConfig(false, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.AggregateTaggedRequest{} + resp := &rpc.AggregateTaggedResult_{} + + mockNode.EXPECT().AggregateTagged(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.AggregateTagged(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("shadow_mode", func(t *testing.T) { + config := newTestConfig(true, true) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.AggregateTaggedRequest{} + resp := &rpc.AggregateTaggedResult_{} + + mockNode.EXPECT().AggregateTagged(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.AggregateTagged(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("success", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.AggregateTaggedRequest{} + resp := &rpc.AggregateTaggedResult_{} + + mockNode.EXPECT().AggregateTagged(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.AggregateTagged(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("error", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.AggregateTaggedRequest{} + expectedErr := errors.New("test error") + + mockNode.EXPECT().AggregateTagged(gomock.Any(), req).Return(nil, expectedErr) + + actualResp, err := client.AggregateTagged(ctx, req) + require.Error(t, err) + require.Equal(t, expectedErr, err) + require.Nil(t, actualResp) + }) +} + +func TestQuery(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNode := rpc.NewMockTChanNode(ctrl) + logger := zap.NewNop() + scope := tally.NewTestScope("", nil) + host := "test-host" + + t.Run("disabled", func(t *testing.T) { + config := newTestConfig(false, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.QueryRequest{} + resp := &rpc.QueryResult_{} + + mockNode.EXPECT().Query(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.Query(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("shadow_mode", func(t *testing.T) { + config := newTestConfig(true, true) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.QueryRequest{} + resp := &rpc.QueryResult_{} + + mockNode.EXPECT().Query(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.Query(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("success", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.QueryRequest{} + resp := &rpc.QueryResult_{} + + mockNode.EXPECT().Query(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.Query(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("error", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.QueryRequest{} + expectedErr := errors.New("test error") + + mockNode.EXPECT().Query(gomock.Any(), req).Return(nil, expectedErr) + + actualResp, err := client.Query(ctx, req) + require.Error(t, err) + require.Equal(t, expectedErr, err) + require.Nil(t, actualResp) + }) +} + +func TestQueryTagged(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockNode := rpc.NewMockTChanNode(ctrl) + logger := zap.NewNop() + scope := tally.NewTestScope("", nil) + host := "test-host" + + t.Run("disabled", func(t *testing.T) { + config := newTestConfig(false, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.QueryTaggedRequest{} + resp := &rpc.QueryTaggedResult_{} + + mockNode.EXPECT().QueryTagged(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.QueryTagged(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("shadow_mode", func(t *testing.T) { + config := newTestConfig(true, true) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.QueryTaggedRequest{} + resp := &rpc.QueryTaggedResult_{} + + mockNode.EXPECT().QueryTagged(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.QueryTagged(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("success", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.QueryTaggedRequest{} + resp := &rpc.QueryTaggedResult_{} + + mockNode.EXPECT().QueryTagged(gomock.Any(), req).Return(resp, nil) + + actualResp, err := client.QueryTagged(ctx, req) + require.NoError(t, err) + require.Equal(t, resp, actualResp) + }) + + t.Run("error", func(t *testing.T) { + config := newTestConfig(true, false) + middleware, err := New(config, logger, scope, host) + require.NoError(t, err) + + client := middleware(mockNode) + require.NotNil(t, client) + + ctx := thrift.NewContext(time.Second) + req := &rpc.QueryTaggedRequest{} + expectedErr := errors.New("test error") + + mockNode.EXPECT().QueryTagged(gomock.Any(), req).Return(nil, expectedErr) + + actualResp, err := client.QueryTagged(ctx, req) + require.Error(t, err) + require.Equal(t, expectedErr, err) + require.Nil(t, actualResp) + }) +} From fd8e7a62e19a9385cc30e185ca8537d956c6cee8 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Mon, 19 May 2025 19:24:46 +0530 Subject: [PATCH 15/23] review change --- .../middleware/middleware_test.go | 698 +----------------- 1 file changed, 1 insertion(+), 697 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go index a9f4a673d9..3be105395b 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go @@ -11,7 +11,7 @@ import ( "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" - "github.com/m3db/m3/src/dbnode/client/circuitbreaker/internal/circuitbreaker" + "github.com/m3db/m3/src/dbnode/client/circuitbreaker" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" ) @@ -175,699 +175,3 @@ func TestWriteBatchRaw(t *testing.T) { require.NoError(t, err) }) } - -func TestFetch(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockNode := rpc.NewMockTChanNode(ctrl) - logger := zap.NewNop() - scope := tally.NewTestScope("", nil) - host := "test-host" - - t.Run("disabled", func(t *testing.T) { - config := newTestConfig(false, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.FetchRequest{} - resp := &rpc.FetchResult_{} - - mockNode.EXPECT().Fetch(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.Fetch(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("shadow_mode", func(t *testing.T) { - config := newTestConfig(true, true) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.FetchRequest{} - resp := &rpc.FetchResult_{} - - mockNode.EXPECT().Fetch(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.Fetch(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("success", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.FetchRequest{} - resp := &rpc.FetchResult_{} - - mockNode.EXPECT().Fetch(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.Fetch(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("error", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.FetchRequest{} - expectedErr := errors.New("test error") - - mockNode.EXPECT().Fetch(gomock.Any(), req).Return(nil, expectedErr) - - actualResp, err := client.Fetch(ctx, req) - require.Error(t, err) - require.Equal(t, expectedErr, err) - require.Nil(t, actualResp) - }) -} - -func TestFetchTagged(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockNode := rpc.NewMockTChanNode(ctrl) - logger := zap.NewNop() - scope := tally.NewTestScope("", nil) - host := "test-host" - - t.Run("disabled", func(t *testing.T) { - config := newTestConfig(false, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.FetchTaggedRequest{} - resp := &rpc.FetchTaggedResult_{} - - mockNode.EXPECT().FetchTagged(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.FetchTagged(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("shadow_mode", func(t *testing.T) { - config := newTestConfig(true, true) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.FetchTaggedRequest{} - resp := &rpc.FetchTaggedResult_{} - - mockNode.EXPECT().FetchTagged(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.FetchTagged(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("success", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.FetchTaggedRequest{} - resp := &rpc.FetchTaggedResult_{} - - mockNode.EXPECT().FetchTagged(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.FetchTagged(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("error", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.FetchTaggedRequest{} - expectedErr := errors.New("test error") - - mockNode.EXPECT().FetchTagged(gomock.Any(), req).Return(nil, expectedErr) - - actualResp, err := client.FetchTagged(ctx, req) - require.Error(t, err) - require.Equal(t, expectedErr, err) - require.Nil(t, actualResp) - }) -} - -func TestWrite(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockNode := rpc.NewMockTChanNode(ctrl) - logger := zap.NewNop() - scope := tally.NewTestScope("", nil) - host := "test-host" - - t.Run("disabled", func(t *testing.T) { - config := newTestConfig(false, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.WriteRequest{} - resp := &rpc.WriteResult_{} - - mockNode.EXPECT().Write(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.Write(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("shadow_mode", func(t *testing.T) { - config := newTestConfig(true, true) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.WriteRequest{} - resp := &rpc.WriteResult_{} - - mockNode.EXPECT().Write(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.Write(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("success", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.WriteRequest{} - resp := &rpc.WriteResult_{} - - mockNode.EXPECT().Write(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.Write(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("error", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.WriteRequest{} - expectedErr := errors.New("test error") - - mockNode.EXPECT().Write(gomock.Any(), req).Return(nil, expectedErr) - - actualResp, err := client.Write(ctx, req) - require.Error(t, err) - require.Equal(t, expectedErr, err) - require.Nil(t, actualResp) - }) -} - -func TestWriteTagged(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockNode := rpc.NewMockTChanNode(ctrl) - logger := zap.NewNop() - scope := tally.NewTestScope("", nil) - host := "test-host" - - t.Run("disabled", func(t *testing.T) { - config := newTestConfig(false, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.WriteTaggedRequest{} - resp := &rpc.WriteTaggedResult_{} - - mockNode.EXPECT().WriteTagged(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.WriteTagged(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("shadow_mode", func(t *testing.T) { - config := newTestConfig(true, true) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.WriteTaggedRequest{} - resp := &rpc.WriteTaggedResult_{} - - mockNode.EXPECT().WriteTagged(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.WriteTagged(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("success", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.WriteTaggedRequest{} - resp := &rpc.WriteTaggedResult_{} - - mockNode.EXPECT().WriteTagged(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.WriteTagged(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("error", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.WriteTaggedRequest{} - expectedErr := errors.New("test error") - - mockNode.EXPECT().WriteTagged(gomock.Any(), req).Return(nil, expectedErr) - - actualResp, err := client.WriteTagged(ctx, req) - require.Error(t, err) - require.Equal(t, expectedErr, err) - require.Nil(t, actualResp) - }) -} - -func TestAggregate(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockNode := rpc.NewMockTChanNode(ctrl) - logger := zap.NewNop() - scope := tally.NewTestScope("", nil) - host := "test-host" - - t.Run("disabled", func(t *testing.T) { - config := newTestConfig(false, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.AggregateRequest{} - resp := &rpc.AggregateResult_{} - - mockNode.EXPECT().Aggregate(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.Aggregate(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("shadow_mode", func(t *testing.T) { - config := newTestConfig(true, true) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.AggregateRequest{} - resp := &rpc.AggregateResult_{} - - mockNode.EXPECT().Aggregate(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.Aggregate(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("success", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.AggregateRequest{} - resp := &rpc.AggregateResult_{} - - mockNode.EXPECT().Aggregate(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.Aggregate(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("error", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.AggregateRequest{} - expectedErr := errors.New("test error") - - mockNode.EXPECT().Aggregate(gomock.Any(), req).Return(nil, expectedErr) - - actualResp, err := client.Aggregate(ctx, req) - require.Error(t, err) - require.Equal(t, expectedErr, err) - require.Nil(t, actualResp) - }) -} - -func TestAggregateTagged(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockNode := rpc.NewMockTChanNode(ctrl) - logger := zap.NewNop() - scope := tally.NewTestScope("", nil) - host := "test-host" - - t.Run("disabled", func(t *testing.T) { - config := newTestConfig(false, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.AggregateTaggedRequest{} - resp := &rpc.AggregateTaggedResult_{} - - mockNode.EXPECT().AggregateTagged(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.AggregateTagged(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("shadow_mode", func(t *testing.T) { - config := newTestConfig(true, true) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.AggregateTaggedRequest{} - resp := &rpc.AggregateTaggedResult_{} - - mockNode.EXPECT().AggregateTagged(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.AggregateTagged(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("success", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.AggregateTaggedRequest{} - resp := &rpc.AggregateTaggedResult_{} - - mockNode.EXPECT().AggregateTagged(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.AggregateTagged(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("error", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.AggregateTaggedRequest{} - expectedErr := errors.New("test error") - - mockNode.EXPECT().AggregateTagged(gomock.Any(), req).Return(nil, expectedErr) - - actualResp, err := client.AggregateTagged(ctx, req) - require.Error(t, err) - require.Equal(t, expectedErr, err) - require.Nil(t, actualResp) - }) -} - -func TestQuery(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockNode := rpc.NewMockTChanNode(ctrl) - logger := zap.NewNop() - scope := tally.NewTestScope("", nil) - host := "test-host" - - t.Run("disabled", func(t *testing.T) { - config := newTestConfig(false, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.QueryRequest{} - resp := &rpc.QueryResult_{} - - mockNode.EXPECT().Query(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.Query(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("shadow_mode", func(t *testing.T) { - config := newTestConfig(true, true) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.QueryRequest{} - resp := &rpc.QueryResult_{} - - mockNode.EXPECT().Query(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.Query(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("success", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.QueryRequest{} - resp := &rpc.QueryResult_{} - - mockNode.EXPECT().Query(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.Query(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("error", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.QueryRequest{} - expectedErr := errors.New("test error") - - mockNode.EXPECT().Query(gomock.Any(), req).Return(nil, expectedErr) - - actualResp, err := client.Query(ctx, req) - require.Error(t, err) - require.Equal(t, expectedErr, err) - require.Nil(t, actualResp) - }) -} - -func TestQueryTagged(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockNode := rpc.NewMockTChanNode(ctrl) - logger := zap.NewNop() - scope := tally.NewTestScope("", nil) - host := "test-host" - - t.Run("disabled", func(t *testing.T) { - config := newTestConfig(false, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.QueryTaggedRequest{} - resp := &rpc.QueryTaggedResult_{} - - mockNode.EXPECT().QueryTagged(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.QueryTagged(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("shadow_mode", func(t *testing.T) { - config := newTestConfig(true, true) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.QueryTaggedRequest{} - resp := &rpc.QueryTaggedResult_{} - - mockNode.EXPECT().QueryTagged(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.QueryTagged(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("success", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.QueryTaggedRequest{} - resp := &rpc.QueryTaggedResult_{} - - mockNode.EXPECT().QueryTagged(gomock.Any(), req).Return(resp, nil) - - actualResp, err := client.QueryTagged(ctx, req) - require.NoError(t, err) - require.Equal(t, resp, actualResp) - }) - - t.Run("error", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - require.NotNil(t, client) - - ctx := thrift.NewContext(time.Second) - req := &rpc.QueryTaggedRequest{} - expectedErr := errors.New("test error") - - mockNode.EXPECT().QueryTagged(gomock.Any(), req).Return(nil, expectedErr) - - actualResp, err := client.QueryTagged(ctx, req) - require.Error(t, err) - require.Equal(t, expectedErr, err) - require.Nil(t, actualResp) - }) -} From b05e68072a186a79e80135db47c147f39cd63d94 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Mon, 19 May 2025 20:16:55 +0530 Subject: [PATCH 16/23] review change --- .../middleware/middleware_test.go | 31 ++----------------- 1 file changed, 3 insertions(+), 28 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go index 3be105395b..55474404fc 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go @@ -71,15 +71,7 @@ func TestWriteBatchRaw(t *testing.T) { host := "test-host" t.Run("circuit_breaker_disabled", func(t *testing.T) { - config := Config{ - Enabled: false, - CircuitBreakerConfig: circuitbreaker.Config{ - WindowSize: 15, - BucketDuration: time.Second, - FailureRatio: 0.1, - MinimumRequests: 1, - }, - } + config := newTestConfig(false, false) middleware, err := New(config, logger, scope, host) require.NoError(t, err) @@ -123,16 +115,7 @@ func TestWriteBatchRaw(t *testing.T) { }) t.Run("circuit_breaker_rejected_in_shadow_mode", func(t *testing.T) { - config := Config{ - Enabled: true, - ShadowMode: true, - CircuitBreakerConfig: circuitbreaker.Config{ - WindowSize: 15, - BucketDuration: time.Second, - FailureRatio: 0.1, - MinimumRequests: 1, - }, - } + config := newTestConfig(true, true), middleware, err := New(config, logger, scope, host) require.NoError(t, err) @@ -153,15 +136,7 @@ func TestWriteBatchRaw(t *testing.T) { }) t.Run("circuit_breaker_success", func(t *testing.T) { - config := Config{ - Enabled: true, - CircuitBreakerConfig: circuitbreaker.Config{ - WindowSize: 15, - BucketDuration: time.Second, - FailureRatio: 0.1, - MinimumRequests: 1, - }, - } + config := newTestConfig(true, false) middleware, err := New(config, logger, scope, host) require.NoError(t, err) From 01f77147f21a0c7fe22887a4116f8278fbf338ac Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Mon, 19 May 2025 20:30:49 +0530 Subject: [PATCH 17/23] review change --- src/dbnode/client/circuitbreaker/middleware/middleware_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go index 55474404fc..7c086e9a22 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go @@ -115,7 +115,7 @@ func TestWriteBatchRaw(t *testing.T) { }) t.Run("circuit_breaker_rejected_in_shadow_mode", func(t *testing.T) { - config := newTestConfig(true, true), + config := newTestConfig(true, true) middleware, err := New(config, logger, scope, host) require.NoError(t, err) From 860f687113b1cd7413e404a2d37e65d8529fd5db Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Mon, 19 May 2025 21:17:27 +0530 Subject: [PATCH 18/23] review change --- src/dbnode/client/circuitbreaker/middleware/metrics.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/metrics.go b/src/dbnode/client/circuitbreaker/middleware/metrics.go index 07d871e6ad..f446d851a4 100644 --- a/src/dbnode/client/circuitbreaker/middleware/metrics.go +++ b/src/dbnode/client/circuitbreaker/middleware/metrics.go @@ -4,10 +4,6 @@ import ( "github.com/uber-go/tally" ) -const ( - _packageName = "circuit_breaker" -) - type circuitBreakerMetrics struct { rejects tally.Counter shadowRejects tally.Counter From 2564876c20d2b0d5427d63cad0299c69c9bf99ef Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Sun, 25 May 2025 07:38:36 +0530 Subject: [PATCH 19/23] review change --- .../client/circuitbreaker/middleware/middleware.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index aef3fcf423..90a7a6dffd 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -58,8 +58,10 @@ func withBreaker[T any](c *client, ctx thrift.Context, req T, call func(thrift.C } // Check if request is allowed - if !c.circuit.IsRequestAllowed() { - c.logger.Debug("circuit breaker request rejected", zap.String("host", c.host)) + isAllowed := c.circuit.IsRequestAllowed() + + // If request is not allowed, log and return error + if !isAllowed { if c.shadowMode { c.metrics.shadowRejects.Inc(1) } else { @@ -71,12 +73,14 @@ func withBreaker[T any](c *client, ctx thrift.Context, req T, call func(thrift.C // Execute the request and update circuit breaker state err := call(ctx, req) if err == nil { - c.circuit.ReportRequestStatus(true) c.metrics.successes.Inc(1) } else { - c.circuit.ReportRequestStatus(false) c.metrics.failures.Inc(1) } + + if isAllowed { + c.circuit.ReportRequestStatus(err == nil) + } return err } From ef71454f12154a11b7aafad91e14d7e875926565 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Sun, 25 May 2025 10:17:20 +0530 Subject: [PATCH 20/23] review change --- .../circuitbreaker/middleware/middleware.go | 27 +- .../middleware/middleware_test.go | 326 ++++++++++++------ 2 files changed, 231 insertions(+), 122 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware.go b/src/dbnode/client/circuitbreaker/middleware/middleware.go index 90a7a6dffd..7f1eb413d7 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware.go @@ -29,22 +29,30 @@ type Client interface { rpc.TChanNode } +// Params contains all parameters needed to create a new middleware +type Params struct { + Config Config + Logger *zap.Logger + Scope tally.Scope + Host string +} + // New creates a new circuit breaker middleware. -func New(config Config, logger *zap.Logger, scope tally.Scope, host string) (M3DBMiddleware, error) { - c, err := circuitbreaker.NewCircuit(config.CircuitBreakerConfig) +func New(params Params) (M3DBMiddleware, error) { + c, err := circuitbreaker.NewCircuit(params.Config.CircuitBreakerConfig) if err != nil { - logger.Warn("failed to create circuit breaker", zap.Error(err)) + params.Logger.Warn("failed to create circuit breaker", zap.Error(err)) return nil, err } return func(next rpc.TChanNode) Client { return &client{ - enabled: config.Enabled, - shadowMode: config.ShadowMode, + enabled: params.Config.Enabled, + shadowMode: params.Config.ShadowMode, next: next, - logger: logger, - host: host, - metrics: newMetrics(scope, host), + logger: params.Logger, + host: params.Host, + metrics: newMetrics(params.Scope, params.Host), circuit: c, } }, nil @@ -70,7 +78,7 @@ func withBreaker[T any](c *client, ctx thrift.Context, req T, call func(thrift.C } } - // Execute the request and update circuit breaker state + // Execute the request and update metrics err := call(ctx, req) if err == nil { c.metrics.successes.Inc(1) @@ -78,6 +86,7 @@ func withBreaker[T any](c *client, ctx thrift.Context, req T, call func(thrift.C c.metrics.failures.Inc(1) } + // Report request status to circuit breaker if isAllowed { c.circuit.ReportRequestStatus(err == nil) } diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go index 7c086e9a22..be2239ef5f 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go @@ -1,3 +1,23 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package middleware import ( @@ -6,6 +26,7 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" "github.com/uber/tchannel-go/thrift" @@ -16,15 +37,16 @@ import ( ) // newTestConfig creates a common test configuration for middleware tests -func newTestConfig(enabled bool, shadowMode bool) Config { +func newTestConfig(enabled, shadowMode bool) Config { return Config{ Enabled: enabled, ShadowMode: shadowMode, CircuitBreakerConfig: circuitbreaker.Config{ - WindowSize: 15, - BucketDuration: time.Second, - FailureRatio: 0.1, - MinimumRequests: 1, + MinimumRequests: 1, + FailureRatio: 0.1, + MinimumProbeRequests: 0, + WindowSize: 1, + BucketDuration: time.Millisecond, }, } } @@ -33,120 +55,198 @@ func TestNew(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockNode := rpc.NewMockTChanNode(ctrl) - logger := zap.NewNop() - scope := tally.NewTestScope("", nil) - host := "test-host" + tests := []struct { + name string + params Params + expectError bool + }{ + { + name: "valid params", + params: Params{ + Config: Config{ + Enabled: true, + CircuitBreakerConfig: circuitbreaker.Config{ + MinimumRequests: 1, + FailureRatio: 0.1, + MinimumProbeRequests: 0, + WindowSize: 1, + BucketDuration: time.Millisecond, + }, + }, + Logger: zap.NewNop(), + Scope: tally.NoopScope, + Host: "test-host", + }, + expectError: false, + }, + { + name: "invalid circuit breaker config", + params: Params{ + Config: Config{ + Enabled: true, + CircuitBreakerConfig: circuitbreaker.Config{ + MinimumRequests: -1, // Invalid config + FailureRatio: 0.1, + MinimumProbeRequests: 0, + WindowSize: 1, + BucketDuration: time.Millisecond, + }, + }, + Logger: zap.NewNop(), + Scope: tally.NoopScope, + Host: "test-host", + }, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + middlewareFn, err := New(tt.params) + if tt.expectError { + assert.Error(t, err) + return + } + require.NoError(t, err) + + // Test that the middleware function returns a client + mockNode := rpc.NewMockTChanNode(ctrl) + client := middlewareFn(mockNode) + assert.NotNil(t, client) + assert.Implements(t, (*rpc.TChanNode)(nil), client) + }) + } +} + +func TestClient_WriteBatchRaw(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() - t.Run("invalid_config", func(t *testing.T) { - config := Config{ - Enabled: true, - CircuitBreakerConfig: circuitbreaker.Config{ - WindowSize: -1, + tests := []struct { + name string + params Params + mockBehavior func(*rpc.MockTChanNode) + expectedError bool + }{ + { + name: "successful write", + params: Params{ + Config: newTestConfig(true, false), + Logger: zap.NewNop(), + Scope: tally.NoopScope, + Host: "test-host", + }, + mockBehavior: func(mockNode *rpc.MockTChanNode) { + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(nil) + }, + expectedError: false, + }, + { + name: "failed write", + params: Params{ + Config: newTestConfig(true, false), + Logger: zap.NewNop(), + Scope: tally.NoopScope, + Host: "test-host", + }, + mockBehavior: func(mockNode *rpc.MockTChanNode) { + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(errors.New("write error")) }, - } - middleware, err := New(config, logger, scope, host) - require.Error(t, err) - require.Nil(t, middleware) - }) - - t.Run("valid_config", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - require.NotNil(t, middleware) - - client := middleware(mockNode) - require.NotNil(t, client) - }) + expectedError: true, + }, + { + name: "circuit breaker disabled", + params: Params{ + Config: newTestConfig(false, false), + Logger: zap.NewNop(), + Scope: tally.NoopScope, + Host: "test-host", + }, + mockBehavior: func(mockNode *rpc.MockTChanNode) { + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(nil) + }, + expectedError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + middlewareFn, err := New(tt.params) + require.NoError(t, err) + + mockNode := rpc.NewMockTChanNode(ctrl) + tt.mockBehavior(mockNode) + + client := middlewareFn(mockNode) + ctx, cancel := thrift.NewContext(time.Second) + defer cancel() + + err = client.WriteBatchRaw(ctx, &rpc.WriteBatchRawRequest{}) + if tt.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } } -func TestWriteBatchRaw(t *testing.T) { +func TestClient_ShadowMode(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockNode := rpc.NewMockTChanNode(ctrl) - logger := zap.NewNop() - scope := tally.NewTestScope("", nil) - host := "test-host" - - t.Run("circuit_breaker_disabled", func(t *testing.T) { - config := newTestConfig(false, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - ctx, cancel := thrift.NewContext(time.Second) - defer cancel() - req := &rpc.WriteBatchRawRequest{} - - mockNode.EXPECT().WriteBatchRaw(ctx, req).Return(nil) - err = client.WriteBatchRaw(ctx, req) - require.NoError(t, err) - }) - - t.Run("circuit_breaker_rejected_not_in_shadow_mode", func(t *testing.T) { - config := Config{ - Enabled: true, - ShadowMode: false, - CircuitBreakerConfig: circuitbreaker.Config{ - WindowSize: 15, - BucketDuration: time.Second, - FailureRatio: 0.1, - MinimumRequests: 1, + tests := []struct { + name string + params Params + mockBehavior func(*rpc.MockTChanNode) + expectedError bool + }{ + { + name: "shadow mode enabled - request goes through", + params: Params{ + Config: newTestConfig(true, true), + Logger: zap.NewNop(), + Scope: tally.NoopScope, + Host: "test-host", + }, + mockBehavior: func(mockNode *rpc.MockTChanNode) { + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(nil) }, - } - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - ctx, cancel := thrift.NewContext(time.Second) - defer cancel() - req := &rpc.WriteBatchRawRequest{} - - // First request should fail and trigger circuit breaker - mockNode.EXPECT().WriteBatchRaw(ctx, req).Return(errors.New("test error")) - err = client.WriteBatchRaw(ctx, req) - require.Error(t, err) - - // Second request should be rejected by circuit breaker - err = client.WriteBatchRaw(ctx, req) - require.Error(t, err) - }) - - t.Run("circuit_breaker_rejected_in_shadow_mode", func(t *testing.T) { - config := newTestConfig(true, true) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - ctx, cancel := thrift.NewContext(time.Second) - defer cancel() - req := &rpc.WriteBatchRawRequest{} - - // First request should fail and trigger circuit breaker - mockNode.EXPECT().WriteBatchRaw(ctx, req).Return(errors.New("test error")) - err = client.WriteBatchRaw(ctx, req) - require.Error(t, err) - - // Second request should still go through in shadow mode - mockNode.EXPECT().WriteBatchRaw(ctx, req).Return(nil) - err = client.WriteBatchRaw(ctx, req) - require.NoError(t, err) - }) - - t.Run("circuit_breaker_success", func(t *testing.T) { - config := newTestConfig(true, false) - middleware, err := New(config, logger, scope, host) - require.NoError(t, err) - - client := middleware(mockNode) - ctx, cancel := thrift.NewContext(time.Second) - defer cancel() - req := &rpc.WriteBatchRawRequest{} - - mockNode.EXPECT().WriteBatchRaw(ctx, req).Return(nil) - err = client.WriteBatchRaw(ctx, req) - require.NoError(t, err) - }) + expectedError: false, + }, + { + name: "shadow mode enabled - request fails", + params: Params{ + Config: newTestConfig(true, true), + Logger: zap.NewNop(), + Scope: tally.NoopScope, + Host: "test-host", + }, + mockBehavior: func(mockNode *rpc.MockTChanNode) { + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(errors.New("write error")) + }, + expectedError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + middlewareFn, err := New(tt.params) + require.NoError(t, err) + + mockNode := rpc.NewMockTChanNode(ctrl) + tt.mockBehavior(mockNode) + + client := middlewareFn(mockNode) + ctx, cancel := thrift.NewContext(time.Second) + defer cancel() + + err = client.WriteBatchRaw(ctx, &rpc.WriteBatchRawRequest{}) + if tt.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } } From 5f94b7585ce690e4a10521343fe79677bc89f018 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Sun, 25 May 2025 11:01:55 +0530 Subject: [PATCH 21/23] adding more tests --- .../middleware/middleware_test.go | 91 +++++++++++++++++-- 1 file changed, 84 insertions(+), 7 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go index be2239ef5f..2c4bd9d3eb 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go @@ -126,10 +126,11 @@ func TestClient_WriteBatchRaw(t *testing.T) { name string params Params mockBehavior func(*rpc.MockTChanNode) + verifyState func(*testing.T, *client) expectedError bool }{ { - name: "successful write", + name: "circuit breaker enabled - successful write", params: Params{ Config: newTestConfig(true, false), Logger: zap.NewNop(), @@ -137,12 +138,18 @@ func TestClient_WriteBatchRaw(t *testing.T) { Host: "test-host", }, mockBehavior: func(mockNode *rpc.MockTChanNode) { - mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(nil) + // Expect two successful writes + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(nil).Times(2) + }, + verifyState: func(t *testing.T, c *client) { + assert.True(t, c.enabled) + assert.NotNil(t, c.circuit) + assert.Equal(t, circuitbreaker.Healthy, c.circuit.Status().State()) }, expectedError: false, }, { - name: "failed write", + name: "circuit breaker enabled - failed write transitions to unhealthy", params: Params{ Config: newTestConfig(true, false), Logger: zap.NewNop(), @@ -150,12 +157,20 @@ func TestClient_WriteBatchRaw(t *testing.T) { Host: "test-host", }, mockBehavior: func(mockNode *rpc.MockTChanNode) { + // First request fails mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(errors.New("write error")) + // Second request should be rejected by circuit breaker + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Times(0) + }, + verifyState: func(t *testing.T, c *client) { + assert.True(t, c.enabled) + assert.NotNil(t, c.circuit) + assert.Equal(t, circuitbreaker.Unhealthy, c.circuit.Status().State()) }, expectedError: true, }, { - name: "circuit breaker disabled", + name: "circuit breaker disabled - requests pass through", params: Params{ Config: newTestConfig(false, false), Logger: zap.NewNop(), @@ -163,10 +178,46 @@ func TestClient_WriteBatchRaw(t *testing.T) { Host: "test-host", }, mockBehavior: func(mockNode *rpc.MockTChanNode) { - mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(nil) + // Both requests should go through when disabled + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(nil).Times(2) + }, + verifyState: func(t *testing.T, c *client) { + assert.False(t, c.enabled) + assert.NotNil(t, c.circuit) + assert.Equal(t, circuitbreaker.Healthy, c.circuit.Status().State()) }, expectedError: false, }, + { + name: "circuit breaker enabled - unhealthy state rejects requests", + params: Params{ + Config: Config{ + Enabled: true, + ShadowMode: false, + CircuitBreakerConfig: circuitbreaker.Config{ + MinimumRequests: 1, + FailureRatio: 0.1, + WindowSize: 1, + BucketDuration: time.Millisecond, + }, + }, + Logger: zap.NewNop(), + Scope: tally.NoopScope, + Host: "test-host", + }, + mockBehavior: func(mockNode *rpc.MockTChanNode) { + // First request fails to trigger unhealthy state + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(errors.New("write error")) + // Second request should be rejected by circuit breaker + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Times(0) + }, + verifyState: func(t *testing.T, c *client) { + assert.True(t, c.enabled) + assert.NotNil(t, c.circuit) + assert.Equal(t, circuitbreaker.Unhealthy, c.circuit.Status().State()) + }, + expectedError: true, + }, } for _, tt := range tests { @@ -177,13 +228,39 @@ func TestClient_WriteBatchRaw(t *testing.T) { mockNode := rpc.NewMockTChanNode(ctrl) tt.mockBehavior(mockNode) - client := middlewareFn(mockNode) + clientInterface := middlewareFn(mockNode) ctx, cancel := thrift.NewContext(time.Second) defer cancel() - err = client.WriteBatchRaw(ctx, &rpc.WriteBatchRawRequest{}) + // First request to potentially trigger unhealthy state + node, ok := clientInterface.(rpc.TChanNode) + require.True(t, ok, "Client must implement rpc.TChanNode") + err = node.WriteBatchRaw(ctx, &rpc.WriteBatchRawRequest{}) + if tt.expectedError { + assert.Error(t, err) + if err != nil { + assert.Contains(t, err.Error(), "write error") + } + } else { + assert.NoError(t, err) + } + + // Verify circuit breaker state + clientImpl, ok := clientInterface.(*client) + require.True(t, ok, "Client must be of type *client") + tt.verifyState(t, clientImpl) + + // Second request to test circuit breaker behavior + err = node.WriteBatchRaw(ctx, &rpc.WriteBatchRawRequest{}) if tt.expectedError { assert.Error(t, err) + if err != nil { + if clientImpl.circuit.Status().State() == circuitbreaker.Unhealthy { + assert.Contains(t, err.Error(), "circuit breaker") + } else { + assert.Contains(t, err.Error(), "write error") + } + } } else { assert.NoError(t, err) } From f9b98860a2bbcd733d2713adbbfd2e708b49563e Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Sun, 25 May 2025 12:24:46 +0530 Subject: [PATCH 22/23] review change --- .../middleware/middleware_test.go | 98 ++++++++----------- 1 file changed, 39 insertions(+), 59 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go index 2c4bd9d3eb..47b4635d99 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go @@ -51,6 +51,33 @@ func newTestConfig(enabled, shadowMode bool) Config { } } +// newTestParams creates common test parameters +func newTestParams(enabled, shadowMode bool) Params { + return Params{ + Config: newTestConfig(enabled, shadowMode), + Logger: zap.NewNop(), + Scope: tally.NoopScope, + Host: "test-host", + } +} + +// newUnhealthyStateMockBehavior creates mock behavior for unhealthy state +func newUnhealthyStateMockBehavior() func(*rpc.MockTChanNode) { + return func(mockNode *rpc.MockTChanNode) { + // First request fails to trigger unhealthy state + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(errors.New("write error")) + // Second request should be rejected by circuit breaker + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Times(0) + } +} + +// verifyUnhealthyState verifies circuit breaker is in unhealthy state +func verifyUnhealthyState(t *testing.T, c *client) { + assert.True(t, c.enabled) + assert.NotNil(t, c.circuit) + assert.Equal(t, circuitbreaker.Unhealthy, c.circuit.Status().State()) +} + func TestNew(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -130,13 +157,8 @@ func TestClient_WriteBatchRaw(t *testing.T) { expectedError bool }{ { - name: "circuit breaker enabled - successful write", - params: Params{ - Config: newTestConfig(true, false), - Logger: zap.NewNop(), - Scope: tally.NoopScope, - Host: "test-host", - }, + name: "circuit breaker enabled - successful write", + params: newTestParams(true, false), mockBehavior: func(mockNode *rpc.MockTChanNode) { // Expect two successful writes mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(nil).Times(2) @@ -149,34 +171,15 @@ func TestClient_WriteBatchRaw(t *testing.T) { expectedError: false, }, { - name: "circuit breaker enabled - failed write transitions to unhealthy", - params: Params{ - Config: newTestConfig(true, false), - Logger: zap.NewNop(), - Scope: tally.NoopScope, - Host: "test-host", - }, - mockBehavior: func(mockNode *rpc.MockTChanNode) { - // First request fails - mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(errors.New("write error")) - // Second request should be rejected by circuit breaker - mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Times(0) - }, - verifyState: func(t *testing.T, c *client) { - assert.True(t, c.enabled) - assert.NotNil(t, c.circuit) - assert.Equal(t, circuitbreaker.Unhealthy, c.circuit.Status().State()) - }, + name: "circuit breaker enabled - failed write transitions to unhealthy", + params: newTestParams(true, false), + mockBehavior: newUnhealthyStateMockBehavior(), + verifyState: verifyUnhealthyState, expectedError: true, }, { - name: "circuit breaker disabled - requests pass through", - params: Params{ - Config: newTestConfig(false, false), - Logger: zap.NewNop(), - Scope: tally.NoopScope, - Host: "test-host", - }, + name: "circuit breaker disabled - requests pass through", + params: newTestParams(false, false), mockBehavior: func(mockNode *rpc.MockTChanNode) { // Both requests should go through when disabled mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(nil).Times(2) @@ -189,33 +192,10 @@ func TestClient_WriteBatchRaw(t *testing.T) { expectedError: false, }, { - name: "circuit breaker enabled - unhealthy state rejects requests", - params: Params{ - Config: Config{ - Enabled: true, - ShadowMode: false, - CircuitBreakerConfig: circuitbreaker.Config{ - MinimumRequests: 1, - FailureRatio: 0.1, - WindowSize: 1, - BucketDuration: time.Millisecond, - }, - }, - Logger: zap.NewNop(), - Scope: tally.NoopScope, - Host: "test-host", - }, - mockBehavior: func(mockNode *rpc.MockTChanNode) { - // First request fails to trigger unhealthy state - mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(errors.New("write error")) - // Second request should be rejected by circuit breaker - mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Times(0) - }, - verifyState: func(t *testing.T, c *client) { - assert.True(t, c.enabled) - assert.NotNil(t, c.circuit) - assert.Equal(t, circuitbreaker.Unhealthy, c.circuit.Status().State()) - }, + name: "circuit breaker enabled - unhealthy state rejects requests", + params: newTestParams(true, false), + mockBehavior: newUnhealthyStateMockBehavior(), + verifyState: verifyUnhealthyState, expectedError: true, }, } From eec1725d7bef147f4c73d339f502f26a60493fe5 Mon Sep 17 00:00:00 2001 From: ggaurav08 <110825097+ggaurav08@users.noreply.github.com> Date: Sun, 25 May 2025 13:59:18 +0530 Subject: [PATCH 23/23] review change --- .../middleware/middleware_test.go | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go index 47b4635d99..c1309ebd5c 100644 --- a/src/dbnode/client/circuitbreaker/middleware/middleware_test.go +++ b/src/dbnode/client/circuitbreaker/middleware/middleware_test.go @@ -52,9 +52,9 @@ func newTestConfig(enabled, shadowMode bool) Config { } // newTestParams creates common test parameters -func newTestParams(enabled, shadowMode bool) Params { +func newTestParams(enabled bool) Params { return Params{ - Config: newTestConfig(enabled, shadowMode), + Config: newTestConfig(enabled, false), // shadowMode is false Logger: zap.NewNop(), Scope: tally.NoopScope, Host: "test-host", @@ -71,6 +71,14 @@ func newUnhealthyStateMockBehavior() func(*rpc.MockTChanNode) { } } +// newSuccessfulWriteMockBehavior creates mock behavior for successful writes +func newSuccessfulWriteMockBehavior() func(*rpc.MockTChanNode) { + return func(mockNode *rpc.MockTChanNode) { + // Expect two successful writes + mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(nil).Times(2) + } +} + // verifyUnhealthyState verifies circuit breaker is in unhealthy state func verifyUnhealthyState(t *testing.T, c *client) { assert.True(t, c.enabled) @@ -78,6 +86,13 @@ func verifyUnhealthyState(t *testing.T, c *client) { assert.Equal(t, circuitbreaker.Unhealthy, c.circuit.Status().State()) } +// verifyHealthyState verifies circuit breaker is in healthy state +func verifyHealthyState(t *testing.T, c *client, enabled bool) { + assert.Equal(t, enabled, c.enabled) + assert.NotNil(t, c.circuit) + assert.Equal(t, circuitbreaker.Healthy, c.circuit.Status().State()) +} + func TestNew(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -157,43 +172,33 @@ func TestClient_WriteBatchRaw(t *testing.T) { expectedError bool }{ { - name: "circuit breaker enabled - successful write", - params: newTestParams(true, false), - mockBehavior: func(mockNode *rpc.MockTChanNode) { - // Expect two successful writes - mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(nil).Times(2) - }, + name: "circuit breaker enabled - successful write", + params: newTestParams(true), + mockBehavior: newSuccessfulWriteMockBehavior(), verifyState: func(t *testing.T, c *client) { - assert.True(t, c.enabled) - assert.NotNil(t, c.circuit) - assert.Equal(t, circuitbreaker.Healthy, c.circuit.Status().State()) + verifyHealthyState(t, c, true) }, expectedError: false, }, { name: "circuit breaker enabled - failed write transitions to unhealthy", - params: newTestParams(true, false), + params: newTestParams(true), mockBehavior: newUnhealthyStateMockBehavior(), verifyState: verifyUnhealthyState, expectedError: true, }, { - name: "circuit breaker disabled - requests pass through", - params: newTestParams(false, false), - mockBehavior: func(mockNode *rpc.MockTChanNode) { - // Both requests should go through when disabled - mockNode.EXPECT().WriteBatchRaw(gomock.Any(), gomock.Any()).Return(nil).Times(2) - }, + name: "circuit breaker disabled - requests pass through", + params: newTestParams(false), + mockBehavior: newSuccessfulWriteMockBehavior(), verifyState: func(t *testing.T, c *client) { - assert.False(t, c.enabled) - assert.NotNil(t, c.circuit) - assert.Equal(t, circuitbreaker.Healthy, c.circuit.Status().State()) + verifyHealthyState(t, c, false) }, expectedError: false, }, { name: "circuit breaker enabled - unhealthy state rejects requests", - params: newTestParams(true, false), + params: newTestParams(true), mockBehavior: newUnhealthyStateMockBehavior(), verifyState: verifyUnhealthyState, expectedError: true,