tsdb/agent: allow ingestion of OOO samples by tpaschalis · Pull Request #12897 · prometheus/prometheus · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

tsdb/agent: allow ingestion of OOO samples #12897

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions tsdb/agent/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,11 +802,6 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
series.Lock()
defer series.Unlock()

if t < series.lastTs {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}

// NOTE: always modify pendingSamples and sampleSeries together.
a.pendingSamples = append(a.pendingSamples, record.RefSample{
Ref: series.ref,
Expand Down Expand Up @@ -930,11 +925,6 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
series.Lock()
defer series.Unlock()

if t < series.lastTs {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}

switch {
case h != nil:
// NOTE: always modify pendingHistograms and histogramSeries together
Expand Down
127 changes: 127 additions & 0 deletions tsdb/agent/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,3 +751,130 @@ func TestStorage_DuplicateExemplarsIgnored(t *testing.T) {
// We had 9 calls to AppendExemplar but only 4 of those should have gotten through.
require.Equal(t, 4, walExemplarsCount)
}

func TestDBAllowOOOSamples(t *testing.T) {
const (
numDatapoints = 5
numHistograms = 5
numSeries = 4
offset = 100
)

reg := prometheus.NewRegistry()
s := createTestAgentDB(t, reg, DefaultOptions())
app := s.Appender(context.TODO())

// Let's add some samples in the [offset, offset+numDatapoints) range.
lbls := labelsForTest(t.Name(), numSeries)
for _, l := range lbls {
lset := labels.New(l...)

for i := offset; i < numDatapoints+offset; i++ {
ref, err := app.Append(0, lset, int64(i), float64(i))
require.NoError(t, err)

e := exemplar.Exemplar{
Labels: lset,
Ts: int64(i) * 2,
Value: float64(i),
HasTs: true,
}
_, err = app.AppendExemplar(ref, lset, e)
require.NoError(t, err)
}
}

lbls = labelsForTest(t.Name()+"_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)

histograms := tsdbutil.GenerateTestHistograms(numHistograms)

for i := offset; i < numDatapoints+offset; i++ {
_, err := app.AppendHistogram(0, lset, int64(i), histograms[i-offset], nil)
require.NoError(t, err)
}
}

lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
for _, l := range lbls {
lset := labels.New(l...)

floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)

for i := offset; i < numDatapoints+offset; i++ {
_, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i-offset])
require.NoError(t, err)
}
}

require.NoError(t, app.Commit())
m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total")
require.Equal(t, float64(20), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
require.Equal(t, float64(40), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
require.NoError(t, s.Close())

// Hack: s.wal.Dir() is the /wal subdirectory of the original storage path.
// We need the original directory so we can recreate the storage for replay.
storageDir := filepath.Dir(s.wal.Dir())

// Replay the storage so that the lastTs for each series is recorded.
reg2 := prometheus.NewRegistry()
db, err := Open(s.logger, reg2, nil, storageDir, s.opts)
if err != nil {
t.Fatalf("unable to create storage for the agent: %v", err)
}

app = db.Appender(context.Background())

// Now the lastTs will have been recorded successfully.
// Let's try appending twice as many OOO samples in the [0, numDatapoints) range.
lbls = labelsForTest(t.Name()+"_histogram", numSeries*2)
for _, l := range lbls {
lset := labels.New(l...)

for i := 0; i < numDatapoints; i++ {
ref, err := app.Append(0, lset, int64(i), float64(i))
require.NoError(t, err)

e := exemplar.Exemplar{
Labels: lset,
Ts: int64(i) * 2,
Value: float64(i),
HasTs: true,
}
_, err = app.AppendExemplar(ref, lset, e)
require.NoError(t, err)
}
}

lbls = labelsForTest(t.Name()+"_histogram", numSeries*2)
for _, l := range lbls {
lset := labels.New(l...)

histograms := tsdbutil.GenerateTestHistograms(numHistograms)

for i := 0; i < numDatapoints; i++ {
_, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil)
require.NoError(t, err)
}
}

lbls = labelsForTest(t.Name()+"_float_histogram", numSeries*2)
for _, l := range lbls {
lset := labels.New(l...)

floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)

for i := 0; i < numDatapoints; i++ {
_, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i])
require.NoError(t, err)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tpaschalis So far I see that we are checking that there are no errors on append, but we are not checking that the samples are actually there. Could the test replay the WAL and verify that what we read back is what we actually inserted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the thing is that there's no way to access the samples by replaying the WAL (they're only used to recalculate lastTs).

What we can do is append a different number of samples and use the prometheus_agent_samples_appended_total metric to verify that the right number of datapoints was appended. I've implemented this in 242aa57, let me know how it looks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That works fine 👍


require.NoError(t, app.Commit())
m = gatherFamily(t, reg2, "prometheus_agent_samples_appended_total")
require.Equal(t, float64(40), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
require.Equal(t, float64(80), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
require.NoError(t, db.Close())
}
0