8000 query-frontend: add retry logic for cardinality, series and remoteReads endpoints by NickAnge · Pull Request #11533 · grafana/mimir · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

query-frontend: add retry logic for cardinality, series and remoteReads endpoints #11533

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
* [ENHANCEMENT] Distributor: Gracefully handle type assertion of WatchPrefix in HA Tracker to continue checking for updates. #11411 #11461
* [ENHANCEMENT] Querier: Include chunks streamed from store-gateway in Mimir Query Engine memory estimate of query memory usage. #11453 #11465
* [ENHANCEMENT] Querier: Include chunks streamed from ingester in Mimir Query Engine memory estimate of query memory usage. #11457
* [ENHANCEMENT] Query-frontend: Add retry mechanism for remote reads, series, and cardinality Prometheus endpoints. #11533
* [BUGFIX] OTLP: Fix response body and Content-Type header to align with spec. #10852
* [BUGFIX] Compactor: fix issue where block becomes permanently stuck when the Compactor's block cleanup job partially deletes a block. #10888
* [BUGFIX] Storage: fix intermittent failures in S3 upload retries. #10952
Expand Down
97 changes: 68 additions & 29 deletions pkg/frontend/querymiddleware/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package querymiddleware
import (
"context"
"errors"
"net/http"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -20,12 +21,48 @@ import (
"github.com/grafana/mimir/pkg/util/spanlogger"
)

type retryMiddlewareMetrics struct {
type retryOperation[T any] func() (T, error)

func doWithRetries[T any](ctx context.Context, log log.Logger, maxRetries int, metrics prometheus.Observer, operation retryOperation[T]) (T, error) {
tries := 0
defer func() { metrics.Observe(float64(tries)) }()

var lastErr error
var zero T
for ; tries < maxRetries; tries++ {
if ctx.Err() != nil {
return zero, ctx.Err()
}

resp, err : 10000 = operation()
if err == nil {
return resp, nil
}

if apierror.IsNonRetryableAPIError(err) || errors.Is(err, context.Canceled) {
return zero, err
}

// Retry if we get a HTTP 500 or a non-HTTP error.
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok || httpResp.Code/100 == 5 {
lastErr = err
log := util_log.WithContext(ctx, spanlogger.FromContext(ctx, log))
level.Error(log).Log("msg", "error processing request", "try", tries, "err", err)
continue
}

return zero, err
}
return zero, lastErr
}

// retryMetrics records the number of tries each request needed before
// completing, as a histogram. It implements prometheus.Observer.
type retryMetrics struct {
retriesCount prometheus.Histogram
}

func newRetryMiddlewareMetrics(registerer prometheus.Registerer) prometheus.Observer {
return &retryMiddlewareMetrics{
func newRetryMetrics(registerer prometheus.Registerer) prometheus.Observer {
return &retryMetrics{
retriesCount: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_frontend_retries",
Help: "Number of times a request is retried.",
Expand All @@ -34,7 +71,7 @@ func newRetryMiddlewareMetrics(registerer prometheus.Registerer) prometheus.Obse
}
}

func (m *retryMiddlewareMetrics) Observe(v float64) {
// Observe records the number of tries used by a single request into the
// retries histogram, satisfying the prometheus.Observer interface.
func (m *retryMetrics) Observe(v float64) {
m.retriesCount.Observe(v)
}

Expand All @@ -50,7 +87,7 @@ type retry struct {
// fail with 500 or a non-HTTP error.
func newRetryMiddleware(log log.Logger, maxRetries int, metrics prometheus.Observer) MetricsQueryMiddleware {
if metrics == nil {
metrics = newRetryMiddlewareMetrics(nil)
metrics = newRetryMetrics(nil)
}

return MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler {
Expand All @@ -64,32 +101,34 @@ func newRetryMiddleware(log log.Logger, maxRetries int, metrics prometheus.Obser
}

func (r retry) Do(ctx context.Context, req MetricsQueryRequest) (Response, error) {
tries := 0
defer func() { r.metrics.Observe(float64(tries)) }()
return doWithRetries(ctx, r.log, r.maxRetries, r.metrics, func() (Response, error) {
return r.next.Do(ctx, req)
})
}

var lastErr error
for ; tries < r.maxRetries; tries++ {
if ctx.Err() != nil {
return nil, ctx.Err()
}
resp, err := r.next.Do(ctx, req)
if err == nil {
return resp, nil
}
type retryRoundTripper struct {
next http.RoundTripper
log log.Logger
maxRetries int

if apierror.IsNonRetryableAPIError(err) || errors.Is(err, context.Canceled) {
return nil, err
}
// Retry if we get a HTTP 500 or a non-HTTP error.
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok || httpResp.Code/100 == 5 {
lastErr = err
log := util_log.WithContext(ctx, spanlogger.FromContext(ctx, r.log))
level.Error(log).Log("msg", "error processing request", "try", tries, "err", err)
continue
}
metrics prometheus.Observer
}

return nil, err
func newRetryRoundTripper(next http.RoundTripper, log log.Logger, maxRetries int, metrics prometheus.Observer) http.RoundTripper {
if metrics == nil {
metrics = newRetryMetrics(nil)
}

return retryRoundTripper{
next: next,
log: log,
maxRetries: maxRetries,
metrics: metrics,
}
return nil, lastErr
}

// RoundTrip implements http.RoundTripper: it delegates the request to the
// wrapped round tripper and retries failed attempts via doWithRetries,
// honoring the request's context between attempts.
// NOTE(review): a retried attempt re-sends the same *http.Request; if the
// request carries a non-nil Body it is consumed by the first attempt —
// presumably the wrapped endpoints are body-less or the transport uses
// GetBody, but this should be confirmed.
func (r retryRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return doWithRetries(req.Context(), r.log, r.maxRetries, r.metrics, func() (*http.Response, error) {
return r.next.RoundTrip(req)
})
}
71 changes: 71 additions & 0 deletions pkg/frontend/querymiddleware/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,74 @@ func Test_RetryMiddlewareCancel(t *testing.T) {
require.Equal(t, int32(1), try.Load())
require.Equal(t, ctx.Err(), err)
}

// Test_RetryRoundTripper verifies retryRoundTripper with maxRetries = 5:
// transient failures and 5xx errors are retried, 4xx errors abort
// immediately, and the number of retries reported to the metrics observer
// matches expectations.
func Test_RetryRoundTripper(t *testing.T) {
var try atomic.Int32

// A 400 response error: must not be retried.
errBadRequest := httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{
Code: http.StatusBadRequest,
Body: []byte("Bad Request"),
})

// A 500 response error: must be retried until the retry budget is spent.
errInternal := httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{
Code: http.StatusInternalServerError,
Body: []byte("Internal Server Error"),
})

for _, tc := range []struct {
name string
handler http.RoundTripper
resp *http.Response
err error
expectedRetries int
}{
{
// Non-HTTP errors are retried; the 5th attempt succeeds, so 4
// retries are observed.
name: "retry failures",
expectedRetries: 4,
handler: RoundTripFunc(func(*http.Request) (*http.Response, error) {
if try.Inc() == 5 {
return &http.Response{StatusCode: http.StatusOK}, nil
}
return nil, fmt.Errorf("fail")
}),
resp: &http.Response{StatusCode: http.StatusOK},
},
{
// A 400 aborts on the first attempt with no retries.
name: "don't retry 400s",
expectedRetries: 0,
handler: RoundTripFunc(func(*http.Request) (*http.Response, error) {
return nil, errBadRequest
}),
err: errBadRequest,
},
{
// A persistent 500 exhausts the full retry budget and the last
// error is surfaced.
name: "retry 500s",
expectedRetries: 5,
handler: RoundTripFunc(func(*http.Request) (*http.Response, error) {
return nil, errInternal
}),
err: errInternal,
},
{
// 500s are retried until a non-retryable 400 occurs on the 5th
// attempt, which is returned immediately.
name: "last error",
expectedRetries: 4,
handler: RoundTripFunc(func(*http.Request) (*http.Response, error) {
if try.Inc() == 5 {
return nil, errBadRequest
}
return nil, errInternal
}),
err: errBadRequest,
},
} {
t.Run(tc.name, func(t *testing.T) {
try.Store(0)
mockMetrics := mockRetryMetrics{}
rt := newRetryRoundTripper(tc.handler, log.NewNopLogger(), 5, &mockMetrics)
resp, err := rt.RoundTrip(&http.Request{})
require.Equal(t, tc.err, err)
require.Equal(t, tc.resp, resp)
require.Equal(t, float64(tc.expectedRetries), mockMetrics.retries)
})
}
}
17 changes: 14 additions & 3 deletions pkg/frontend/querymiddleware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ func newQueryTripperware(
cacheKeyGenerator = NewDefaultCacheKeyGenerator(codec, cfg.SplitQueriesByInterval)
}

retryMetrics := newRetryMetrics(registerer)

queryRangeMiddleware, queryInstantMiddleware, remoteReadMiddleware := newQueryMiddlewares(
cfg,
log,
Expand All @@ -267,6 +269,7 @@ func newQueryTripperware(
engine,
engineOpts.NoStepSubqueryIntervalFn,
registerer,
retryMetrics,
)
requestBlocker := newRequestBlocker(limits, log, registerer)

Expand All @@ -289,6 +292,13 @@ func newQueryTripperware(
labels := next
series := next

if cfg.MaxRetries > 0 {
cardinality = newRetryRoundTripper(cardinality, log, cfg.MaxRetries, retryMetrics)
series = newRetryRoundTripper(series, log, cfg.MaxRetries, retryMetrics)
labels = newRetryRoundTripper(labels, log, cfg.MaxRetries, retryMetrics)
activeSeries = newRetryRoundTripper(activeSeries, log, cfg.MaxRetries, retryMetrics)
}

if cfg.ShardActiveSeriesQueries {
activeSeries = newShardActiveSeriesMiddleware(activeSeries, cfg.UseActiveSeriesDecoder, limits, log)
activeNativeHistogramMetrics = newShardActiveNativeHistogramMetricsMiddleware(activeNativeHistogramMetrics, limits, log)
Expand Down Expand Up @@ -364,6 +374,7 @@ func newQueryMiddlewares(
engine *promql.Engine,
defaultStepFunc func(rangeMillis int64) int64,
registerer prometheus.Registerer,
retryMetrics prometheus.Observer,
) (queryRangeMiddleware, queryInstantMiddleware, remoteReadMiddleware []MetricsQueryMiddleware) {
// Metric used to keep track of each middleware execution duration.
metrics := newInstrumentMiddlewareMetrics(registerer)
Expand All @@ -376,7 +387,6 @@ func newQueryMiddlewares(
queryLimiterMiddleware := newQueryLimiterMiddleware(cacheClient, cacheKeyGenerator, limits, log, blockedQueriesCounter)
queryStatsMiddleware := newQueryStatsMiddleware(registerer, engine)
prom2CompatMiddleware := newProm2RangeCompatMiddleware(limits, log, registerer)
retryMiddlewareMetrics := newRetryMiddlewareMetrics(registerer)

remoteReadMiddleware = append(remoteReadMiddleware,
// Track query range statistics. Added first before any subsequent middleware modifies the request.
Expand Down Expand Up @@ -512,8 +522,9 @@ func newQueryMiddlewares(
}

if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(queryRangeMiddleware, newInstrumentMiddleware("retry", metrics), newRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics))
queryInstantMiddleware = append(queryInstantMiddleware, newInstrumentMiddleware("retry", metrics), newRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics))
queryRangeMiddleware = append(queryRangeMiddleware, newInstrumentMiddleware("retry", metrics), newRetryMiddleware(log, cfg.MaxRetries, retryMetrics))
queryInstantMiddleware = append(queryInstantMiddleware, newInstrumentMiddleware("retry", metrics), newRetryMiddleware(log, cfg.MaxRetries, retryMetrics))
remoteReadMiddleware = append(remoteReadMiddleware, newInstrumentMiddleware("retry", metrics), newRetryMiddleware(log, cfg.MaxRetries, retryMetrics))
}

// Does not apply to remote read as those are executed remotely and the enabling of PromQL experimental
Expand Down
6 changes: 3 additions & 3 deletions pkg/frontend/querymiddleware/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ func TestMiddlewaresConsistency(t *testing.T) {
promql.NewEngine(promql.EngineOpts{}),
defaultStepFunc,
nil,
nil,
)

middlewaresByRequestType := map[string]struct {
Expand All @@ -630,9 +631,7 @@ func TestMiddlewaresConsistency(t *testing.T) {
"remote read": {
instances: remoteReadMiddlewares,
exceptions: []string{
"instrumentMiddleware",
"querySharding", // No query sharding support.
"retry",
"querySharding", // No query sharding support.
"splitAndCacheMiddleware", // No time splitting and results cache support.
"splitInstantQueryByIntervalMiddleware", // Not applicable because specific to instant queries.
"stepAlignMiddleware", // Not applicable because remote read requests don't take step in account when running in Mimir.
Expand Down Expand Up @@ -832,6 +831,7 @@ func TestTripperware_RemoteRead(t *testing.T) {
nil,
reg,
)

require.NoError(t, err)

req := tc.makeRequest()
Expand Down
0