ingest-storage: Use pooled timeseries slices when deserializing by alexweav · Pull Request #11393 · grafana/mimir

Merged
13 commits merged into main on May 19, 2025

Conversation

@alexweav (Contributor) commented May 5, 2025:

What this PR does

The pusherConsumer deserializes directly into a &mimirpb.WriteRequest{} by calling the generated Unmarshal. This path fetches individual PreallocTimeseries from the pool, but not the timeseries slice itself, which is subject to the usual extra allocations when appending.

This PR uses a slice from the pool in addition to the contained objects.

There are two possible paths after we take the slice from the pool, depending on configuration.

  • The sequentialStoragePusher, which forwards the request straight to the ingester. The ingester frees the slice and its contents. Our contents came from the pool, but prior to this PR the slice did not, so we were putting an externally created slice back into the sync.Pool on each request. sync.Pool has an internal safeguard against unbounded growth (it periodically releases items to the GC), so this didn't leak, but now we can use the pool for its intended purpose and reduce churn.
  • The parallelStoragePusher, which rearranges the timeseries into batches. The batch slice is already fetched from the pool. That means we can free our serialization slice (but not its contents!) when we're done rearranging it. The ingester will free the batch slice and all the timeseries as usual. A minimal sketch of the slice-pooling pattern follows this list.
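For illustration only, here is a minimal, self-contained sketch of the slice-pooling pattern being discussed. It uses a plain sync.Pool with a stand-in element type, not the actual mimirpb helpers:

package main

import (
    "fmt"
    "sync"
)

// slicePool hands out reusable slices so each request does not have to grow a
// fresh slice (and pay the append reallocations) during deserialization.
// The int element type here is just a stand-in for PreallocTimeseries.
var slicePool = sync.Pool{
    New: func() any { return make([]int, 0, 1024) },
}

func getSlice() []int  { return slicePool.Get().([]int)[:0] }
func putSlice(s []int) { slicePool.Put(s[:0]) } // return with length 0; the capacity is reused

func main() {
    ts := getSlice()
    ts = append(ts, 1, 2, 3) // "deserialize" into the pooled slice
    fmt.Println(len(ts), cap(ts))
    putSlice(ts) // hand the backing array back instead of leaving it to the GC
}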

This PR is intended more as a refactor than an optimization. The block builder already uses a timeseries slice from the pool. By aligning what we fetch from pools, I can move toward using the same deserialization logic in both services, which I'd like to do for record format V2. The reduced allocations are just a bonus.

TL;DR fewer allocations, supports a future change

go test ./pkg/storage/ingest/... -run $^ -bench BenchmarkPusherConsumer -benchtime=10s -benchmem

Before:

BenchmarkPusherConsumer/sequential_pusher-16         	   20348	    592474 ns/op	  826381 B/op	     933 allocs/op
BenchmarkPusherConsumer/parallel_pusher-16           	    7610	   1519643 ns/op	 1020897 B/op	    1952 allocs/op

After:

BenchmarkPusherConsumer/sequential_pusher-16         	   31896	    385991 ns/op	   28848 B/op	     463 allocs/op
BenchmarkPusherConsumer/parallel_pusher-16           	    9435	   1246078 ns/op	  208315 B/op	    1502 allocs/op

Which issue(s) this PR fixes or relates to

Rel https://github.com/grafana/mimir-squad/issues/2253

Checklist

  • Tests updated.
  • Documentation added.
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
  • about-versioning.md updated with experimental features.

alexweav marked this pull request as ready for review and requested a review from a team as a code owner on May 5, 2025 at 23:03.
@@ -395,6 +397,10 @@ func labelAdaptersHash(b []byte, ls []mimirpb.LabelAdapter) ([]byte, uint64) {
// PushToStorage ignores SkipLabelNameValidation because that field is only used in the distributor and not in the ingester.
// PushToStorage aborts the request if it encounters an error.
func (p *parallelStorageShards) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error {
// We're moving Timeseries into batches, each batch has a fresh timeseries slice.
// We're done with the slice in the request here, the contents will live on in the batch and be freed when the batch is freed.
defer mimirpb.ReuseTimeseriesSliceDangerous(request.Timeseries)
A reviewer (Contributor) commented:
Consume is responsible for allocating from the pool, then PushToStorage frees back to the pool with the assumption that Consume is the caller. Not super familiar with the details, but can Consume do the freeing for symmetry? Like right after it calls c.pushToStorage(...).

alexweav (author) replied:
It would definitely be cleaner to just free where it was allocated. Let's see if I can get it into that form or if I run into issues.

The trickiest part I see so far is that we still have some implementation-specific behavior making it hard to factor out. The parallelStoragePusher might collapse N requests into 1 batch, so you have N slices to free in that path, but only 1 slice to free in the sequential path.

@dimitarvdimitrov (Contributor) left a comment:
Nice one. Not sure if you know it, but there's benchstat, which helps display comparisons between benchmark runs more nicely.
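For reference, a typical benchstat workflow looks roughly like this (the file names are just the usual convention, not taken from this PR):

go install golang.org/x/perf/cmd/benchstat@latest
go test ./pkg/storage/ingest/... -run '^$' -bench BenchmarkPusherConsumer -benchmem -count=10 | tee old.txt
# apply the change, re-run the same command into new.txt, then compare:
benchstat old.txt new.txt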

A reviewer (Contributor) commented on the same hunk (the defer mimirpb.ReuseTimeseriesSliceDangerous(request.Timeseries) line):
Can we somehow clear this slice before returning it? For example, set all of its elements to a zero value. Otherwise the underlying array still contains the timeseries refs. I get shivers when I think about how we might debug a data corruption caused by this one day.

alexweav (author) replied:
Good call. I think in the current code it's still safe, as ReuseSlice does the same thing already: it returns each timeseries to the pool, then returns ts[:0] to the slice pool without clearing, so I imagine the leftover array still has pointers to the freed timeseries 🤔

But I don't see a good reason not to do this; it's way safer. I think we can clear the slice as we pull the timeseries out of it. Let me update the test image with that change.

alexweav (author) replied on May 7, 2025:
I changed the approach a bit:

  • We zero out each timeseries in the original slice once it's transferred to the batch with no errors.
  • We now only re-use the slice if we successfully transferred and cleared every single timeseries in the request. If there are errors/leftovers in the slice, it won't be returned to the pool at all. This should make it much safer and less brittle; a rough sketch of the move-and-clear step follows this list.
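For illustration, a rough sketch of that move-and-clear step, with a hypothetical helper name (not the actual parallelStorageShards code):

// moveAndClear appends every series from src into batch and zeroes each source
// slot, so the pooled backing array of src no longer references the timeseries.
// The caller returns src to the pool only if every slot was moved and cleared.
func moveAndClear(batch *[]mimirpb.PreallocTimeseries, src []mimirpb.PreallocTimeseries) {
    for i := range src {
        *batch = append(*batch, src[i])
        src[i] = mimirpb.PreallocTimeseries{} // leave no dangling reference behind
    }
}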

alexweav (author) replied:
I think you're right that this would have at least contributed to a future bug; there was a test that relied on inspecting the slice after it had been pushed to storage (thus freed). I didn't see any code paths outside tests that did this, but someone could easily add one in the future.

Comment on lines 1590 to 1594
kcfg.IngestionConcurrencyMax = 2
kcfg.IngestionConcurrencyEstimatedBytesPerSample = 1
kcfg.IngestionConcurrencyTargetFlushesPerShard = 1
kcfg.IngestionConcurrencyBatchSize = 5
kcfg.IngestionConcurrencyQueueCapacity = 5
A reviewer (Contributor) commented:
This config looks a bit unrealistic. Are the defaults insufficient?

alexweav (author) replied:
Just a quick copy-paste from one of the other parallel pusher tests. I probably picked one that was very test-specific; I'll switch to something more standard.

alexweav (author) replied:
The odd settings definitely caused the parallel pusher to allocate way more than usual since the values were so low. But the delta caused by this change is still about the same:
Before:

BenchmarkPusherConsumer/sequential_pusher-16         	   19382	    627954 ns/op	  826590 B/op	     934 allocs/op
BenchmarkPusherConsumer/parallel_pusher-16           	   18906	    634400 ns/op	  827177 B/op	     940 allocs/op

After:

BenchmarkPusherConsumer/sequential_pusher-16         	   30938	    396641 ns/op	   28835 B/op	     463 allocs/op
BenchmarkPusherConsumer/parallel_pusher-16           	   28466	    418158 ns/op	   29447 B/op	     469 allocs/op

Benchstat (nice suggestion!)

                                    │   old.txt   │               new.txt               │
                                    │   sec/op    │   sec/op     vs base                │
PusherConsumer/sequential_pusher-16   616.5µ ± 1%   405.2µ ± 4%  -34.28% (p=0.000 n=10)
PusherConsumer/parallel_pusher-16     629.3µ ± 1%   414.2µ ± 1%  -34.17% (p=0.000 n=10)
geomean                               622.9µ        409.7µ       -34.23%

                                    │    old.txt    │               new.txt                │
                                    │     B/op      │     B/op      vs base                │
PusherConsumer/sequential_pusher-16   807.32Ki ± 0%   28.20Ki ± 0%  -96.51% (p=0.000 n=10)
PusherConsumer/parallel_pusher-16     807.87Ki ± 0%   28.78Ki ± 0%  -96.44% (p=0.000 n=10)
geomean                                807.6Ki        28.49Ki       -96.47%

                                    │  old.txt   │              new.txt               │
                                    │ allocs/op  │ allocs/op   vs base                │
PusherConsumer/sequential_pusher-16   934.0 ± 0%   463.0 ± 0%  -50.43% (p=0.000 n=10)
PusherConsumer/parallel_pusher-16     940.0 ± 0%   469.0 ± 0%  -50.11% (p=0.000 n=10)
geomean                               937.0        466.0       -50.27%

A reviewer (Contributor) commented:
this looks like a massive gain; curious to see how it fares in a real cluster

alexweav force-pushed the alexweav/ingest-storage-deser-allocate branch from 8eea5e7 to 260ebf9 on May 7, 2025 at 19:38.
@pracucci (Collaborator) left a comment:
While reviewing this code, I think we have a race condition. When the timeseries slice is released to the pool in PushToStorage(), we break any read of the write request that happens after that. For example, look here:

p.metrics.processingTime.WithLabelValues(requestContents(wr.WriteRequest)).Observe(time.Since(processingStart).Seconds())

We access the WriteRequest but at that point the timeseries slice is already released to the pool.

@pracucci (Collaborator) left a comment:
Thanks for addressing my feedback. I left another comment about nil-ing out WriteRequest so that bugs cause a panic rather than data corruption (which is much harder to debug). I still think we should rename all PushToStorage() to PushToStorageAndReleaseWriteRequest() to clarify that the write request is not safe to use after that call. I know it's just a naming thing, but it could give a great hint to the next engineer who touches this code.

@@ -460,12 +466,13 @@ func (p *parallelStorageShards) run(queue *batchingQueue) {
p.metrics.batchAge.Observe(time.Since(wr.startedAt).Seconds())
p.metrics.timeSeriesPerFlush.Observe(float64(len(wr.Timeseries)))
processingStart := time.Now()
requestContents := requestContents(wr.WriteRequest)

A reviewer (Collaborator) commented:
What if we do wr.WriteRequest = nil here? Given it will be unsafe to use, better a panic than data corruption if we try to read it in the future.

A reviewer (Collaborator) commented:
Same in pusherConsumer.Consume().

alexweav (author) replied:
I cleared it at the spot where it's freed (it feels like a logical extension of freeing). The rename makes it clear at the call site too. Is that enough? 🤔

alexweav force-pushed the alexweav/ingest-storage-deser-allocate branch from f670e7f to f355778 on May 9, 2025 at 21:11.
@pracucci (Collaborator) left a comment:
LGTM, thanks!

alexweav merged commit 3f2be4d into main on May 19, 2025 (30 checks passed).
alexweav deleted the alexweav/ingest-storage-deser-allocate branch on May 19, 2025 at 14:08.
alexweav (author) commented:
Just writing it down somewhere, this is probably the most relevant place.

While testing an image with this change in dev, a continuous test failure happened. I do not think it's related to this change.

  • the failure (05/09): explore
    • you can see it failed frequently before the rollout too.
  • An ingester was stuck terminating for days before I deployed. (zone-b-2)
  • That ingester logs WAL corruption consistently: explore
  • first failure at 02:21 UTC, for samples at 12:58. explore

The delay in sample corruption correlates with the WAL issues; that ingester was way behind for a long time. If this PR broke anything, I would expect the continuous test to fail immediately on the sample that was just written, not to have a several-hour delay between the affected samples and the test failing.

Thus, I don't think it's related.
