8000 more discriminate connectivity by rade · Pull Request #555 · weaveworks/weave · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
This repository was archived by the owner on Jun 20, 2024. It is now read-only.

more discriminate connectivity #555

Merged
merged 2 commits into from
Apr 15, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 49 additions & 25 dele 8000 tions router/connection_maker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ const (
)

type ConnectionMaker struct {
ourself *LocalPeer
peers *Peers
normalisePeerAddr func(string) string
targets map[string]*Target
cmdLinePeers map[string]string
actionChan chan<- ConnectionMakerAction
ourself *LocalPeer
peers *Peers
port int
targets map[string]*Target

This comment was marked as abuse.

This comment was marked as abuse.

cmdLinePeers map[string]*net.TCPAddr
actionChan chan<- ConnectionMakerAction
}

// Information about an address where we may find a peer
Expand All @@ -33,13 +33,13 @@ type Target struct {

type ConnectionMakerAction func() bool

func NewConnectionMaker(ourself *LocalPeer, peers *Peers, normalisePeerAddr func(string) string) *ConnectionMaker {
func NewConnectionMaker(ourself *LocalPeer, peers *Peers, port int) *ConnectionMaker {
return &ConnectionMaker{
ourself: ourself,
peers: peers,
normalisePeerAddr: normalisePeerAddr,
cmdLinePeers: make(map[string]string),
targets: make(map[string]*Target)}
ourself: ourself,
peers: peers,
port: port,
cmdLinePeers: make(map[string]*net.TCPAddr),
targets: make(map[string]*Target)}
}

func (cm *ConnectionMaker) Start() {
Expand All @@ -49,16 +49,19 @@ func (cm *ConnectionMaker) Start() {
}

func (cm *ConnectionMaker) InitiateConnection(peer string) error {
addr, err := net.ResolveTCPAddr("tcp4", cm.normalisePeerAddr(peer))
host, port, err := net.SplitHostPort(peer)
if err != nil {
host = peer
port = "0" // we use that as an indication that "no port was supplied"

This comment was marked as abuse.

}
addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%s:%s", host, port))
if err != nil {
return err
}
address := addr.String()

cm.actionChan <- func() bool {
cm.cmdLinePeers[peer] = address
cm.cmdLinePeers[peer] = addr
// curtail any existing reconnect interval
if target, found := cm.targets[address]; found {
if target, found := cm.targets[addr.String()]; found {
target.tryAfter, target.tryInterval = tryImmediately()
}
return true
Expand Down Expand Up @@ -138,7 +141,7 @@ func (cm *ConnectionMaker) checkStateAndAttemptConnections() time.Duration {
// Copy the set of things we are connected to, so we can access
// them without locking. Also clear out any entries in cm.targets
// for existing connections.
ourConnectedPeers, ourConnectedTargets := cm.ourConnections()
ourConnectedPeers, ourConnectedTargets, ourInboundIPs := cm.ourConnections()

addTarget := func(address string) {
if _, connected := ourConnectedTargets[address]; connected {
Expand All @@ -154,9 +157,23 @@ func (cm *ConnectionMaker) checkStateAndAttemptConnections() time.Duration {
}

// Add command-line targets that are not connected
for _, address := range cm.cmdLinePeers {
addTarget(address)
for _, addr := range cm.cmdLinePeers {
completeAddr := *addr
attempt := true
if completeAddr.Port == 0 {
completeAddr.Port = cm.port
// If a peer was specified w/o a port, then we do not
// attempt to connect to it if we have any inbound
// connections from that IP.
if _, connected := ourInboundIPs[completeAddr.IP.String()]; connected {
attempt = false
}
}
address := completeAddr.String()
cmdLineTarget[address] = void
if attempt {
addTarget(address)
}
}

// Add targets for peers that someone else is connected to, but we
Expand All @@ -166,18 +183,25 @@ func (cm *ConnectionMaker) checkStateAndAttemptConnections() time.Duration {
return cm.connectToTargets(validTarget, cmdLineTarget)
}

func (cm *ConnectionMaker) ourConnections() (PeerNameSet, map[string]struct{}) {
func (cm *ConnectionMaker) ourConnections() (PeerNameSet, map[string]struct{}, map[string]struct{}) {
var (
ourConnectedPeers = make(PeerNameSet)
ourConnectedTargets = make(map[string]struct{})
ourInboundIPs = make(map[string]struct{})
)
for conn := range cm.ourself.Connections() {
address := conn.RemoteTCPAddr()
delete(cm.targets, address)
ourConnectedPeers[conn.Remote().Name] = void
ourConnectedTargets[address] = void
delete(cm.targets, address)
if conn.Outbound() {
continue
}
if ip, _, err := net.SplitHostPort(address); err == nil { // should always succeed
ourInboundIPs[ip] = void
}
}
return ourConnectedPeers, ourConnectedTargets
return ourConnectedPeers, ourConnectedTargets, ourInboundIPs
}

func (cm *ConnectionMaker) addPeerTargets(ourConnectedPeers PeerNameSet, addTarget func(string)) {
Expand All @@ -196,8 +220,8 @@ func (cm *ConnectionMaker) addPeerTargets(ourConnectedPeers PeerNameSet, addTarg
if conn.Outbound() {
addTarget(address)
}
if host, _, err := net.SplitHostPort(address); err == nil {
addTarget(cm.normalisePeerAddr(host))
if ip, _, err := net.SplitHostPort(address); err == nil {
addTarget(fmt.Sprintf("%s:%d", ip, cm.port))
}
}
})
Expand Down
6 changes: 2 additions & 4 deletions router/local_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,8 @@ func (peer *LocalPeer) CreateConnection(peerAddr string, acceptNewPeer bool) err
if err := peer.checkConnectionLimit(); err != nil {
return err
}
// We're dialing the remote so that means connections will come from random ports
addrStr := peer.router.NormalisePeerAddr(peerAddr)
tcpAddr, tcpErr := net.ResolveTCPAddr("tcp4", addrStr)
udpAddr, udpErr := net.ResolveUDPAddr("udp4", addrStr)
tcpAddr, tcpErr := net.ResolveTCPAddr("tcp4", peerAddr)
udpAddr, udpErr := net.ResolveUDPAddr("udp4", peerAddr)
if tcpErr != nil || udpErr != nil {
// they really should have the same value, but just in case...
if tcpErr == nil {
Expand Down
12 changes: 1 addition & 11 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewRouter(config RouterConfig, name PeerName, nickName string) *Router {
router.Peers = NewPeers(router.Ourself.Peer, onPeerGC)
router.Peers.FetchWithDefault(router.Ourself.Peer)
router.Routes = NewRoutes(router.Ourself.Peer, router.Peers)
router.ConnectionMaker = NewConnectionMaker(router.Ourself, router.Peers, router.NormalisePeerAddr)
router.ConnectionMaker = NewConnectionMaker(router.Ourself, router.Peers, router.Port)
router.TopologyGossip = router.NewGossip("topology", router)
return router
}
Expand Down Expand Up @@ -417,13 +417,3 @@ func (router *Router) applyTopologyUpdate(update []byte) (PeerNameSet, PeerNameS
}
return origUpdate, newUpdate, nil
}

// given an address like '1.2.3.4:567', return the address if it has a port,
// otherwise return the address with the default port number for the router
func (router *Router) NormalisePeerAddr(peerAddr string) string {
_, _, err := net.SplitHostPort(peerAddr)
if err == nil {
return peerAddr
}
return fmt.Sprintf("%s:%d", peerAddr, router.Port)
}
0