8000 Migrate boot node and blacklisting tests to network crate by saketh-are · Pull Request #8109 · near/nearcore · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Migrate boot node and blacklisting tests to network crate #8109

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Nov 28, 2022
Merged
2 changes: 1 addition & 1 deletion chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ impl PeerActor {
}
}));

// Refresh connection nonces but only if we're outbound. For inbound connection, the other party should
// Refresh connection nonces but only if we're outbound. For inbound connection, the other party should
// take care of nonce refresh.
if act.peer_type == PeerType::Outbound {
ctx.spawn(wrap_future({
Expand Down
248 changes: 248 additions & 0 deletions chain/network/src/peer_manager/tests/routing.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
use crate::blacklist;
use crate::broadcast;
use crate::config::NetworkConfig;
use crate::network_protocol::testonly as data;
use crate::network_protocol::{Edge, Encoding, Ping, Pong, RoutedMessageBody, RoutingTableUpdate};
use crate::peer;
use crate::peer::peer_actor::{ClosingReason, ConnectionClosedEvent};
use crate::peer_manager;
use crate::peer_manager::peer_manager_actor::Event as PME;
use crate::peer_manager::testonly::start as start_pm;
use crate::peer_manager::testonly::Event;
use crate::private_actix::RegisterPeerError;
use crate::store;
use crate::tcp;
use crate::testonly::{make_rng, Rng};
use crate::time;
use crate::types::PeerInfo;
use crate::types::PeerMessage;
use near_o11y::testonly::init_test_logger;
use near_store::db::TestDB;
use pretty_assertions::assert_eq;
use rand::Rng as _;
use std::collections::HashSet;
use std::net::Ipv4Addr;
use std::sync::Arc;

// test routing in a two-node network before and after connecting the nodes
Expand Down Expand Up @@ -588,6 +594,248 @@ async fn test_dropping_duplicate_messages() {
wait_for_pong(&mut pm0_ev, Pong { nonce: 1, source: id2.clone() }).await;
}

// Awaits until a ConnectionClosed event with the expected reason is seen in the event stream.
pub(crate) async fn wait_for_connection_closed(
events: &mut broadcast::Receiver<Event>,
want_reason: ClosingReason,
) {
events
.recv_until(|ev| match ev {
Event::PeerManager(PME::ConnectionClosed(ConnectionClosedEvent {
stream_id: _,
reason,
})) => {
if reason == want_reason {
Some(())
} else {
None
}
}
_ => None,
})
.await;
}

// Constructs NetworkConfigs for num_nodes nodes, the first num_boot_nodes of which
// are configured as boot nodes for all nodes.
fn make_configs(
chain: &data::Chain,
rng: &mut Rng,
num_nodes: usize,
num_boot_nodes: usize,
enable_outbound: bool,
) -> Vec<NetworkConfig> {
let mut cfgs: Vec<_> = (0..num_nodes).map(|_i| chain.make_config(rng)).collect();
let boot_nodes: Vec<_> = cfgs[0..num_boot_nodes]
.iter()
.map(|c| PeerInfo { id: c.node_id(), addr: c.node_addr, account_id: None })
.collect();
for config in cfgs.iter_mut() {
config.outbound_disabled = !enable_outbound;
config.peer_store.boot_nodes = boot_nodes.clone();
}
cfgs
}

// test bootstrapping a two-node network with one boot node
#[tokio::test]
async fn from_boot_nodes() {
init_test_logger();
let mut rng = make_rng(921853233);
let rng = &mut rng;
let mut clock = time::FakeClock::default();
let chain = Arc::new(data::Chain::make(&mut clock, rng, 10));

tracing::info!(target:"test", "start two nodes");
let cfgs = make_configs(&chain, rng, 2, 1, true);
let pm0 = start_pm(clock.clock(), TestDB::new(), cfgs[0].clone(), chain.clone()).await;
let pm1 = start_pm(clock.clock(), TestDB::new(), cfgs[1].clone(), chain.clone()).await;

let id0 = pm0.cfg.node_id();
let id1 = pm1.cfg.node_id();

tracing::info!(target:"test", "wait for {id0} routing table");
pm0.wait_for_routing_table(&[(id1.clone(), vec![id1.clone()])]).await;
tracing::info!(target:"test", "wait for {id1} routing table");
pm1.wait_for_routing_table(&[(id0.clone(), vec![id0.clone()])]).await;
}

// test node 0 blacklisting node 1
#[tokio::test]
async fn blacklist_01() {
init_test_logger();
let mut rng = make_rng(921853233);
let rng = &mut rng;
let mut clock = time::FakeClock::default();
let chain = Arc::new(data::Chain::make(&mut clock, rng, 10));

tracing::info!(target:"test", "start two nodes with 0 blacklisting 1");
let mut cfgs = make_configs(&chain, rng, 2, 2, true);
cfgs[0].peer_store.blacklist =
[blacklist::Entry::from_addr(cfgs[1].node_addr.unwrap())].into_iter().collect();

let pm0 = start_pm(clock.clock(), TestDB::new(), cfgs[0].clone(), chain.clone()).await;
let pm1 = start_pm(clock.clock(), TestDB::new(), cfgs[1].clone(), chain.clone()).await;

let id0 = pm0.cfg.node_id();
let id1 = pm1.cfg.node_id();

tracing::info!(target:"test", "wait for the connection to be attempted and rejected");
wait_for_connection_closed(
&mut pm0.events.clone(),
ClosingReason::RejectedByPeerManager(RegisterPeerError::Blacklisted),
)
.await;

tracing::info!(target:"test", "wait for {id0} routing table");
pm0.wait_for_routing_table(&[]).await;
tracing::info!(target:"test", "wait for {id1} routing table");
pm1.wait_for_routing_table(&[]).await;
}

// test node 1 blacklisting node 0
#[tokio::test]
async fn blacklist_10() {
init_test_logger();
let mut rng = make_rng(921853233);
let rng = &mut rng;
let mut clock = time::FakeClock::default();
let chain = Arc::new(data::Chain::make(&mut clock, rng, 10));

tracing::info!(target:"test", "start two nodes with 1 blacklisting 0");
let mut cfgs = make_configs(&chain, rng, 2, 2, true);
cfgs[1].peer_store.blacklist =
[blacklist::Entry::from_addr(cfgs[0].node_addr.unwrap())].into_iter().collect();

let pm0 = start_pm(clock.clock(), TestDB::new(), cfgs[0].clone(), chain.clone()).await;
let pm1 = start_pm(clock.clock(), TestDB::new(), cfgs[1].clone(), chain.clone()).await;

let id0 = pm0.cfg.node_id();
let id1 = pm1.cfg.node_id();

tracing::info!(target:"test", "wait for the connection to be attempted and rejected");
wait_for_connection_closed(
&mut pm1.events.clone(),
ClosingReason::RejectedByPeerManager(RegisterPeerError::Blacklisted),
)
.await;

tracing::info!(target:"test", "wait for {id0} routing table");
pm0.wait_for_routing_table(&[]).await;
tracing::info!(target:"test", "wait for {id1} routing table");
pm1.wait_for_routing_table(&[]).await;
}

// test node 0 blacklisting all nodes
#[tokio::test]
async fn blacklist_all() {
init_test_logger();
let mut rng = make_rng(921853233);
let rng = &mut rng;
let mut clock = time::FakeClock::default();
let chain = Arc::new(data::Chain::make(&mut clock, rng, 10));

tracing::info!(target:"test", "start two nodes with 0 blacklisting everything");
let mut cfgs = make_configs(&chain, rng, 2, 2, true);
cfgs[0].peer_store.blacklist =
[blacklist::Entry::from_ip(Ipv4Addr::LOCALHOST.into())].into_iter().collect();

let pm0 = start_pm(clock.clock(), TestDB::new(), cfgs[0].clone(), chain.clone()).await;
let pm1 = start_pm(clock.clock(), TestDB::new(), cfgs[1].clone(), chain.clone()).await;

let id0 = pm0.cfg.node_id();
let id1 = pm1.cfg.node_id();

tracing::info!(target:"test", "wait for the connection to be attempted and rejected");
wait_for_connection_closed(
&mut pm0.events.clone(),
ClosingReason::RejectedByPeerManager(RegisterPeerError::Blacklisted),
)
.await;

tracing::info!(target:"test", "wait for {id0} routing table");
pm0.wait_for_routing_table(&[]).await;
tracing::info!(target:"test", "wait for {id1} routing table");
pm1.wait_for_routing_table(&[]).await;
}

// Spawn 3 nodes with max peers configured to 2, then allow them to connect to each other in a triangle.
// Spawn a fourth node and see it fail to connect since the first three are at max capacity.
#[tokio::test]
async fn max_num_peers_limit() {
init_test_logger();
let mut rng = make_rng(921853233);
let rng = &mut rng;
let mut clock = time::FakeClock::default();
let chain = Arc::new(data::Chain::make(&mut clock, rng, 10));

tracing::info!(target:"test", "start three nodes with max_num_peers=2");
let mut cfgs = make_configs(&chain, rng, 4, 4, true);
for config in cfgs.iter_mut() {
config.max_num_peers = 2;
config.ideal_connections_lo = 2;
config.ideal_connections_hi = 2;
}

let pm0 = start_pm(clock.clock(), TestDB::new(), cfgs[0].clone(), chain.clone()).await;
let pm1 = start_pm(clock.clock(), TestDB::new(), cfgs[1].clone(), chain.clone()).await;
let pm2 = start_pm(clock.clock(), TestDB::new(), cfgs[2].clone(), chain.clone()).await;

let id0 = pm0.cfg.node_id();
let id1 = pm1.cfg.node_id();
let id2 = pm2.cfg.node_id();

tracing::info!(target:"test", "wait for {id0} routing table");
pm0.wait_for_routing_table(&[
(id1.clone(), vec![id1.clone()]),
(id2.clone(), vec![id2.clone()]),
])
.await;
tracing::info!(target:"test", "wait for {id1} routing table");
pm1.wait_for_routing_table(&[
(id0.clone(), vec![id0.clone()]),
(id2.clone(), vec![id2.clone()]),
])
.await;
tracing::info!(target:"test", "wait for {id2} routing table");
pm2.wait_for_routing_table(&[
(id0.clone(), vec![id0.clone()]),
5D32 (id1.clone(), vec![id1.clone()]),
])
.await;

let mut pm0_ev = pm0.events.from_now();
let mut pm1_ev = pm1.events.from_now();
let mut pm2_ev = pm2.events.from_now();

tracing::info!(target:"test", "start a fourth node");
let pm3 = start_pm(clock.clock(), TestDB::new(), cfgs[3].clone(), chain.clone()).await;

let id3 = pm3.cfg.node_id();

tracing::info!(target:"test", "wait for {id0} to reject attempted connection");
wait_for_connection_closed(
&mut pm0_ev,
ClosingReason::RejectedByPeerManager(RegisterPeerError::ConnectionLimitExceeded),
)
.await;
tracing::info!(target:"test", "wait for {id1} to reject attempted connection");
wait_for_connection_closed(
&mut pm1_ev,
ClosingReason::RejectedByPeerManager(RegisterPeerError::ConnectionLimitExceeded),
)
.await;
tracing::info!(target:"test", "wait for {id2} to reject attempted connection");
wait_for_connection_closed(
&mut pm2_ev,
ClosingReason::RejectedByPeerManager(RegisterPeerError::ConnectionLimitExceeded),
)
.await;

tracing::info!(target:"test", "wait for {id3} routing table");
pm3.wait_for_routing_table(&[]).await;
}

// test that TTL is handled property.
#[tokio::test]
async fn ttl() {
Expand Down
64 changes: 0 additions & 64 deletions integration-tests/src/tests/network/routing.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
use crate::tests::network::runner::*;
use near_network::time;

#[test]
fn from_boot_nodes() -> anyhow::Result<()> {
let mut runner = Runner::new(2, 1).use_boot_nodes(vec![0]).enable_outbound();

runner.push(Action::CheckRoutingTable(0, vec![(1, vec![1])]));
runner.push(Action::CheckRoutingTable(1, vec![(0, vec![0])]));

start_test(runner)
}

#[test]
fn account_propagation() -> anyhow::Result<()> {
let mut runner = Runner::new(3, 2);
Expand All @@ -23,60 +13,6 @@ fn account_propagation() -> anyhow::Result<()> {
start_test(runner)
}

#[test]
fn blacklist_01() -> anyhow::Result<()> {
let mut runner = Runner::new(2, 2).add_to_blacklist(0, Some(1)).use_boot_nodes(vec![0]);

runner.push(Action::Wait(time::Duration::milliseconds(100)));
runner.push(Action::CheckRoutingTable(1, vec![]));
runner.push(Action::CheckRoutingTable(0, vec![]));

start_test(runner)
}

#[test]
fn blacklist_10() -> anyhow::Result<()> {
let mut runner = Runner::new(2, 2).add_to_blacklist(1, Some(0)).use_boot_nodes(vec![0]);

runner.push(Action::Wait(time::Duration::milliseconds(100)));
runner.push(Action::CheckRoutingTable(1, vec![]));
runner.push(Action::CheckRoutingTable(0, vec![]));

start_test(runner)
}

#[test]
fn blacklist_all() -> anyhow::Result<()> {
let mut runner = Runner::new(2, 2).add_to_blacklist(0, None).use_boot_nodes(vec![0]);

runner.push(Action::Wait(time::Duration::milliseconds(100)));
runner.push(Action::CheckRoutingTable(1, vec![]));
runner.push(Action::CheckRoutingTable(0, vec![]));

start_test(runner)
}

/// Spawn 4 nodes with max peers required equal 2. Connect first three peers in a triangle.
/// Try to connect peer3 to peer0 and see it fail since first three peer are at max capacity.
#[test]
fn max_num_peers_limit() -> anyhow::Result<()> {
let mut runner = Runner::new(4, 4).max_num_peers(2).ideal_connections(2, 2).enable_outbound();

runner.push(Action::AddEdge { from: 0, to: 1, force: true });
runner.push(Action::AddEdge { from: 1, to: 2, force: true });
runner.push(Action::CheckRoutingTable(0, vec![(1, vec![1]), (2, vec![2])]));
runner.push(Action::CheckRoutingTable(1, vec![(0, vec![0]), (2, vec![2])]));
runner.push(Action::CheckRoutingTable(2, vec![(1, vec![1]), (0, vec![0])]));
runner.push(Action::AddEdge { from: 3, to: 0, force: false });
runner.push(Action::Wait(time::Duration::milliseconds(100)));
runner.push(Action::CheckRoutingTable(0, vec![(1, vec![1]), (2, vec![2])]));
runner.push(Action::CheckRoutingTable(1, vec![(0, vec![0]), (2, vec![2])]));
runner.push(Action::CheckRoutingTable(2, vec![(1, vec![1]), (0, vec![0])]));
runner.push(Action::CheckRoutingTable(3, vec![]));

start_test(runner)
}

/// Check that two archival nodes keep connected after network rebalance. Nodes 0 and 1 are archival nodes, others aren't.
/// Initially connect 2, 3, 4 to 0. Then connect 1 to 0, this connection should persist, even after other nodes tries
/// to connect to node 0 again.
Expand Down
15 changes: 0 additions & 15 deletions integration-tests/src/tests/network/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,21 +343,6 @@ impl Runner {
}
}

/// Add node `v` to the blacklist of node `u`.
/// If passed `Some(v)` it is created a blacklist entry like:
///
/// 127.0.0.1:PORT_OF_NODE_V
///
/// Use None (instead of Some(v)) if you want to add all other nodes to the blacklist.
/// If passed None it is created a blacklist entry like:
///
/// 127.0.0.1
///
pub fn add_to_blacklist(mut self, u: usize, v: Option<usize>) -> Self {
self.test_config[u].blacklist.insert(v);
self
}

/// Add node `v` to the whitelist of node `u`.
/// If passed `v` an entry of the following form is added to the whitelist:
/// PEER_ID_OF_NODE_V@127.0.0.1:PORT_OF_NODE_V
Expand Down
0