ingest-storage: Use pooled timeseries slices when deserializing #11393
Conversation
pkg/storage/ingest/pusher.go
Outdated
@@ -395,6 +397,10 @@ func labelAdaptersHash(b []byte, ls []mimirpb.LabelAdapter) ([]byte, uint64) {
// PushToStorage ignores SkipLabelNameValidation because that field is only used in the distributor and not in the ingester.
// PushToStorage aborts the request if it encounters an error.
func (p *parallelStorageShards) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error {
// We're moving Timeseries into batches, each batch has a fresh timeseries slice.
// We're done with the slice in the request here, the contents will live on in the batch and be freed when the batch is freed.
defer mimirpb.ReuseTimeseriesSliceDangerous(request.Timeseries)
`Consume` is responsible for allocating from the pool, then `PushToStorage` frees back to the pool with the assumption that `Consume` is the caller. Not super familiar with the details, but can `Consume` do the freeing for symmetry? Like right after it calls `c.pushToStorage(...)`.
It would definitely be cleaner to just free where it was allocated. Let's see if I can get it into that form or if I run into issues.
The trickiest part I see so far is that we still have some implementation-specific behavior making it hard to factor out. The `parallelStoragePusher` might collapse N requests into 1 batch, so you have N slices to free in that path, but only 1 slice to free in the sequential path.
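For illustration, the symmetric shape would look something like this minimal, self-contained sketch (stand-in names and types, not the actual Mimir API):

```go
package main

import (
	"fmt"
	"sync"
)

// series and pushToStorage are illustrative stand-ins, not Mimir's types.
type series struct{ name string }

var seriesSlicePool = sync.Pool{
	New: func() any { return make([]series, 0, 512) },
}

func pushToStorage(ts []series) error {
	fmt.Println("pushing", len(ts), "series")
	return nil
}

// consume takes the slice from the pool and returns it in the same frame,
// right after pushToStorage returns, keeping allocation and release symmetric.
func consume(records []string) error {
	ts := seriesSlicePool.Get().([]series)
	for _, r := range records {
		ts = append(ts, series{name: r})
	}
	err := pushToStorage(ts)
	clear(ts)                   // drop references before recycling
	seriesSlicePool.Put(ts[:0]) // return the emptied slice to the pool
	return err
}

func main() {
	_ = consume([]string{"up", "go_goroutines"})
}
```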
nice one. Not sure if you know it, but there's benchstat, which helps display comparisons between benchmark runs more nicely.
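For anyone following along, the usual workflow is roughly this (package path and benchmark name taken from this PR):

```sh
go install golang.org/x/perf/cmd/benchstat@latest

go test -run '^$' -bench BenchmarkPusherConsumer -count 10 ./pkg/storage/ingest > old.txt
# ...apply the change, then:
go test -run '^$' -bench BenchmarkPusherConsumer -count 10 ./pkg/storage/ingest > new.txt

benchstat old.txt new.txt
```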
pkg/storage/ingest/pusher.go
Outdated
can we somehow clear this slice before returning it? Like set all of its elements to a zero value. Otherwise the underlying array still contains the timeseries refs. I get shivers when I think about how we might debug a data corruption due to this one day.
Good call. I think in the current code it's still safe, as `ReuseSlice` does the same thing already: it returns each timeseries to the pool, then returns `ts[:0]` to the slice pool without clearing, so I imagine the leftover array of `ts` still has pointers to the freed timeseries 🤔
But I don't see a good reason not to do this, it's way safer. I think we can clear the slice as we pull the timeseries out of it, let me update the test image with that change.
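Something like this self-contained sketch (illustrative types and pool, not mimirpb's actual ones):

```go
package main

import (
	"fmt"
	"sync"
)

// timeseries stands in for mimirpb.PreallocTimeseries; the pool stands in
// for the timeseries slice pool. A sketch of the idea, not Mimir's code.
type timeseries struct{ samples []float64 }

var slicePool = sync.Pool{
	New: func() any { return make([]timeseries, 0, 1024) },
}

// moveToBatch transfers every element of src into batch, zeroing src as it
// goes so the recycled backing array holds no pointers to freed timeseries.
func moveToBatch(batch, src []timeseries) []timeseries {
	for i := range src {
		batch = append(batch, src[i])
		src[i] = timeseries{} // clear the reference as we pull it out
	}
	slicePool.Put(src[:0]) // safe to recycle: nothing dangles in the array
	return batch
}

func main() {
	src := slicePool.Get().([]timeseries)
	src = append(src, timeseries{samples: []float64{1, 2}})
	var batch []timeseries
	batch = moveToBatch(batch, src)
	fmt.Println(len(batch)) // 1
}
```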
I changed the approach a bit:
- We zero out each timeseries in the original slice once it's transferred to the batch with no errors.
- We now only reuse the slice if we successfully transferred and cleared every single timeseries in the request. If there are errors/leftovers in the slice, it won't be returned to the pool at all. This should make it much safer and less brittle.
- Confirmed that `sync.Pool.Get()` removes an item from the pool and the pool doesn't track it, so it's safe to not put a slice back in the pool every now and then, only less efficient (see the sketch below).
- This had a negligible effect on benchmarks (to be fair, the benchmark never hit the error case).
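The `sync.Pool` point can be demonstrated with a tiny program:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	pool := sync.Pool{New: func() any { return new(int) }}

	pool.Put(new(int))
	a := pool.Get()     // removes the item; the pool no longer tracks it
	b := pool.Get()     // pool is empty again, so New allocates a fresh one
	fmt.Println(a != b) // true: two distinct items

	// Deliberately never Put a or b back: they're simply garbage collected.
	// Skipping Put is safe, only less efficient on the next Get.
}
```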
I think you're right that this would have at least contributed to a future bug; there was a test that relied on inspecting the slice after it had been pushed to storage (thus freed). I didn't see any code paths outside tests that did this, but someone could easily add one in the future.
pkg/storage/ingest/pusher_test.go
Outdated
kcfg.IngestionConcurrencyMax = 2
kcfg.IngestionConcurrencyEstimatedBytesPerSample = 1
kcfg.IngestionConcurrencyTargetFlushesPerShard = 1
kcfg.IngestionConcurrencyBatchSize = 5
kcfg.IngestionConcurrencyQueueCapacity = 5
This config looks a bit unrealistic. Are the defaults insufficient?
Just quick copy pasta from one of the other parallel pusher tests. I probably picked one that was very test-specific; I'll switch to something more standard.
The odd settings definitely caused the parallel pusher to allocate way more than usual since the values were so low. But the delta caused by this change is still about the same:
Before:
BenchmarkPusherConsumer/sequential_pusher-16 19382 627954 ns/op 826590 B/op 934 allocs/op
BenchmarkPusherConsumer/parallel_pusher-16 18906 634400 ns/op 827177 B/op 940 allocs/op
After:
BenchmarkPusherConsumer/sequential_pusher-16 30938 396641 ns/op 28835 B/op 463 allocs/op
BenchmarkPusherConsumer/parallel_pusher-16 28466 418158 ns/op 29447 B/op 469 allocs/op
Benchstat (nice suggestion!)
│ old.txt │ new.txt │
│ sec/op │ sec/op vs base │
PusherConsumer/sequential_pusher-16 616.5µ ± 1% 405.2µ ± 4% -34.28% (p=0.000 n=10)
PusherConsumer/parallel_pusher-16 629.3µ ± 1% 414.2µ ± 1% -34.17% (p=0.000 n=10)
geomean 622.9µ 409.7µ -34.23%
│ old.txt │ new.txt │
│ B/op │ B/op vs base │
PusherConsumer/sequential_pusher-16 807.32Ki ± 0% 28.20Ki ± 0% -96.51% (p=0.000 n=10)
PusherConsumer/parallel_pusher-16 807.87Ki ± 0% 28.78Ki ± 0% -96.44% (p=0.000 n=10)
geomean 807.6Ki 28.49Ki -96.47%
│ old.txt │ new.txt │
│ allocs/op │ allocs/op vs base │
PusherConsumer/sequential_pusher-16 934.0 ± 0% 463.0 ± 0% -50.43% (p=0.000 n=10)
PusherConsumer/parallel_pusher-16 940.0 ± 0% 469.0 ± 0% -50.11% (p=0.000 n=10)
geomean 937.0 466.0 -50.27%
this looks like a massive gain; curious to see how it fares in a real cluster
Force-pushed from 8eea5e7 to 260ebf9.
While reviewing this code, I think we have a race condition. When the timeseries slice is released to the pool in `PushToStorage()`, we break any read of the write request happening after that. For example, look here (mimir/pkg/storage/ingest/pusher.go, line 468 in f266d6b):
p.metrics.processingTime.WithLabelValues(requestContents(wr.WriteRequest)).Observe(time.Since(processingStart).Seconds())
We access the `WriteRequest`, but at that point the timeseries slice is already released to the pool.
Thanks for addressing my feedback. I left another comment about nil-ifying `WriteRequest` to get a panic in case of bugs rather than data corruption (which is much harder to debug). I still think that we should rename all `PushToStorage()` to `PushToStorageAndReleaseWriteRequest()` to clarify that the write request is not safe to use after that call. I know it's just a naming thing, but it could give a great hint to the next engineer who touches this code.
@@ -460,12 +466,13 @@ func (p *parallelStorageShards) run(queue *batchingQueue) {
p.metrics.batchAge.Observe(time.Since(wr.startedAt).Seconds())
p.metrics.timeSeriesPerFlush.Observe(float64(len(wr.Timeseries)))
processingStart := time.Now()
requestContents := requestContents(wr.WriteRequest)
What if we do `wr.WriteRequest = nil` here? Given it will be unsafe to use, better a panic than data corruption if in the future we try to read it.
Same in `pusherConsumer.Consume()`.
I cleared it at the spot where it's freed (it feels like a logical extension of freeing). The rename makes it clear at the call site too. Is that enough? 🤔
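The panic-over-corruption idea, as a self-contained sketch (types and names are stand-ins, and `recycle` stands in for `mimirpb.ReuseTimeseriesSliceDangerous`):

```go
package main

import "fmt"

// Once the request's slice is recycled, the pointer is cleared so any later
// read panics with a nil dereference instead of touching recycled memory.
type writeRequest struct{ timeseries []int }

type flushable struct{ req *writeRequest }

func releaseWriteRequest(f *flushable) {
	recycle(f.req.timeseries) // stand-in for the pool release
	f.req = nil               // later reads now fail loudly
}

func recycle([]int) {} // pooling elided for the sketch

func main() {
	f := &flushable{req: &writeRequest{timeseries: []int{1, 2, 3}}}
	releaseWriteRequest(f)

	defer func() { fmt.Println("recovered:", recover()) }()
	_ = len(f.req.timeseries) // nil pointer dereference: panic, not corruption
}
```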
Force-pushed from f670e7f to f355778.
LGTM, thanks!
Just writing it down somewhere, and this is probably the most relevant place: while testing an image with this change in dev, a continuous test failure happened. I do not think it's related to this change.
The delay in sample corruption correlates with the WAL issues; that ingester was way behind for a long time. If this PR broke anything, I would expect the continuous test to fail immediately on the sample that was just written, not show a several-hour delay between the affected samples and the test failing. Thus, I don't think it's related.
What this PR does
The pusherConsumer deserializes directly into a `&mimirpb.WriteRequest{}` by calling the generated `Unmarshal`. This path will fetch individual `PreallocTimeseries` from the pool, but not the timeseries slice itself, which is subject to the usual extra allocations when appending. This PR uses a slice from the pool in addition to the contained objects.
There are 2 possible paths after we take from the pool, depending on configuration.
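A minimal sketch of the pooled-slice idea (stand-in names and a dummy decode loop, not the actual mimirpb API):

```go
package main

import (
	"fmt"
	"sync"
)

// preallocTimeseries and the pool are stand-ins for mimirpb's pooled types;
// the real names and unmarshal path differ, this only sketches the idea.
type preallocTimeseries struct{ labels map[string]string }

var timeseriesSlicePool = sync.Pool{
	New: func() any { return make([]preallocTimeseries, 0, 1024) },
}

// unmarshalInto appends decoded series into a pooled slice, so appends reuse
// pooled capacity instead of repeatedly growing a zero-value slice.
func unmarshalInto(dst []preallocTimeseries, n int) []preallocTimeseries {
	for i := 0; i < n; i++ {
		dst = append(dst, preallocTimeseries{labels: map[string]string{"series": fmt.Sprint(i)}})
	}
	return dst
}

func main() {
	ts := timeseriesSlicePool.Get().([]preallocTimeseries)
	ts = unmarshalInto(ts, 100)
	fmt.Println(len(ts), cap(ts)) // 100 1024: no growth until capacity is exceeded

	clear(ts) // zero the elements before recycling, as discussed above
	timeseriesSlicePool.Put(ts[:0])
}
```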
The main purpose of this PR is more of a refactor than an optimization. The block builder already uses a timeseries slice from the pool here. By aligning what we fetch from pools, I can move toward using the same deserialization logic in both services, which I'd like to do for record format V2. The reduced allocations are just a bonus.
TL;DR fewer allocations, supports a future change
Before:
After:
Which issue(s) this PR fixes or relates to
Rel https://github.com/grafana/mimir-squad/issues/2253
Checklist
- CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
- about-versioning.md updated with experimental features.