Adding circuit breaker middleware by ggaurav08 · Pull Request #4336 · m3db/m3 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Adding circuit breaker middleware #4336

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: circuit_breaker_library
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/dbnode/client/circuitbreaker/circuitbreakererror/error.go
8000
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package circuitbreakererror

import (
"strings"

xerrors "github.com/m3db/m3/src/x/errors"
)

// circuitBreakerError signals that a request to an outbound host was rejected
// by that host's circuit breaker rather than being sent.
type circuitBreakerError struct {
	// host names the outbound service whose breaker rejected the request.
	host string
}

// Compile-time guarantee that circuitBreakerError implements error.
var _ error = (*circuitBreakerError)(nil)

// New creates a new circuit breaker error with the given host.
func New(host string) error {
err := circuitBreakerError{host: host}
return xerrors.NewNonRetryableError(err)
}

// Error implements the error interface; the message names the outbound host
// whose circuit breaker rejected the request.
func (e circuitBreakerError) Error() string {
	const prefix = "request rejected by circuit breaker of outbound-service: "
	return strings.Join([]string{prefix, e.host}, "")
}
12 changes: 12 additions & 0 deletions src/dbnode/client/circuitbreaker/middleware/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package middleware

import (
"github.com/m3db/m3/src/dbnode/client/circuitbreaker"
)

// Config holds the YAML-loaded settings for the circuit breaker middleware.
type Config struct {
	// Enabled turns circuit breaking on; when false, calls go straight to the
	// wrapped client.
	Enabled bool `yaml:"enabled"`
	// ShadowMode counts requests the breaker would reject but still sends them.
	ShadowMode bool `yaml:"shadowMode"`
	// CircuitBreakerConfig configures the underlying circuit.
	CircuitBreakerConfig circuitbreaker.Config `yaml:"circuitBreakerConfig"`
}
21 changes: 21 additions & 0 deletions src/dbnode/client/circuitbreaker/middleware/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package middleware

import (
"github.com/uber-go/tally"
)

// circuitBreakerMetrics groups the counters emitted by a circuit breaker client.
type circuitBreakerMetrics struct {
	// rejects counts requests refused by the breaker (live mode).
	rejects tally.Counter
	// shadowRejects counts requests the breaker disallowed in shadow mode,
	// which were sent anyway.
	shadowRejects tally.Counter
	// successes and failures count the outcome of requests that were sent
	// while the breaker was enabled.
	successes tally.Counter
	failures  tally.Counter
}

func newMetrics(scope tally.Scope, host string) *circuitBreakerMetrics {
return &circuitBreakerMetrics{
successes: scope.Tagged(map[string]string{"host": host}).Counter("circuit_breaker_successes"),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this src host or dest host? are you concerned about the cardinality of this at all?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is dest host. It is primarily used to identify which node is experiencing issues in the event of a request failure. In cases where a node is slow, this helps pinpoint the problematic node so the server can be restarted accordingly.

failures: scope.Tagged(map[string]string{"host": host}).Counter("circuit_breaker_failures"),
rejects: scope.Tagged(map[string]string{"host": host, "mode": "live"}).Counter("circuit_breaker_rejects"),
shadowRejects: scope.Tagged(map[string]string{"host": host, "mode": "shadow"}).Counter("circuit_breaker_rejects"),
}
}
266 changes: 266 additions & 0 deletions src/dbnode/client/circuitbreaker/middleware/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
package middleware

import (
	"fmt"

	"github.com/uber-go/tally"
	"github.com/uber/tchannel-go/thrift"
	"go.uber.org/zap"

	"github.com/m3db/m3/src/dbnode/client/circuitbreaker"
	"github.com/m3db/m3/src/dbnode/client/circuitbreaker/circuitbreakererror"
	"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
)

// client decorates an rpc.TChanNode. WriteBatchRaw is routed through the
// circuit breaker (see withBreaker); every other method is forwarded to next
// unchanged.
type client struct {
	// enabled and shadowMode are copied from Config.
	enabled    bool
	shadowMode bool

	logger *zap.Logger
	// circuit decides whether a request may proceed and receives outcomes.
	circuit *circuitbreaker.Circuit
	metrics *circuitBreakerMetrics

	// host names the destination node; it appears in rejection errors.
	host string
	// next is the wrapped client that performs the actual RPC.
	next rpc.TChanNode
}

