chore: improve error messages and logging during shard opening (#25333) · influxdata/influxdb@cc9bd41 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit cc9bd41

Browse files
authored
chore: improve error messages and logging during shard opening (#25333)
Backport from main-2.x. (cherry picked from commit da9615f) Closes: #25332
1 parent 46086c8 commit cc9bd41

File tree

8 files changed

+48
-31
lines changed

8 files changed

+48
-31
lines changed

tsdb/engine.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,22 +126,24 @@ func NewEngine(id uint64, i Index, path string, walPath string, sfile *SeriesFil
126126
options.OnNewEngine(engine)
127127
}
128128
return engine, nil
129+
} else if err != nil {
130+
return nil, fmt.Errorf("error getting file stats for %q in NewEngine: %w", path, err)
129131
}
130132

131133
// If it's a dir then it's a tsm1 engine
132134
format := DefaultEngine
133135
if fi, err := os.Stat(path); err != nil {
134-
return nil, err
136+
return nil, fmt.Errorf("error calling Stat on %q in NewEngine: %w", path, err)
135137
} else if !fi.Mode().IsDir() {
136-
return nil, ErrUnknownEngineFormat
138+
return nil, fmt.Errorf("error opening %q: %w", path, ErrUnknownEngineFormat)
137139
} else {
138140
format = "tsm1"
139141
}
140142

141143
// Lookup engine by format.
142144
fn := newEngineFuncs[format]
143145
if fn == nil {
144-
return nil, fmt.Errorf("invalid engine format: %q", format)
146+
return nil, fmt.Errorf("invalid engine format for %q: %q", path, format)
145147
}
146148

147149
engine := fn(id, i, path, walPath, sfile, options)

tsdb/engine/tsm1/engine.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -737,9 +737,10 @@ func (e *Engine) Open(ctx context.Context) error {
737737
return err
738738
}
739739

740-
fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx"), e.logger)
740+
fieldPath := filepath.Join(e.path, "fields.idx")
741+
fields, err := tsdb.NewMeasurementFieldSet(fieldPath, e.logger)
741742
if err != nil {
742-
e.logger.Warn(fmt.Sprintf("error opening fields.idx: %v. Rebuilding.", err))
743+
e.logger.Warn("error opening fields.idx: Rebuilding.", zap.String("path", fieldPath), zap.Error(err))
743744
}
744745

745746
e.mu.Lock()
@@ -750,7 +751,7 @@ func (e *Engine) Open(ctx context.Context) error {
750751

751752
if e.WALEnabled {
752753
if err := e.WAL.Open(); err != nil {
753-
return err
754+
return fmt.Errorf("error opening WAL for %q: %w", fieldPath, err)
754755
}
755756
}
756757

@@ -2289,7 +2290,7 @@ func (e *Engine) reloadCache() error {
22892290
now := time.Now()
22902291
files, err := segmentFileNames(e.WAL.Path())
22912292
if err != nil {
2292-
return err
2293+
return fmt.Errorf("error getting segment file names for %q in Engine.reloadCache: %w", e.WAL.Path(), err)
22932294
}
22942295

22952296
limit := e.Cache.MaxSize()
@@ -2318,15 +2319,16 @@ func (e *Engine) cleanup() error {
23182319
if os.IsNotExist(err) {
23192320
return nil
23202321
} else if err != nil {
2321-
return err
2322+
return fmt.Errorf("error calling ReadDir for %q in Engine.cleanup: %w", e.path, err)
23222323
}
23232324

23242325
ext := fmt.Sprintf(".%s", TmpTSMFileExtension)
23252326
for _, f := range allfiles {
23262327
// Check to see if there are any `.tmp` directories that were left over from failed shard snapshots
23272328
if f.IsDir() && strings.HasSuffix(f.Name(), ext) {
2328-
if err := os.RemoveAll(filepath.Join(e.path, f.Name())); err != nil {
2329-
return fmt.Errorf("error removing tmp snapshot directory %q: %s", f.Name(), err)
2329+
path := filepath.Join(e.path, f.Name())
2330+
if err := os.RemoveAll(path); err != nil {
2331+
return fmt.Errorf("error removing tmp snapshot directory %q in Engine.cleanup: %w", path, err)
23302332
}
23312333
}
23322334
}
@@ -2335,14 +2337,15 @@ func (e *Engine) cleanup() error {
23352337
}
23362338

23372339
func (e *Engine) cleanupTempTSMFiles() error {
2338-
files, err := filepath.Glob(filepath.Join(e.path, fmt.Sprintf("*.%s", CompactionTempExtension)))
2340+
pattern := filepath.Join(e.path, fmt.Sprintf("*.%s", CompactionTempExtension))
2341+
files, err := filepath.Glob(pattern)
23392342
if err != nil {
2340-
return fmt.Errorf("error getting compaction temp files: %s", err.Error())
2343+
return fmt.Errorf("error getting compaction temp files for %q in Engine.cleanupTempTSMFiles: %w", pattern, err)
23412344
}
23422345

23432346
for _, f := range files {
23442347
if err := os.Remove(f); err != nil {
2345-
return fmt.Errorf("error removing temp compaction files: %v", err)
2348+
return fmt.Errorf("error removing temp compaction file %q in Engine.cleanupTempTSMFiles: %w", f, err)
23462349
}
23472350
}
23482351
return nil

tsdb/engine/tsm1/file_store.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ func (f *FileStore) Open(ctx context.Context) error {
549549
// find the current max ID for temp directories
550550
tmpfiles, err := os.ReadDir(f.dir)
551551
if err != nil {
552-
return err
552+
return fmt.Errorf("error calling ReadDir on %q in FileStore.Open: %w", f.dir, err)
553553
}
554554

555555
// ascertain the current temp directory number by examining the existing
@@ -575,9 +575,10 @@ func (f *FileStore) Open(ctx context.Context) error {
575575
f.currentTempDirID = i
576576
}
577577

578-
files, err := filepath.Glob(filepath.Join(f.dir, "*."+TSMFileExtension))
578+
pattern := filepath.Join(f.dir, "*."+TSMFileExtension)
579+
files, err := filepath.Glob(pattern)
579580
if err != nil {
580-
return err
581+
return fmt.Errorf("error in Glob for %q in FileStore.Open: %w", pattern, err)
581582
}
582583

583584
// struct to hold the result of opening each reader in a goroutine
@@ -591,7 +592,7 @@ func (f *FileStore) Open(ctx context.Context) error {
591592
// Keep track of the latest ID
592593
generation, _, err := f.parseFileName(fn)
593594
if err != nil {
594-
return err
595+
return fmt.Errorf("error parsing %q in FileStore.Open: %w", fn, err)
595596
}
596597

597598
if generation >= f.currentGeneration {
@@ -600,7 +601,7 @@ func (f *FileStore) Open(ctx context.Context) error {
600601

601602
file, err := os.OpenFile(fn, os.O_RDONLY, 0666)
602603
if err != nil {
603-
return fmt.Errorf("error opening file %s: %v", fn, err)
604+
return fmt.Errorf("error calling OpenFile on %q in FileStore.Open: %w", fn, err)
604605
}
605606

606607
go func(idx int, file *os.File) {
@@ -624,17 +625,20 @@ func (f *FileStore) Open(ctx context.Context) error {
624625
// If we are unable to read a TSM file then log the error, rename
625626
// the file, and continue loading the shard without it.
626627
if err != nil {
628+
if cerr := file.Close(); cerr != nil {
629+
f.logger.Error("Error closing TSM file after error", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(cerr))
630+
}
631+
// If the file is corrupt, rename it and
632+
// continue loading the shard without it.
627633
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
628-
file.Close()
629634
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
630635
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
631-
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)}
636+
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %w", file.Name(), e)}
632637
return
633638
}
634-
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)}
639+
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %w", file.Name(), err)}
635640
return
636641
}
637-
638642
df.WithObserver(f.obs)
639643
readerC <- &res{r: df}
640644
}(i, file)
@@ -668,7 +672,7 @@ func (f *FileStore) Open(ctx context.Context) error {
668672
f.lastModified = fi.ModTime().UTC()
669673
} else {
670674
close(readerC)
671-
return err
675+
return fmt.Errorf("error calling Stat on %q in FileStore.Open: %w", f.dir, err)
672676
}
673677
} else {
674678
f.lastModified = time.Unix(0, lm).UTC()

tsdb/engine/tsm1/wal.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -625,9 +625,10 @@ func (l *WAL) Close() error {
625625

626626
// segmentFileNames will return all files that are WAL segment files in sorted order by ascending ID.
627627
func segmentFileNames(dir string) ([]string, error) {
628-
names, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("%s*.%s", WALFilePrefix, WALFileExtension)))
628+
pattern := filepath.Join(dir, fmt.Sprintf("%s*.%s", WALFilePrefix, WALFileExtension))
629+
names, err := filepath.Glob(pattern)
629630
if err != nil {
630-
return nil, err
631+
return nil, fmt.Errorf("segmentFileNames: error in Glob for %q: %w", pattern, err)
631632
}
632633
sort.Strings(names)
633634
return names, nil

tsdb/index.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3209,15 +3209,15 @@ func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile
32093209
if os.IsNotExist(err) {
32103210
// nop, use default
32113211
} else if err != nil {
3212-
return nil, err
3212+
return nil, fmt.Errorf("error calling Stat on %q in NewIndex: %w", path, err)
32133213
} else if err == nil {
32143214
format = TSI1IndexName
32153215
}
32163216

32173217
// Lookup index by format.
32183218
fn := newIndexFuncs[format]
32193219
if fn == nil {
3220-
return nil, fmt.Errorf("invalid index format: %q", format)
3220+
return nil, fmt.Errorf("invalid index format for %q in NewIndex: %q", path, format)
32213221
}
32223222
return fn(id, database, path, seriesIDSet, sfile, options), nil
32233223
}

tsdb/shard.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ func (e ShardError) Error() string {
9494
return fmt.Sprintf("[shard %d] %s", e.id, e.Err)
9595
}
9696

97+
// Unwrap returns the underlying error.
98+
func (e ShardError) Unwrap() error {
99+
return e.Err
100+
}
101+
97102
// PartialWriteError indicates a write request could only write a portion of the
98103
// requested values.
99104
type PartialWriteError struct {

tsdb/store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ func (s *Store) loadShards(ctx context.Context) error {
447447
err = s.OpenShard(ctx, shard, false)
448448
if err != nil {
449449
log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
450-
resC <- &res{err: fmt.Errorf("failed to open shard: %d: %s", shardID, err)}
450+
resC <- &res{err: fmt.Errorf("failed to open shard: %d: %w", shardID, err)}
451451
return
452452
}
453453

@@ -611,7 +611,7 @@ func (s *Store) OpenShard(ctx context.Context, sh *Shard, force bool) error {
611611
s.badShards.setShardOpenError(sh.ID(), err)
612612
return err
613613
} else {
614-
return oldErr
614+
return fmt.Errorf("not attempting to open shard %d; %w", sh.ID(), oldErr)
615615
}
616616
}
617617

tsdb/store_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,15 +153,17 @@ func TestStore_BadShard(t *testing.T) {
153153
defer require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx)
154154

155155
sh := tsdb.NewTempShard(t, idx)
156+
shId := sh.ID()
156157
err := s.OpenShard(context.Background(), sh.Shard, false)
157158
require.NoError(t, err, "opening temp shard")
158159
require.NoError(t, sh.Close(), "closing temporary shard")
159160

160-
s.SetShardOpenErrorForTest(sh.ID(), errors.New(errStr))
161+
expErr := errors.New(errStr)
162+
s.SetShardOpenErrorForTest(sh.ID(), expErr)
161163
err2 := s.OpenShard(context.Background(), sh.Shard, false)
162164
require.Error(t, err2, "no error opening bad shard")
163165
require.True(t, errors.Is(err2, tsdb.ErrPreviousShardFail{}), "exp: ErrPreviousShardFail, got: %v", err2)
164-
require.EqualError(t, err2, "opening shard previously failed with: "+errStr)
166+
require.EqualError(t, err2, fmt.Errorf("not attempting to open shard %d; opening shard previously failed with: %w", shId, expErr).Error())
165167

166168
// This should succeed with the force (and because opening an open shard automatically succeeds)
167169
require.NoError(t, s.OpenShard(context.Background(), sh.Shard, true), "forced re-opening previously failing shard")

0 commit comments

Comments
 (0)
0