fix(p2p/pex): gracefully shutdown `Reactor` by melekes · Pull Request #2010 · cometbft/cometbft · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix(p2p/pex): gracefully shutdown Reactor #2010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/unreleased/bug-fixes/2010-p2p-pex-shutdown.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- `[p2p/pex]` gracefully shut down Reactor ([\#2010](https://github.com/cometbft/cometbft/pull/2010))
28 changes: 12 additions & 16 deletions p2p/pex/addrbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,26 +154,23 @@ func (a *addrBook) init() {

// OnStart implements Service.
//
// It runs the BaseService start hook, calls loadFromFile on a.filePath and
// then launches saveRoutine in its own goroutine. The error from
// BaseService.OnStart is returned unchanged; otherwise the result is nil.
func (a *addrBook) OnStart() error {
	err := a.BaseService.OnStart()
	if err != nil {
		return err
	}

	a.loadFromFile(a.filePath)

	// Register saveRoutine with the wait group *before* spawning it, so a
	// later a.wg.Wait() cannot return until saveRoutine has terminated.
	a.wg.Add(1)
	go a.saveRoutine()
	return nil
}

// OnStop implements Service.
//
// It only forwards to the BaseService's OnStop hook; this method itself
// performs no other work (in particular it does not touch a.wg).
func (a *addrBook) OnStop() {
a.BaseService.OnStop()
}

func (a *addrBook) Wait() {
// Stop overrides Service.Stop().
//
// It first stops the BaseService, which closes the Service.Quit() channel
// and thereby lets a.saveRoutine() exit, and then blocks on a.wg until that
// goroutine has finished. If BaseService.Stop fails, its error is returned
// as-is and the wait is skipped.
func (a *addrBook) Stop() error {
	err := a.BaseService.Stop()
	if err != nil {
		return err
	}

	// saveRoutine signals completion through a.wg.
	a.wg.Wait()
	return nil
}

func (a *addrBook) FilePath() string {
Expand Down Expand Up @@ -491,17 +488,16 @@ func (a *addrBook) saveRoutine() {
defer a.wg.Done()

saveFileTicker := time.NewTicker(dumpAddressInterval)
out:
defer saveFileTicker.Stop()
for {
select {
case <-saveFileTicker.C:
a.saveToFile(a.filePath)
a.Save()
case <-a.Quit():
break out
a.Save()
return
}
}
saveFileTicker.Stop()
a.saveToFile(a.filePath)
}

//----------------------------------------------------------
Expand Down
29 changes: 24 additions & 5 deletions p2p/pex/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Reactor struct {
book AddrBook
config *ReactorConfig
ensurePeersPeriod time.Duration // TODO: should go in the config
peersRoutineWg sync.WaitGroup

// maps to prevent abuse
requestsSent *cmap.CMap // ID->struct{}: unanswered send requests
Expand Down Expand Up @@ -156,6 +157,7 @@ func (r *Reactor) OnStart() error {

r.seedAddrs = seedAddrs

r.peersRoutineWg.Add(1)
// Check if this node should run
// in seed/crawler mode
if r.config.SeedMode {
Expand All @@ -166,11 +168,16 @@ func (r *Reactor) OnStart() error {
return nil
}

// OnStop implements BaseService.
func (r *Reactor) OnStop() {
// Stop overrides `Service.Stop()`.
//
// Shutdown happens in three steps:
//  1. stop the BaseReactor; its error is returned unchanged,
//  2. stop the address book; its error is returned wrapped with context,
//  3. wait on peersRoutineWg for the peers routine to call Done.
//
// Errors are returned to the caller rather than logged here, so each
// failure is reported exactly once by whichever layer handles it.
func (r *Reactor) Stop() error {
	if err := r.BaseReactor.Stop(); err != nil {
		return err
	}
	if err := r.book.Stop(); err != nil {
		return fmt.Errorf("can't stop address book: %w", err)
	}
	// Both quit signals have been issued at this point, so the background
	// routine can observe them and return.
	r.peersRoutineWg.Wait()
	return nil
}

// GetChannels implements Reactor.
Expand Down Expand Up @@ -414,6 +421,8 @@ func (r *Reactor) SetEnsurePeersPeriod(d time.Duration) {

// Ensures that sufficient peers are connected. (continuous).
func (r *Reactor) ensurePeersRoutine() {
defer r.peersRoutineWg.Done()

var (
seed = cmtrand.NewRand()
jitter = seed.Int63n(r.ensurePeersPeriod.Nanoseconds())
Expand All @@ -432,12 +441,14 @@ func (r *Reactor) ensurePeersRoutine() {

// fire periodically
ticker := time.NewTicker(r.ensurePeersPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.ensurePeers()
case <-r.book.Quit():
return
case <-r.Quit():
ticker.Stop()
return
}
}
Expand Down Expand Up @@ -475,6 +486,10 @@ func (r *Reactor) ensurePeers() {
maxAttempts := numToDial * 3

for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
if !r.IsRunning() || !r.book.IsRunning() {
return
}

try := r.book.PickAddress(newBias)
if try == nil {
continue
Expand Down Expand Up @@ -650,6 +665,8 @@ func (r *Reactor) AttemptsToDial(addr *p2p.NetAddress) int {
// Seed/Crawler Mode causes this node to quickly disconnect
// from peers, except other seed nodes.
func (r *Reactor) crawlPeersRoutine() {
defer r.peersRoutineWg.Done()

// If we have any seed nodes, consult them first
if len(r.seedAddrs) > 0 {
r.dialSeeds()
Expand All @@ -660,13 +677,15 @@ func (r *Reactor) crawlPeersRoutine() {

// Fire periodically
ticker := time.NewTicker(crawlPeerPeriod)

defer ticker.Stop()
for {
select {
case <-ticker.C:
r.attemptDisconnects()
r.crawlPeers(r.book.GetSelection())
r.cleanupCrawlPeerInfos()
case <-r.book.Quit():
return
case <-r.Quit():
return
}
Expand Down
0