// M3DBMiddleware wraps a node client and returns a Client that adds circuit
// breaking on top of it.
type M3DBMiddleware func(next rpc.TChanNode) Client

// Client is the node client produced by an M3DBMiddleware. It exposes the full
// rpc.TChanNode method set, so it can stand in for any node client.
type Client interface {
	rpc.TChanNode
}

// Params bundles the inputs New needs to build a middleware.
type Params struct {
	// Config carries the enable/shadow switches and circuit settings.
	Config Config
	// Logger is handed to every wrapped client.
	Logger *zap.Logger
	// Scope is where the breaker counters are registered.
	Scope tally.Scope
	// Host names the destination node; it tags metrics and rejection errors.
	Host string
}

// New creates a circuit breaker middleware for the node named by params.Host.
//
// One circuit, built from params.Config.CircuitBreakerConfig, and one set of
// host-tagged counters are created here and shared by every client the
// returned middleware wraps. A nil params.Logger falls back to a no-op logger
// and a nil params.Scope falls back to tally.NoopScope, so neither causes a
// nil dereference.
//
// If circuitbreaker.NewCircuit rejects the configuration, New returns a nil
// middleware and that error wrapped with the host for context. Use errors.Is
// or errors.As to inspect the original error.
func New(params Params) (M3DBMiddleware, error) {
	logger := params.Logger
	if logger == nil {
		logger = zap.NewNop()
	}
	scope := params.Scope
	if scope == nil {
		scope = tally.NoopScope
	}

	circuit, err := circuitbreaker.NewCircuit(params.Config.CircuitBreakerConfig)
	if err != nil {
		// Return the error with context instead of also logging it here;
		// the caller decides how to report it.
		return nil, fmt.Errorf("creating circuit breaker for host %q: %w", params.Host, err)
	}

	// The counters depend only on scope and host, so build them once rather
	// than on every invocation of the middleware.
	metrics := newMetrics(scope, params.Host)

	return func(next rpc.TChanNode) Client {
		return &client{
			enabled:    params.Config.Enabled,
			shadowMode: params.Config.ShadowMode,
			next:       next,
			logger:     logger,
			host:       params.Host,
			metrics:    metrics,
			circuit:    circuit,
		}
	}, nil
}

// withBreaker runs call(ctx, req) under c's circuit breaker.
//
// If the breaker is disabled, or no circuit is set, call runs directly and no
// breaker metrics are recorded. Otherwise the circuit is consulted first:
//   - allowed: call runs, and its outcome is counted and reported to the
//     circuit.
//   - disallowed in shadow mode: shadowRejects is incremented and call still
//     runs. Its outcome is counted but NOT reported to the circuit.
//   - disallowed otherwise: rejects is incremented and a non-retryable circuit
//     breaker error for c.host is returned without invoking call.
//
// Nothing is logged on any path.
func withBreaker[T any](c *client, ctx thrift.Context, req T, call func(thrift.Context, T) error) error {
	if !c.enabled || c.circuit == nil {
		return call(ctx, req)
	}

	allowed := c.circuit.IsRequestAllowed()
	if !allowed && !c.shadowMode {
		c.metrics.rejects.Inc(1)
		return circuitbreakererror.New(c.host)
	}
	if !allowed {
		// Shadow mode: record the would-be rejection, then send anyway.
		c.metrics.shadowRejects.Inc(1)
	}

	err := call(ctx, req)
	succeeded := err == nil
	if succeeded {
		c.metrics.successes.Inc(1)
	} else {
		c.metrics.failures.Inc(1)
	}

	// Only requests the circuit admitted feed back into its state; shadow-mode
	// requests sent past a disallowing circuit are excluded.
	if allowed {
		c.circuit.ReportRequestStatus(succeeded)
	}
	return err
}

// WriteBatchRaw sends a raw write batch through the circuit breaker. When the
// breaker is enabled, not in shadow mode, and the circuit disallows the
// request, it fails fast with a circuit breaker error instead of calling the
// wrapped client.
func (c *client) WriteBatchRaw(ctx thrift.Context, req *rpc.WriteBatchRawRequest) error {
	send := c.next.WriteBatchRaw
	return withBreaker(c, ctx, req, send)
}

