From d205694c52c4cb97726095026d63b861308001b2 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 10 Jan 2024 16:05:19 +0800 Subject: [PATCH 1/4] p2p/pex: wait for saveRoutine to finish during addrBook#Stop Closes #646 --- p2p/pex/addrbook.go | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go index c48d6cc4a91..de957818736 100644 --- a/p2p/pex/addrbook.go +++ b/p2p/pex/addrbook.go @@ -154,13 +154,8 @@ func (a *addrBook) init() { // OnStart implements Service. func (a *addrBook) OnStart() error { - if err := a.BaseService.OnStart(); err != nil { - return err - } a.loadFromFile(a.filePath) - // wg.Add to ensure that any invocation of .Wait() - // later on will wait for saveRoutine to terminate. a.wg.Add(1) go a.saveRoutine() @@ -168,12 +163,12 @@ func (a *addrBook) OnStart() error { } // OnStop implements Service. -func (a *addrBook) OnStop() { - a.BaseService.OnStop() -} - -func (a *addrBook) Wait() { +func (a *addrBook) Stop() error { + if err := a.BaseService.Stop(); err != nil { + return err + } a.wg.Wait() + return nil } func (a *addrBook) FilePath() string { @@ -491,17 +486,15 @@ func (a *addrBook) saveRoutine() { defer a.wg.Done() saveFileTicker := time.NewTicker(dumpAddressInterval) -out: for { select { case <-saveFileTicker.C: - a.saveToFile(a.filePath) + a.Save() case <-a.Quit(): - break out + a.Save() + return } } - saveFileTicker.Stop() - a.saveToFile(a.filePath) } //---------------------------------------------------------- From c2697443dbcb32f3063da7c1e6b7aba738a6a157 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 10 Jan 2024 16:06:30 +0800 Subject: [PATCH 2/4] p2p/pex: wait for either pex routines to finish during pex.Reactor#Stop Closes #652 --- p2p/pex/pex_reactor.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index aa8e7cdba5b..d0ce6b2897f 100644 --- 
a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -84,6 +84,7 @@ type Reactor struct { book AddrBook config *ReactorConfig ensurePeersPeriod time.Duration // TODO: should go in the config + peersRoutineWg sync.WaitGroup // maps to prevent abuse requestsSent *cmap.CMap // ID->struct{}: unanswered send requests @@ -156,6 +157,7 @@ func (r *Reactor) OnStart() error { r.seedAddrs = seedAddrs + r.peersRoutineWg.Add(1) // Check if this node should run // in seed/crawler mode if r.config.SeedMode { @@ -166,11 +168,16 @@ func (r *Reactor) OnStart() error { return nil } -// OnStop implements BaseService. -func (r *Reactor) OnStop() { +// Stop implements BaseService. +func (r *Reactor) Stop() error { + if err := r.BaseReactor.Stop(); err != nil { + return err + } if err := r.book.Stop(); err != nil { - r.Logger.Error("Error stopping address book", "err", err) + return fmt.Errorf("can't stop address book: %w", err) } + r.peersRoutineWg.Wait() + return nil } // GetChannels implements Reactor. @@ -414,6 +421,8 @@ func (r *Reactor) SetEnsurePeersPeriod(d time.Duration) { // Ensures that sufficient peers are connected. (continuous). func (r *Reactor) ensurePeersRoutine() { + defer r.peersRoutineWg.Done() + var ( seed = cmtrand.NewRand() jitter = seed.Int63n(r.ensurePeersPeriod.Nanoseconds()) @@ -436,8 +445,9 @@ func (r *Reactor) ensurePeersRoutine() { select { case <-ticker.C: r.ensurePeers() + case <-r.book.Quit(): + return case <-r.Quit(): - ticker.Stop() return } } @@ -475,6 +485,10 @@ func (r *Reactor) ensurePeers() { maxAttempts := numToDial * 3 for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ { + if !r.IsRunning() || !r.book.IsRunning() { + return + } + try := r.book.PickAddress(newBias) if try == nil { continue @@ -650,6 +664,8 @@ func (r *Reactor) AttemptsToDial(addr *p2p.NetAddress) int { // Seed/Crawler Mode causes this node to quickly disconnect // from peers, except other seed nodes. 
func (r *Reactor) crawlPeersRoutine() { + defer r.peersRoutineWg.Done() + // If we have any seed nodes, consult them first if len(r.seedAddrs) > 0 { r.dialSeeds() @@ -667,6 +683,8 @@ func (r *Reactor) crawlPeersRoutine() { r.attemptDisconnects() r.crawlPeers(r.book.GetSelection()) r.cleanupCrawlPeerInfos() + case <-r.book.Quit(): + return case <-r.Quit(): return } From 5c077f5dc19795c1591dcfe0149302ed46f7b93b Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 10 Jan 2024 16:40:31 +0800 Subject: [PATCH 3/4] add changelog entry --- .changelog/unreleased/bug-fixes/2010-p2p-pex-shutdown.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 .changelog/unreleased/bug-fixes/2010-p2p-pex-shutdown.md diff --git a/.changelog/unreleased/bug-fixes/2010-p2p-pex-shutdown.md b/.changelog/unreleased/bug-fixes/2010-p2p-pex-shutdown.md new file mode 100644 index 00000000000..e913d7b2371 --- /dev/null +++ b/.changelog/unreleased/bug-fixes/2010-p2p-pex-shutdown.md @@ -0,0 +1 @@ +- `[p2p/pex]` gracefully shut down Reactor ([\#2010](https://github.com/cometbft/cometbft/pull/2010)) From 24030f5d79890adb70b48f4de2a2af6f8a12c2f7 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 15 Jan 2024 19:34:15 +0800 Subject: [PATCH 4/4] address @cason comments - stop tickers - amend comments --- p2p/pex/addrbook.go | 5 ++++- p2p/pex/pex_reactor.go | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go index de957818736..6ece794f3b5 100644 --- a/p2p/pex/addrbook.go +++ b/p2p/pex/addrbook.go @@ -162,8 +162,10 @@ func (a *addrBook) OnStart() error { return nil } -// OnStop implements Service. +// Stop overrides Service.Stop(). func (a *addrBook) Stop() error { + // Closes the Service.Quit() channel. + // This enables a.saveRoutine() to quit. 
if err := a.BaseService.Stop(); err != nil { return err } @@ -486,6 +488,7 @@ func (a *addrBook) saveRoutine() { defer a.wg.Done() saveFileTicker := time.NewTicker(dumpAddressInterval) + defer saveFileTicker.Stop() for { select { case <-saveFileTicker.C: diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index d0ce6b2897f..57eb02827f2 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -168,7 +168,7 @@ func (r *Reactor) OnStart() error { return nil } -// Stop implements BaseService. +// Stop overrides `Service.Stop()`. func (r *Reactor) Stop() error { if err := r.BaseReactor.Stop(); err != nil { return err @@ -441,6 +441,7 @@ func (r *Reactor) ensurePeersRoutine() { // fire periodically ticker := time.NewTicker(r.ensurePeersPeriod) + defer ticker.Stop() for { select { case <-ticker.C: @@ -676,7 +677,7 @@ func (r *Reactor) crawlPeersRoutine() { // Fire periodically ticker := time.NewTicker(crawlPeerPeriod) - + defer ticker.Stop() for { select { case <-ticker.C: