-
Notifications
You must be signed in to change notification settings - Fork 464
Adding circuit breaker middleware #4336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ggaurav08
wants to merge
27
commits into
circuit_breaker_library
Choose a base branch
from
circuit_breaker_middleware
base: circuit_breaker_library
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
27 commits
Select commit
Hold shift + click to select a range
4ce3eaf
Adding circuit breaker middleware
ggaurav08 db274fd
Merge branch 'circuit_breaker_library' into circuit_breaker_middleware
ggaurav08 cca71b7
review changes
ggaurav08 a33d8bb
review change
ggaurav08 f7b6edf
Merge branch 'circuit_breaker_library' into circuit_breaker_middleware
ggaurav08 feacf9e
review change
ggaurav08 96bbc2e
review change
ggaurav08 af5b673
review change
ggaurav08 ffb66bc
review change
ggaurav08 d35095c
review change
ggaurav08 e1b9ed1
review change
ggaurav08 e921354
Adding unit tests
ggaurav08 e3d468a
Adding shadow reject metrics
ggaurav08 2e36aea
updating packages
ggaurav08 066c1af
Merge branch 'circuit_breaker_library' into circuit_breaker_middleware
ggaurav08 55d6b4b
review change
ggaurav08 923042d
Merge branch 'circuit_breaker_library' into circuit_breaker_middleware
ggaurav08 e0c3a6a
lint fix
ggaurav08 fd8e7a6
review change
ggaurav08 b05e680
review change
ggaurav08 01f7714
review change
ggaurav08 860f687
review change
ggaurav08 2564876
review change
ggaurav08 ef71454
review change
ggaurav08 5f94b75
adding more tests
ggaurav08 f9b9886
review change
ggaurav08 eec1725
review change
ggaurav08 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
8000
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
28 changes: 28 additions & 0 deletions
28
src/dbnode/client/circuitbreaker/circuitbreakererror/error.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package circuitbreakererror | ||
|
||
import ( | ||
"strings" | ||
|
||
xerrors "github.com/m3db/m3/src/x/errors" | ||
) | ||
|
||
// circuitBreakerError is an error type that indicates a circuit breaker error. | ||
type circuitBreakerError struct { | ||
host string | ||
} | ||
|
||
var _ error = (*circuitBreakerError)(nil) | ||
|
||
// New creates a new circuit breaker error with the given host. | ||
func New(host string) error { | ||
err := circuitBreakerError{host: host} | ||
return xerrors.NewNonRetryableError(err) | ||
} | ||
|
||
// Error returns the error message for the circuit breaker error. | ||
func (e circuitBreakerError) Error() string { | ||
var b strings.Builder | ||
b.WriteString("request rejected by circuit breaker of outbound-service: ") | ||
b.WriteString(e.host) | ||
return b.String() | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
package middleware | ||
|
||
import ( | ||
"github.com/m3db/m3/src/dbnode/client/circuitbreaker" | ||
) | ||
|
||
// Config represents the configuration for the circuit breaker middleware.
type Config struct {
	// Enabled turns the middleware on. When false, guarded calls are passed
	// straight to the wrapped client: the circuit is not consulted and no
	// circuit breaker metrics are emitted.
	Enabled              bool                  `yaml:"enabled"`
	// ShadowMode, when true, still sends requests the circuit would have
	// rejected downstream and only counts them as shadow rejects instead of
	// failing them. It has no effect unless Enabled is true.
	ShadowMode           bool                  `yaml:"shadowMode"`
	// CircuitBreakerConfig is passed to circuitbreaker.NewCircuit to build
	// the underlying circuit.
	CircuitBreakerConfig circuitbreaker.Config `yaml:"circuitBreakerConfig"`
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package middleware | ||
|
||
import ( | ||
"github.com/uber-go/tally" | ||
) | ||
|
||
type circuitBreakerMetrics struct { | ||
rejects tally.Counter | ||
shadowRejects tally.Counter | ||
successes tally.Counter | ||
failures tally.Counter | ||
} | ||
|
||
func newMetrics(scope tally.Scope, host string) *circuitBreakerMetrics { | ||
return &circuitBreakerMetrics{ | ||
successes: scope.Tagged(map[string]string{"host": host}).Counter("circuit_breaker_successes"), | ||
failures: scope.Tagged(map[string]string{"host": host}).Counter("circuit_breaker_failures"), | ||
rejects: scope.Tagged(map[string]string{"host": host, "mode": "live"}).Counter("circuit_breaker_rejects"), | ||
shadowRejects: scope.Tagged(map[string]string{"host": host, "mode": "shadow"}).Counter("circuit_breaker_rejects"), | ||
} | ||
} |
266 changes: 266 additions & 0 deletions
266
src/dbnode/client/circuitbreaker/middleware/middleware.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,266 @@ | ||
package middleware | ||
|
||
import ( | ||
"github.com/uber-go/tally" | ||
"github.com/uber/tchannel-go/thrift" | ||
"go.uber.org/zap" | ||
|
||
"github.com/m3db/m3/src/dbnode/client/circuitbreaker" | ||
"github.com/m3db/m3/src/dbnode/client/circuitbreaker/circuitbreakererror" | ||
"github.com/m3db/m3/src/dbnode/generated/thrift/rpc" | ||
) | ||
|
||
// client is a client that wraps a TChannel client with a circuit breaker. | ||
type client struct { | ||
enabled bool | ||
shadowMode bool | ||
logger *zap.Logger | ||
circuit *circuitbreaker.Circuit | ||
metrics *circuitBreakerMetrics | ||
host string | ||
next rpc.TChanNode | ||
} | ||
|
||
// M3DBMiddleware is a function that takes a TChannel client and returns a circuit breaker client interface. | ||
type M3DBMiddleware func(rpc.TChanNode) Client | ||
|
||
// Client defines the interface for a circuit breaker client. | ||
type Client interface { | ||
rpc.TChanNode | ||
} | ||
|
||
// Params contains all parameters needed to create a new middleware | ||
type Params struct { | ||
Config Config | ||
Logger *zap.Logger | ||
Scope tally.Scope | ||
Host string | ||
} | ||
|
||
// New creates a new circuit breaker middleware. | ||
func New(params Params) (M3DBMiddleware, error) { | ||
c, err := circuitbreaker.NewCircuit(params.Config.CircuitBreakerConfig) | ||
if err != nil { | ||
params.Logger.Warn("failed to create circuit breaker", zap.Error(err)) | ||
return nil, err | ||
} | ||
|
||
return func(next rpc.TChanNode) Client { | ||
return &client{ | ||
enabled: params.Config.Enabled, | ||
shadowMode: params.Config.ShadowMode, | ||
next: next, | ||
logger: params.Logger, | ||
host: params.Host, | ||
metrics: newMetrics(params.Scope, params.Host), | ||
circuit: c, | ||
} | ||
}, nil | ||
} | ||
|
||
// withBreaker executes the given call with a circuit breaker if enabled. | ||
func withBreaker[T any](c *client, ctx thrift.Context, req T, call func(thrift.Context, T) error) error { | ||
// Early return if circuit breaker is disabled or not initialized | ||
if !c.enabled || c.circuit == nil { | ||
return call(ctx, req) | ||
} | ||
|
||
// Check if request is allowed | ||
isAllowed := c.circuit.IsRequestAllowed() | ||
|
||
// If request is not allowed, log and return error | ||
if !isAllowed { | ||
if c.shadowMode { | ||
c.metrics.shadowRejects.Inc(1) | ||
} else { | ||
c.metrics.rejects.Inc(1) | ||
return circuitbreakererror.New(c.host) | ||
} | ||
} | ||
|
||
// Execute the request and update metrics | ||
err := call(ctx, req) | ||
if err == nil { | ||
c.metrics.successes.Inc(1) | ||
} else { | ||
c.metrics.failures.Inc(1) | ||
} | ||
|
||
// Report request status to circuit breaker | ||
if isAllowed { | ||
c.circuit.ReportRequestStatus(err == nil) | ||
} | ||
return err | ||
} | ||
|
||
// WriteBatchRaw is a method that writes a batch of raw data. | ||
func (c *client) WriteBatchRaw(ctx thrift.Context, req *rpc.WriteBatchRawRequest) error { | ||
return withBreaker(c, ctx, req, c.next.WriteBatchRaw) | ||
} | ||
|
||
// Forward all other TChanNode methods to the underlying client | ||
func (c *client) Aggregate(ctx thrift.Context, req *rpc.AggregateQueryRequest) (*rpc.AggregateQueryResult_, error) { | ||
return c.next.Aggregate(ctx, req) | ||
} | ||
|
||
func (c *client) AggregateRaw( | ||
ctx thrift.Context, | ||
req *rpc.AggregateQueryRawRequest, | ||
) (*rpc.AggregateQueryRawResult_, error) { | ||
return c.next.AggregateRaw(ctx, req) | ||
} | ||
|
||
func (c *client) AggregateTiles( | ||
ctx thrift.Context, | ||
req *rpc.AggregateTilesRequest, | ||
) (*rpc.AggregateTilesResult_, error) { | ||
return c.next.AggregateTiles(ctx, req) | ||
} | ||
|
||
func (c *client) Bootstrapped(ctx thrift.Context) (*rpc.NodeBootstrappedResult_, error) { | ||
return c.next.Bootstrapped(ctx) | ||
} | ||
|
||
func (c *client) BootstrappedInPlacementOrNoPlacement( | ||
ctx thrift.Context, | ||
) (*rpc.NodeBootstrappedInPlacementOrNoPlacementResult_, error) { | ||
return c.next.BootstrappedInPlacementOrNoPlacement(ctx) | ||
} | ||
|
||
func (c *client) DebugIndexMemorySegments( | ||
ctx thrift.Context, | ||
req *rpc.DebugIndexMemorySegmentsRequest, | ||
) (*rpc.DebugIndexMemorySegmentsResult_, error) { | ||
return c.next.DebugIndexMemorySegments(ctx, req) | ||
} | ||
|
||
func (c *client) DebugProfileStart( | ||
ctx thrift.Context, | ||
req *rpc.DebugProfileStartRequest, | ||
) (*rpc.DebugProfileStartResult_, error) { | ||
return c.next.DebugProfileStart(ctx, req) | ||
} | ||
|
||
func (c *client) DebugProfileStop( | ||
ctx thrift.Context, | ||
req *rpc.DebugProfileStopRequest, | ||
) (*rpc.DebugProfileStopResult_, error) { | ||
return c.next.DebugProfileStop(ctx, req) | ||
} | ||
|
||
func (c *client) Fetch(ctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchResult_, error) { | ||
return c.next.Fetch(ctx, req) | ||
} | ||
|
||
func (c *client) FetchBatchRaw(ctx thrift.Context, req *rpc.FetchBatchRawRequest) (*rpc.FetchBatchRawResult_, error) { | ||
return c.next.FetchBatchRaw(ctx, req) | ||
} | ||
|
||
func (c *client) FetchBatchRawV2( | ||
ctx thrift.Context, | ||
req *rpc.FetchBatchRawV2Request, | ||
) (*rpc.FetchBatchRawResult_, error) { | ||
return c.next.FetchBatchRawV2(ctx, req) | ||
} | ||
|
||
func (c *client) FetchBlocksMetadataRawV2( | ||
ctx thrift.Context, | ||
req *rpc.FetchBlocksMetadataRawV2Request, | ||
) (*rpc.FetchBlocksMetadataRawV2Result_, error) { | ||
return c.next.FetchBlocksMetadataRawV2(ctx, req) | ||
} | ||
|
||
func (c *client) FetchBlocksRaw( | ||
ctx thrift.Context, | ||
req *rpc.FetchBlocksRawRequest, | ||
) (*rpc.FetchBlocksRawResult_, error) { | ||
return c.next.FetchBlocksRaw(ctx, req) | ||
} | ||
|
||
func (c *client) FetchTagged(ctx thrift.Context, req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { | ||
return c.next.FetchTagged(ctx, req) | ||
} | ||
|
||
func (c *client) GetPersistRateLimit(ctx thrift.Context) (*rpc.NodePersistRateLimitResult_, error) { | ||
return c.next.GetPersistRateLimit(ctx) | ||
} | ||
|
||
func (c *client) GetWriteNewSeriesAsync(ctx thrift.Context) (*rpc.NodeWriteNewSeriesAsyncResult_, error) { | ||
return c.next.GetWriteNewSeriesAsync(ctx) | ||
} | ||
|
||
func (c *client) GetWriteNewSeriesBackoffDuration( | ||
ctx thrift.Context, | ||
) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) { | ||
return c.next.GetWriteNewSeriesBackoffDuration(ctx) | ||
} | ||
|
||
func (c *client) GetWriteNewSeriesLimitPerShardPerSecond( | ||
ctx thrift.Context, | ||
) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) { | ||
return c.next.GetWriteNewSeriesLimitPerShardPerSecond(ctx) | ||
} | ||
|
||
func (c *client) Health(ctx thrift.Context) (*rpc.NodeHealthResult_, error) { | ||
return c.next.Health(ctx) | ||
} | ||
|
||
func (c *client) Query(ctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryResult_, error) { | ||
return c.next.Query(ctx, req) | ||
} | ||
|
||
func (c *client) Repair(ctx thrift.Context) error { | ||
return c.next.Repair(ctx) | ||
} | ||
|
||
func (c *client) SetPersistRateLimit( | ||
ctx thrift.Context, | ||
req *rpc.NodeSetPersistRateLimitRequest, | ||
) (*rpc.NodePersistRateLimitResult_, error) { | ||
return c.next.SetPersistRateLimit(ctx, req) | ||
} | ||
|
||
func (c *client) SetWriteNewSeriesAsync( | ||
ctx thrift.Context, | ||
req *rpc.NodeSetWriteNewSeriesAsyncRequest, | ||
) (*rpc.NodeWriteNewSeriesAsyncResult_, error) { | ||
return c.next.SetWriteNewSeriesAsync(ctx, req) | ||
} | ||
|
||
func (c *client) SetWriteNewSeriesBackoffDuration( | ||
ctx thrift.Context, | ||
req *rpc.NodeSetWriteNewSeriesBackoffDurationRequest, | ||
) (*rpc.NodeWriteNewSeriesBackoffDurationResult_, error) { | ||
return c.next.SetWriteNewSeriesBackoffDuration(ctx, req) | ||
} | ||
|
||
func (c *client) SetWriteNewSeriesLimitPerShardPerSecond( | ||
ctx thrift.Context, | ||
req *rpc.NodeSetWriteNewSeriesLimitPerShardPerSecondRequest, | ||
) (*rpc.NodeWriteNewSeriesLimitPerShardPerSecondResult_, error) { | ||
return c.next.SetWriteNewSeriesLimitPerShardPerSecond(ctx, req) | ||
} | ||
|
||
func (c *client) Truncate(ctx thrift.Context, req *rpc.TruncateRequest) (*rpc.TruncateResult_, error) { | ||
return c.next.Truncate(ctx, req) | ||
} | ||
|
||
func (c *client) Write(ctx thrift.Context, req *rpc.WriteRequest) error { | ||
return c.next.Write(ctx, req) | ||
} | ||
|
||
func (c *client) WriteBatchRawV2(ctx thrift.Context, req *rpc.WriteBatchRawV2Request) error { | ||
return c.next.WriteBatchRawV2(ctx, req) | ||
} | ||
|
||
func (c *client) WriteTagged(ctx thrift.Context, req *rpc.WriteTaggedRequest) error { | ||
return c.next.WriteTagged(ctx, req) | ||
} | ||
|
||
func (c *client) WriteTaggedBatchRaw(ctx thrift.Context, req *rpc.WriteTaggedBatchRawRequest) error { | ||
return c.next.WriteTaggedBatchRaw(ctx, req) | ||
} | ||
|
||
func (c *client) WriteTaggedBatchRawV2(ctx thrift.Context, req *rpc.WriteTaggedBatchRawV2Request) error { | ||
return c.next.WriteTaggedBatchRawV2(ctx, req) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the source host or the destination host? Are you concerned about the cardinality of this tag at all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the destination host. It is primarily used to identify which node is experiencing issues when a request fails. When a node is slow, this helps pinpoint the problematic node so that its server can be restarted accordingly.