// The methods below bypass the circuit breaker: each one hands the call,
// unchanged, to the wrapped client.

// Aggregate passes through to the wrapped client.
func (c *client) Aggregate(
	ctx thrift.Context,
	r *rpc.AggregateQueryRequest,
) (*rpc.AggregateQueryResult_, error) {
	return c.next.Aggregate(ctx, r)
}

// AggregateRaw passes through to the wrapped client.
func (c *client) AggregateRaw(
	ctx thrift.Context,
	r *rpc.AggregateQueryRawRequest,
) (*rpc.AggregateQueryRawResult_, error) {
	return c.next.AggregateRaw(ctx, r)
}

// AggregateTiles passes through to the wrapped client.
func (c *client) AggregateTiles(
	ctx thrift.Context,
	r *rpc.AggregateTilesRequest,
) (*rpc.AggregateTilesResult_, error) {
	return c.next.AggregateTiles(ctx, r)
}

// Bootstrapped passes through to the wrapped client.
func (c *client) Bootstrapped(
	ctx thrift.Context,
) (*rpc.NodeBootstrappedResult_, error) {
	return c.next.Bootstrapped(ctx)
}

// BootstrappedInPlacementOrNoPlacement passes through to the wrapped client.
func (c *client) BootstrappedInPlacementOrNoPlacement(
	ctx thrift.Context,
) (*rpc.NodeBootstrappedInPlacementOrNoPlacementResult_, error) {
	return c.next.BootstrappedInPlacementOrNoPlacement(ctx)
}

// DebugIndexMemorySegments passes through to the wrapped client.
func (c *client) DebugIndexMemorySegments(
	ctx thrift.Context,
	r *rpc.DebugIndexMemorySegmentsRequest,
) (*rpc.DebugIndexMemorySegmentsResult_, error) {
	return c.next.DebugIndexMemorySegments(ctx, r)
}

// DebugProfileStart passes through to the wrapped client.
func (c *client) DebugProfileStart(
	ctx thrift.Context,
	r *rpc.DebugProfileStartRequest,
) (*rpc.DebugProfileStartResult_, error) {
	return c.next.DebugProfileStart(ctx, r)
}

// DebugProfileStop passes through to the wrapped client.
func (c *client) DebugProfileStop(
	ctx thrift.Context,
	r *rpc.DebugProfileStopRequest,
) (*rpc.DebugProfileStopResult_, error) {
	return c.next.DebugProfileStop(ctx, r)
}

// Fetch passes through to the wrapped client.
func (c *client) Fetch(
	ctx thrift.Context,
	r *rpc.FetchRequest,
) (*rpc.FetchResult_, error) {
	return c.next.Fetch(ctx, r)
}

// FetchBatchRaw passes through to the wrapped client.
func (c *client) FetchBatchRaw(
	ctx thrift.Context,
	r *rpc.FetchBatchRawRequest,
) (*rpc.FetchBatchRawResult_, error) {
	return c.next.FetchBatchRaw(ctx, r)
}

// FetchBatchRawV2 passes through to the wrapped client.
func (c *client) FetchBatchRawV2(
	ctx thrift.Context,
	r *rpc.FetchBatchRawV2Request,
) (*rpc.FetchBatchRawResult_, error) {
	return c.next.FetchBatchRawV2(ctx, r)
}

// FetchBlocksMetadataRawV2 passes through to the wrapped client.
func (c *client) FetchBlocksMetadataRawV2(
	ctx thrift.Context,
	r *rpc.FetchBlocksMetadataRawV2Request,
) (*rpc.FetchBlocksMetadataRawV2Result_, error) {
	return c.next.FetchBlocksMetadataRawV2(ctx, r)
}

// FetchBlocksRaw passes through to the wrapped client.
func (c *client) FetchBlocksRaw(
	ctx thrift.Context,
	r *rpc.FetchBlocksRawRequest,
) (*rpc.FetchBlocksRawResult_, error) {
	return c.next.FetchBlocksRaw(ctx, r)
}

// FetchTagged passes through to the wrapped client.
func (c *client) FetchTagged(
	ctx thrift.Context,
	r *rpc.FetchTaggedRequest,
) (*rpc.FetchTaggedResult_, error) {
	return c.next.FetchTagged(ctx, r)
}

