Release Candidate: dev -> main by ONECasey · Pull Request #4319 · harmony-one/harmony · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Release Candidate: dev -> main #4319

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
eaf5d9c
Rebase dev branch to current main branch (#4318)
ONECasey Dec 25, 2022
f14d840
add openssl compatibility on m2 chips using darwin (#4302)
ONECasey Dec 25, 2022
fd008c9
[dumpdb] ensure each cross link is dumped (#4311)
MaxMustermann2 Dec 25, 2022
8d3be78
bump libp2p to version 0.24.0 and update its dependencies and relevan…
GheisMohammadi Dec 25, 2022
9c84332
bump libp2p to version 0.24.0 and update its dependencies and relevan…
GheisMohammadi Dec 25, 2022
c960e07
Fix for consensus stuck. (#4307)
Frozen Dec 25, 2022
4f5102a
Merge branch 'main' into dev
ONECasey Dec 25, 2022
54742e7
staged dns sync v1.0 (#4316)
GheisMohammadi Dec 25, 2022
0104b1d
add description for closing client and change randomize process to ma…
GheisMohammadi Dec 26, 2022
6ade0bf
Small fixes and code cleanup for network stack. (#4320)
Frozen Dec 29, 2022
fde2210
Fix not disable cache in archival mode (#4322)
dannyposi Jan 4, 2023
8ee1160
Feature registry (#4324)
Frozen Jan 4, 2023
2574868
Slash fix (#4284)
peekpi Jan 4, 2023
444f713
Merge branch 'main' into dev
ONECasey Jan 4, 2023
8601b26
Bump github.com/aws/aws-sdk-go from 1.30.1 to 1.33.0 (#4325) (#4328)
ONECasey Jan 4, 2023
072e08c
Merge branch 'dev' of https://github.com/harmony-one/harmony into dev
ONECasey Jan 4, 2023
63433c1
Merge branch 'main' into dev
ONECasey Jan 5, 2023
fa9bc17
Bump github.com/btcsuite/btcd from 0.21.0-beta to 0.23.2 (#4327) (#4329)
ONECasey Jan 5, 2023
a1f2e60
Merge branch 'dev' of https://github.com/harmony-one/harmony into dev
ONECasey Jan 5, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
TOP:=$(realpath ..)
export CGO_CFLAGS:=-I$(TOP)/bls/include -I$(TOP)/mcl/include -I/usr/local/opt/openssl/include
export CGO_LDFLAGS:=-L$(TOP)/bls/lib -L/usr/local/opt/openssl/lib
export LD_LIBRARY_PATH:=$(TOP)/bls/lib:$(TOP)/mcl/lib:/usr/local/opt/openssl/lib:/opt/homebrew/opt/gmp/lib/:/opt/homebrew/opt/openssl/lib
export CGO_CFLAGS:=-I$(TOP)/bls/include -I$(TOP)/mcl/include -I/opt/homebrew/opt/openssl@1.1/include
export CGO_LDFLAGS:=-L$(TOP)/bls/lib -L/opt/homebrew/opt/openssl@1.1/lib
export LD_LIBRARY_PATH:=$(TOP)/bls/lib:$(TOP)/mcl/lib:/opt/homebrew/opt/openssl@1.1/lib:/opt/homebrew/opt/gmp/lib/:/opt/homebrew/opt/openssl@1.1/lib
export LIBRARY_PATH:=$(LD_LIBRARY_PATH)
export DYLD_FALLBACK_LIBRARY_PATH:=$(LD_LIBRARY_PATH)
export GO111MODULE:=on
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ The `make` command should automatically build the Harmony binary & all dependent

However, if you wish to bypass the Makefile, first export the build flags:
```bash
export CGO_CFLAGS="-I$GOPATH/src/github.com/harmony-one/bls/include -I$GOPATH/src/github.com/harmony-one/mcl/include -I/usr/local/opt/openssl/include"
export CGO_LDFLAGS="-L$GOPATH/src/github.com/harmony-one/bls/lib -L/usr/local/opt/openssl/lib"
export LD_LIBRARY_PATH=$GOPATH/src/github.com/harmony-one/bls/lib:$GOPATH/src/github.com/harmony-one/mcl/lib:/usr/local/opt/openssl/lib
export CGO_CFLAGS="-I$GOPATH/src/github.com/harmony-one/bls/include -I$GOPATH/src/github.com/harmony-one/mcl/include -I/opt/homebrew/opt/openssl@1.1/include"
export CGO_LDFLAGS="-L$GOPATH/src/github.com/harmony-one/bls/lib -L/opt/homebrew/opt/openssl@1.1/lib"
export LD_LIBRARY_PATH=$GOPATH/src/github.com/harmony-one/bls/lib:$GOPATH/src/github.com/harmony-one/mcl/lib:/opt/homebrew/opt/openssl@1.1/lib
export LIBRARY_PATH=$LD_LIBRARY_PATH
export DYLD_FALLBACK_LIBRARY_PATH=$LD_LIBRARY_PATH
export GO111MODULE=on
Expand Down
52 changes: 48 additions & 4 deletions api/service/legacysync/downloader/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,30 @@ import (
pb "github.com/harmony-one/harmony/api/service/legacysync/downloader/proto"
"github.com/harmony-one/harmony/internal/utils"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)

// Client is the client model for downloader package.
// It bundles a gRPC connection to a single peer together with the
// downloader service stub and the settings used to establish it.
type Client struct {
// dlClient is the downloader service client stub.
dlClient pb.DownloaderClient
// opts holds the gRPC dial options passed to grpc.DialContext in ClientSetup.
opts []grpc.DialOption
// conn is the gRPC connection returned by grpc.DialContext in ClientSetup.
conn *grpc.ClientConn
// addr is the peer address in "ip:port" form, as built in ClientSetup.
addr string
}

// ClientSetup setups a Client given ip and port.
func ClientSetup(ip, port string) *Client {
func ClientSetup(ip, port string, withBlock bool) *Client {
client := Client{}
client.opts = append(client.opts, grpc.WithInsecure())
if withBlock {
client.opts = append(client.opts, grpc.WithBlock())
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

client.addr = fmt.Sprintf("%s:%s", ip, port)
var err error
client.conn, err = grpc.DialContext(ctx, fmt.Sprintf(ip+":"+port), client.opts...)
client.conn, err = grpc.DialContext(ctx, client.addr, client.opts...)
if err != nil {
utils.Logger().Error().Err(err).Str("ip", ip).Msg("[SYNC] client.go:ClientSetup fail to dial")
return nil
Expand All @@ -35,12 +41,50 @@ func ClientSetup(ip, port string) *Client {
return &client
}

// IsReady reports whether the underlying gRPC connection is currently in
// the connectivity.Ready state.
func (client *Client) IsReady() bool {
	current := client.conn.GetState()
	return current == connectivity.Ready
}

// IsConnecting reports whether the underlying gRPC connection is currently
// in the connectivity.Connecting state.
func (client *Client) IsConnecting() bool {
	current := client.conn.GetState()
	return current == connectivity.Connecting
}

// State returns the connectivity state currently reported by the
// underlying gRPC connection.
func (client *Client) State() connectivity.State {
	current := client.conn.GetState()
	return current
}

// WaitForConnection blocks until the underlying gRPC connection reports
// connectivity.Ready or the timeout t elapses, whichever comes first.
//
// It returns true as soon as the connection is Ready. It returns false when
// the timeout expires first, or immediately when the connection is observed
// in connectivity.Shutdown.
//
// The previous implementation gave up after the first state transition of
// any kind (for example Idle -> Connecting), so it could report failure even
// though most of the timeout budget was still available.
func (client *Client) WaitForConnection(t time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), t)
	defer cancel()

	for {
		state := client.conn.GetState()
		switch state {
		case connectivity.Ready:
			return true
		case connectivity.Shutdown:
			// Waiting on a shut-down connection would only burn the timeout.
			return false
		}

		// WaitForStateChange returns false only when ctx expires before the
		// state moves away from the one we just observed.
		if !client.conn.WaitForStateChange(ctx, state) {
			return false
		}
	}
}

// Close closes the Client.
func (client *Client) Close() {
func (client *Client) Close(reason string) {
err := client.conn.Close()
if err != nil {
utils.Logger().Info().Msg("[SYNC] unable to close connection")
utils.Logger().Info().
Str("peerAddress", client.addr).
Msg("[SYNC] unable to close peer connection")
return
}
utils.Logger().Info().
Str("peerAddress", client.addr).
Str("reason", reason).
Msg("[SYNC] peer connection closed")
}

// GetBlockHashes gets block hashes from all the peers by calling grpc request.
Expand Down
55 changes: 28 additions & 27 deletions api/service/legacysync/epoch_syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ type EpochSync struct {
// If the last result is expired, ask the remote DNS nodes for latest height and return the result.
func (ss *EpochSync) GetSyncStatus() SyncCheckResult {
return ss.syncStatus.Get(func() SyncCheckResult {
return ss.isInSync(false)
return ss.isSynchronized(false)
})
}

// isInSync query the remote DNS node for the latest height to check what is the current
// isSynchronized query the remote DNS node for the latest height to check what is the current
// sync status
func (ss *EpochSync) isInSync(_ bool) SyncCheckResult {
func (ss *EpochSync) isSynchronized(_ bool) SyncCheckResult {
if ss.syncConfig == nil {
return SyncCheckResult{} // If syncConfig is not instantiated, return not in sync
}
Expand All @@ -70,9 +70,9 @@ func (ss *EpochSync) isInSync(_ bool) SyncCheckResult {
Uint64("CurrentEpoch", curEpoch).
Msg("[EPOCHSYNC] Checking sync status")
return SyncCheckResult{
IsInSync: inSync,
OtherHeight: otherHeight1,
HeightDiff: epochDiff,
IsSynchronized: inSync,
OtherHeight: otherHeight1,
HeightDiff: epochDiff,
}
}

Expand All @@ -85,17 +85,18 @@ func (ss *EpochSync) GetActivePeerNumber() int {
}

// SyncLoop will keep syncing with peers until catches up
func (ss *EpochSync) SyncLoop(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus) time.Duration {
return time.Duration(ss.syncLoop(bc, isBeacon, consensus)) * time.Second
func (ss *EpochSync) SyncLoop(bc core.BlockChain, consensus *consensus.Consensus) time.Duration {
return time.Duration(syncLoop(bc, ss.syncConfig)) * time.Second
}

func (ss *EpochSync) syncLoop(bc core.BlockChain, isBeacon bool, _ *consensus.Consensus) (timeout int) {
maxHeight := getMaxPeerHeight(ss.syncConfig)
func syncLoop(bc core.BlockChain, syncConfig *SyncConfig) (timeout int) {
isBeacon := bc.ShardID() == 0
maxHeight := getMaxPeerHeight(syncConfig)
for {
if maxHeight == 0 || maxHeight == math.MaxUint64 {
utils.Logger().Info().
Msgf("[EPOCHSYNC] No peers to sync (isBeacon: %t, ShardID: %d, peerscount: %d)",
isBeacon, bc.ShardID(), ss.syncConfig.PeersCount())
Msgf("[EPOCHSYNC] No peers to sync (isBeacon: %t, ShardID: %d, peersCount: %d)",
isBeacon, bc.ShardID(), syncConfig.PeersCount())
return 10
}

Expand All @@ -104,19 +105,19 @@ func (ss *EpochSync) syncLoop(bc core.BlockChain, isBeacon bool, _ *consensus.Co
if otherEpoch == curEpoch+1 {
utils.Logger().Info().
Msgf("[EPOCHSYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherEpoch: %d, currentEpoch: %d, peersCount: %d)",
isBeacon, bc.ShardID(), otherEpoch, curEpoch, ss.syncConfig.PeersCount())
isBeacon, bc.ShardID(), otherEpoch, curEpoch, syncConfig.PeersCount())
return 60
}
if otherEpoch < curEpoch {
for _, peerCfg := range ss.syncConfig.GetPeers() {
ss.syncConfig.RemovePeer(peerCfg, fmt.Sprintf("[EPOCHSYNC]: current height is higher that others, removve peers: %s", peerCfg.String()))
for _, peerCfg := range syncConfig.GetPeers() {
syncConfig.RemovePeer(peerCfg, fmt.Sprintf("[EPOCHSYNC]: current height is higher that others, remove peers: %s", peerCfg.String()))
}
return 2
}

utils.Logger().Info().
Msgf("[EPOCHSYNC] Node is OUT OF SYNC (isBeacon: %t, ShardID: %d, otherEpoch: %d, currentEpoch: %d, peers count %d)",
isBeacon, bc.ShardID(), otherEpoch, curEpoch, ss.syncConfig.PeersCount())
isBeacon, bc.ShardID(), otherEpoch, curEpoch, syncConfig.PeersCount())

var heights []uint64
loopEpoch := curEpoch + 1
Expand All @@ -133,7 +134,7 @@ func (ss *EpochSync) syncLoop(bc core.BlockChain, isBeacon bool, _ *consensus.Co
return 10
}

err := ss.ProcessStateSync(heights, bc)
err := ProcessStateSync(syncConfig, heights, bc)
if err != nil {
utils.Logger().Error().Err(err).
Msgf("[EPOCHSYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherEpoch: %d, currentEpoch: %d)",
Expand All @@ -144,44 +145,44 @@ func (ss *EpochSync) syncLoop(bc core.BlockChain, isBeacon bool, _ *consensus.Co
}

// ProcessStateSync processes state sync from the blocks received but not yet processed so far
func (ss *EpochSync) ProcessStateSync(heights []uint64, bc core.BlockChain) error {
func ProcessStateSync(syncConfig *SyncConfig, heights []uint64, bc core.BlockChain) error {
var payload [][]byte
var peerCfg *SyncPeerConfig

peers := ss.syncConfig.GetPeers()
peers := syncConfig.GetPeers()
if len(peers) == 0 {
return errors.New("no peers to sync")
}

for index, peerConfig := range peers {
resp := peerConfig.GetClient().GetBlocksByHeights(heights)
if resp == nil {
ss.syncConfig.RemovePeer(peerConfig, fmt.Sprintf("[EPOCHSYNC]: no response from peer: #%d %s, count %d", index, peerConfig.String(), len(peers)))
syncConfig.RemovePeer(peerConfig, fmt.Sprintf("[EPOCHSYNC]: no response from peer: #%d %s, count %d", index, peerConfig.String(), len(peers)))
continue
}
if len(resp.Payload) == 0 {
ss.syncConfig.RemovePeer(peerConfig, fmt.Sprintf("[EPOCHSYNC]: empty payload response from peer: #%d %s, count %d", index, peerConfig.String(), len(peers)))
syncConfig.RemovePeer(peerConfig, fmt.Sprintf("[EPOCHSYNC]: empty payload response from peer: #%d %s, count %d", index, peerConfig.String(), len(peers)))
continue
}
payload = resp.Payload
peerCfg = peerConfig
break
}
if len(payload) == 0 {
return errors.Errorf("empty payload: no blocks were returned by GetBlocksByHeights for all peers, currentPeersCount %d", ss.syncConfig.PeersCount())
return errors.Errorf("empty payload: no blocks were returned by GetBlocksByHeights for all peers, currentPeersCount %d", syncConfig.PeersCount())
}
err := ss.processWithPayload(payload, bc)
err := processWithPayload(payload, bc)
if err != nil {
// Assume that node sent us invalid data.
ss.syncConfig.RemovePeer(peerCfg, fmt.Sprintf("[EPOCHSYNC]: failed to process with payload from peer: %s", err.Error()))
syncConfig.RemovePeer(peerCfg, fmt.Sprintf("[EPOCHSYNC]: failed to process with payload from peer: %s", err.Error()))
utils.Logger().Error().Err(err).
Msgf("[EPOCHSYNC] Removing peer %s for invalid data", peerCfg.String())
return err
}
return nil
}

func (ss *EpochSync) processWithPayload(payload [][]byte, bc core.BlockChain) error {
func processWithPayload(payload [][]byte, bc core.BlockChain) error {
decoded := make([]*types.Block, 0, len(payload))
for idx, blockBytes := range payload {
block, err := RlpDecodeBlockOrBlockWithSig(blockBytes)
Expand All @@ -201,8 +202,8 @@ func (ss *EpochSync) processWithPayload(payload [][]byte, bc core.BlockChain) er
}

// CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32) error {
func (ss *EpochSync) CreateSyncConfig(peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) error {
var err error
ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID)
ss.syncConfig, err = createSyncConfig(ss.syncConfig, peers, shardID, waitForEachPeerToConnect)
return err
}
56 changes: 38 additions & 18 deletions api/service/legacysync/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ func getMaxPeerHeight(syncConfig *SyncConfig) uint64 {
// utils.Logger().Debug().Bool("isBeacon", isBeacon).Str("peerIP", peerConfig.ip).Str("peerPort", peerConfig.port).Msg("[Sync]getMaxPeerHeight")
response, err := peerConfig.client.GetBlockChainHeight()
if err != nil {
utils.Logger().Warn().Err(err).Str("peerIP", peerConfig.ip).Str("peerPort", peerConfig.port).Msg("[Sync]GetBlockChainHeight failed")
utils.Logger().Warn().Err(err).Str("peerIP", peerConfig.peer.IP).Str("peerPort", peerConfig.peer.Port).Msg("[Sync]GetBlockChainHeight failed")
syncConfig.RemovePeer(peerConfig, fmt.Sprintf("failed getMaxPeerHeight for shard %d with message: %s", syncConfig.ShardID(), err.Error()))
return
}
utils.Logger().Info().Str("peerIP", peerConfig.ip).Uint64("blockHeight", response.BlockHeight).
utils.Logger().Info().Str("peerIP", peerConfig.peer.IP).Uint64("blockHeight", response.BlockHeight).
Msg("[SYNC] getMaxPeerHeight")

lock.Lock()
Expand All @@ -51,46 +51,66 @@ func getMaxPeerHeight(syncConfig *SyncConfig) uint64 {
return maxHeight
}

func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32) (*SyncConfig, error) {
func createSyncConfig(syncConfig *SyncConfig, peers []p2p.Peer, shardID uint32, waitForEachPeerToConnect bool) (*SyncConfig, error) {
// sanity check to ensure no duplicate peers
if err := checkPeersDuplicity(peers); err != nil {
return syncConfig, err
}
// limit the number of dns peers to connect
randSeed := time.Now().UnixNano()
peers = limitNumPeers(peers, randSeed)
targetSize, peers := limitNumPeers(peers, randSeed)

utils.Logger().Debug().
Int("len", len(peers)).
Int("peers count", len(peers)).
Int("target size", targetSize).
Uint32("shardID", shardID).
Msg("[SYNC] CreateSyncConfig: len of peers")

if len(peers) == 0 {
if targetSize == 0 {
return syncConfig, errors.New("[SYNC] no peers to connect to")
}
if syncConfig != nil {
syncConfig.CloseConnections()
}
syncConfig = NewSyncConfig(shardID, nil)

var wg sync.WaitGroup
for _, peer := range peers {
wg.Add(1)
go func(peer p2p.Peer) {
defer wg.Done()
client := downloader.ClientSetup(peer.IP, peer.Port)
if client == nil {
return
if !waitForEachPeerToConnect {
var wg sync.WaitGroup
ps := peers[:targetSize]
for _, peer := range ps {
wg.Add(1)
go func(peer p2p.Peer) {
defer wg.Done()
client := downloader.ClientSetup(peer.IP, peer.Port, false)
if client == nil {
return
}
peerConfig := &SyncPeerConfig{
peer: peer,
client: client,
}
syncConfig.AddPeer(peerConfig)
}(peer)
}
wg.Wait()
} else {
var connectedPeers int
for _, peer := range peers {
client := downloader.ClientSetup(peer.IP, peer.Port, true)
if client == nil || !client.IsReady() {
continue
}
peerConfig := &SyncPeerConfig{
ip: peer.IP,
port: peer.Port,
peer: peer,
client: client,
}
syncConfig.AddPeer(peerConfig)
}(peer)
connectedPeers++
if connectedPeers >= targetSize {
break
}
}
}
wg.Wait()
utils.Logger().Info().
Int("len", len(syncConfig.peers)).
Uint32("shardID", shardID).
Expand Down
Loading
0