// GetPersistRateLimit passes through to the wrapped client.
func (c *client) GetPersistRateLimit(
	ctx thrift.Context,
) (*rpc.NodePersistRateLimitResult_, error) {
	return c.next.GetPersistRateLimit(ctx)
}

// GetWriteNewSeriesAsync passes through to the wrapped client.
func (c *client) GetWriteNewSeriesAsync(
	ctx thrift.Context,
) (*rpc.NodeWriteNewSeriesAsyncResult_, error) {
	return c.next.GetWriteNewSeriesAsync(ctx)
}

// GetWriteNewSeriesBackoffDuration passes through to the wrapped client.
func (c *client) GetWriteNewSeriesBackoffDuration(
	ctx thrift.Context,
) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) {
	return c.next.GetWriteNewSeriesBackoffDuration(ctx)
}

// GetWriteNewSeriesLimitPerShardPerSecond passes through to the wrapped client.
func (c *client) GetWriteNewSeriesLimitPerShardPerSecond(
	ctx thrift.Context,
) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) {
	return c.next.GetWriteNewSeriesLimitPerShardPerSecond(ctx)
}

// Health passes through to the wrapped client.
func (c *client) Health(
	ctx thrift.Context,
) (*rpc.NodeHealthResult_, error) {
	return c.next.Health(ctx)
}

// Query passes through to the wrapped client.
func (c *client) Query(
	ctx thrift.Context,
	r *rpc.QueryRequest,
) (*rpc.QueryResult_, error) {
	return c.next.Query(ctx, r)
}

// Repair passes through to the wrapped client.
func (c *client) Repair(
	ctx thrift.Context,
) error {
	return c.next.Repair(ctx)
}

// SetPersistRateLimit passes through to the wrapped client.
func (c *client) SetPersistRateLimit(
	ctx thrift.Context,
	r *rpc.NodeSetPersistRateLimitRequest,
) (*rpc.NodePersistRateLimitResult_, error) {
	return c.next.SetPersistRateLimit(ctx, r)
}

// SetWriteNewSeriesAsync passes through to the wrapped client.
func (c *client) SetWriteNewSeriesAsync(
	ctx thrift.Context,
	r *rpc.NodeSetWriteNewSeriesAsyncRequest,
) (*rpc.NodeWriteNewSeriesAsyncResult_, error) {
	return c.next.SetWriteNewSeriesAsync(ctx, r)
}

// SetWriteNewSeriesBackoffDuration passes through to the wrapped client.
func (c *client) SetWriteNewSeriesBackoffDuration(
	ctx thrift.Context,
	r *rpc.NodeSetWriteNewSeriesBackoffDurationRequest,
) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) {
	return c.next.SetWriteNewSeriesBackoffDuration(ctx, r)
}

// SetWriteNewSeriesLimitPerShardPerSecond passes through to the wrapped client.
func (c *client) SetWriteNewSeriesLimitPerShardPerSecond(
	ctx thrift.Context,
	r *rpc.NodeSetWriteNewSeriesLimitPerShardPerSecondRequest,
) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) {
	return c.next.SetWriteNewSeriesLimitPerShardPerSecond(ctx, r)
}

func (c *client) Truncate(ctx thrift.Context, req *rpc.TruncateRequest) (*rpc.TruncateResult_, error) {
return c.next.Truncate(ctx, req)
}

func (c *client) Write(ctx thrift.Context, req *rpc.WriteRequest) error {
return c.next.Write(ctx, req)
}

func (c *client) WriteBatchRawV2(ctx thrift.Context, req *rpc.WriteBatchRawV2Request) error {
return c.next.WriteBatchRawV2(ctx, req)
}

func (c *client) WriteTagged(ctx thrift.Context, req *rpc.WriteTaggedRequest) error {
return c.next.WriteTagged(ctx, req)
}

func (c *client) WriteTaggedBatchRaw(ctx thrift.Context, req *rpc.WriteTaggedBatchRawRequest) error {
return c.next.WriteTaggedBatchRaw(ctx, req)
}

func (c *client) WriteTaggedBatchRawV2(ctx thrift.Context, req *rpc.WriteTaggedBatchRawV2Request) error {
return c.next.WriteTaggedBatchRawV2(ctx, req)
}
Loading
0