From 2663a5652736890f18938d0ac8f9ea2d168b3924 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 18 May 2022 08:28:21 +0200 Subject: [PATCH 1/5] Bump sysinfo from 0.23.10 to 0.23.12 (#706) Bumps [sysinfo](https://github.com/GuillaumeGomez/sysinfo) from 0.23.10 to 0.23.12. - [Release notes](https://github.com/GuillaumeGomez/sysinfo/releases) - [Changelog](https://github.com/GuillaumeGomez/sysinfo/blob/master/CHANGELOG.md) - [Commits](https://github.com/GuillaumeGomez/sysinfo/commits) --- updated-dependencies: - dependency-name: sysinfo dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 6 +++--- Cargo.toml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c86c93cf..e4ff7f10f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3527,7 +3527,7 @@ name = "pyrsia_central_node" version = "0.1.0" dependencies = [ "anyhow", - "clap 3.1.17", + "clap 3.1.18", "futures", "hex", "libp2p", @@ -4342,9 +4342,9 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.23.10" +version = "0.23.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eea2ed6847da2e0c7289f72cb4f285f0bd704694ca067d32be811b2a45ea858" +checksum = "56b1e20ee77901236c389ff74618a899ff5fd34719a7ff0fd1d64f0acca5179a" dependencies = [ "cfg-if 1.0.0", "core-foundation-sys", diff --git a/Cargo.toml b/Cargo.toml index 679033b6f..388ffbade 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,7 @@ sha2 = { version = "0.10.2" } stringreader = "0.1.1" strum = "0.24.0" strum_macros = "0.24.0" -sysinfo = "0.23.5" +sysinfo = "0.23.12" tempfile = "3.2.0" test-log = "0.2.8" thiserror = "1.0.31" From 5cecf3f2e1760cda31c52c100551a3673b9f647e Mon Sep 17 00:00:00 2001 From: Chris Mc Date: Wed, 18 May 2022 09:59:33 -0700 Subject: [PATCH 
2/5] Add workaround for missing status checks (#700) --- .github/workflows/rust-lint-skipped.yml | 28 +++++++++++++++++++++++++ .github/workflows/rust-lint.yml | 2 +- 2 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/rust-lint-skipped.yml diff --git a/.github/workflows/rust-lint-skipped.yml b/.github/workflows/rust-lint-skipped.yml new file mode 100644 index 000000000..0f53ae051 --- /dev/null +++ b/.github/workflows/rust-lint-skipped.yml @@ -0,0 +1,28 @@ +# This implements the workaround described in https://docs.github.com/en/repositories/configuring-branches-and-merges-in-your-repository/defining-the-mergeability-of-pull-requests/troubleshooting-required-status-checks#handling-skipped-but-required-checks +# The rust linting checks are required but due to path filtering +# the jobs are not created at all so we need to manually recreate them + +name: Rust Linting +on: + pull_request: + paths-ignore: # This should match https://github.com/pyrsia/pyrsia/blob/main/.github/workflows/rust-lint.yml#L4 + - .github/workflows/rust-lint.yml + - '**/*.rs' + - '**/Cargo.toml' + - Cargo.lock + +jobs: + security-audit: + runs-on: ubuntu-latest + steps: + - run: echo "No build required" + + rustfmt: + runs-on: ubuntu-latest + steps: + - run: echo "No build required" + + license-header: + runs-on: ubuntu-latest + steps: + - run: echo "No build required" diff --git a/.github/workflows/rust-lint.yml b/.github/workflows/rust-lint.yml index 7f45c0a98..6470989fb 100644 --- a/.github/workflows/rust-lint.yml +++ b/.github/workflows/rust-lint.yml @@ -1,7 +1,7 @@ name: Rust Linting on: pull_request: - paths: + paths: # Make sure to keep sync'd https://github.com/pyrsia/pyrsia/blob/main/.github/workflows/rust-lint-skipped.yml#L8 - .github/workflows/rust-lint.yml - '**/*.rs' - '**/Cargo.toml' From 1b1eb65cc7cfcad1f7b3bdbd89fe9b3a74a39664 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 18 May 
2022 17:51:57 +0000 Subject: [PATCH 3/5] Bump actions/github-script from 6.0.0 to 6.1.0 (#697) Bumps [actions/github-script](https://github.com/actions/github-script) from 6.0.0 to 6.1.0. - [Release notes](https://github.com/actions/github-script/releases) - [Commits](https://github.com/actions/github-script/compare/v6.0.0...v6.1.0) --- updated-dependencies: - dependency-name: actions/github-script dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Chris Mc --- .github/workflows/board-automation.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/board-automation.yml b/.github/workflows/board-automation.yml index 8ffd997c6..960b2b0ca 100644 --- a/.github/workflows/board-automation.yml +++ b/.github/workflows/board-automation.yml @@ -37,7 +37,7 @@ jobs: column: In Progress repo-token: ${{ secrets.ORG_ACCESS_TOKEN }} - if: github.event.pull_request.user.login == 'dependabot[bot]' - uses: actions/github-script@v6.0.0 + uses: actions/github-script@v6.1.0 with: github-token: ${{ secrets.ORG_ACCESS_TOKEN }} script: | From e46a8c0ec8ba77d3165e24ac46fedc952fec29ec Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 18 May 2022 11:38:36 -0700 Subject: [PATCH 4/5] Bump tj-actions/changed-files from 19 to 20 (#702) Bumps [tj-actions/changed-files](https://github.com/tj-actions/changed-files) from 19 to 20. - [Release notes](https://github.com/tj-actions/changed-files/releases) - [Changelog](https://github.com/tj-actions/changed-files/blob/main/HISTORY.md) - [Commits](https://github.com/tj-actions/changed-files/compare/v19...v20) --- updated-dependencies: - dependency-name: tj-actions/changed-files dependency-type: direct:production update-type: version-update:semver-major ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3d98a5f1b..cdc60cd66 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -182,7 +182,7 @@ jobs: tags: pyrsiaoss/pyrsia-node:latest - name: Only build if CodeCoverage.Dockerfile changed - uses: tj-actions/changed-files@v19 + uses: tj-actions/changed-files@v20 id: changed-files - name: Build and Push Code Coverage Base Image From 6aadf7116d4ad5d0a44434b15ae64fb4c1a1a8e0 Mon Sep 17 00:00:00 2001 From: Joeri Sykora Date: Thu, 19 May 2022 09:38:30 +0000 Subject: [PATCH 5/5] Propagate channel errors instead of panicking (#708) * propagate client channel errors instead of panicking * add max_provided_keys option to central node --- pyrsia_central_node/src/args/parser.rs | 4 + pyrsia_central_node/src/main.rs | 30 +++++-- pyrsia_central_node/src/network/handlers.rs | 28 +++---- pyrsia_node/src/main.rs | 28 +++++-- pyrsia_node/src/network/handlers.rs | 30 ++++--- src/docker/v2/handlers/blobs.rs | 7 +- src/docker/v2/handlers/manifests.rs | 7 +- src/network/client.rs | 86 ++++++++++----------- src/network/client/command.rs | 9 +-- src/network/event_loop.rs | 16 ++-- src/network/p2p.rs | 6 +- src/network_central/event_loop.rs | 16 ++-- src/network_central/p2p.rs | 21 ++++- src/node_api/handlers/swarm.rs | 4 +- 14 files changed, 165 insertions(+), 127 deletions(-) diff --git a/pyrsia_central_node/src/args/parser.rs b/pyrsia_central_node/src/args/parser.rs index 821cbff36..777e07c2f 100644 --- a/pyrsia_central_node/src/args/parser.rs +++ b/pyrsia_central_node/src/args/parser.rs @@ -19,6 +19,7 @@ use libp2p::Multiaddr; const DEFAULT_HOST: &str = "127.0.0.1"; const DEFAULT_LISTEN_ADDRESS: &str = "/ip4/0.0.0.0/tcp/0"; +const DEFAULT_MAX_PROVIDED_KEYS: &str = "32768"; const DEFAULT_PORT: 
&str = "8181"; /// Application to connect to and participate in the Pyrsia network @@ -37,4 +38,7 @@ pub struct PyrsiaNodeArgs { /// An address to connect with another Pyrsia Node (eg /ip4/127.0.0.1/tcp/45153/p2p/12D3KooWKsHbKbcVgyiRRgeXGCK4bp3MngnSU7ioeKTfQzd18B2v) #[clap(long, short = 'P')] pub peer: Option, + /// The maximum number of keys that can be provided on the network by this Pyrsia Node. + #[clap(long, default_value = DEFAULT_MAX_PROVIDED_KEYS)] + pub max_provided_keys: usize, } diff --git a/pyrsia_central_node/src/main.rs b/pyrsia_central_node/src/main.rs index c1f203ec1..33e1bb688 100644 --- a/pyrsia_central_node/src/main.rs +++ b/pyrsia_central_node/src/main.rs @@ -28,7 +28,7 @@ use pyrsia::node_api::routes::make_node_routes; use clap::Parser; use futures::StreamExt; -use log::{debug, info}; +use log::{debug, info, warn}; use std::error::Error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use warp::Filter; @@ -41,7 +41,7 @@ async fn main() -> Result<(), Box> { let args = PyrsiaNodeArgs::parse(); debug!("Create p2p components"); - let (p2p_client, mut p2p_events, event_loop) = p2p::setup_libp2p_swarm()?; + let (p2p_client, mut p2p_events, event_loop) = p2p::setup_libp2p_swarm(args.max_provided_keys)?; debug!("Start p2p event loop"); tokio::spawn(event_loop.run()); @@ -62,16 +62,29 @@ async fn main() -> Result<(), Box> { artifact_hash, channel, } => { - handlers::handle_request_artifact( + if let Err(error) = handlers::handle_request_artifact( p2p_client.clone(), - artifact_type, + &artifact_type, &artifact_hash, channel, ) .await + { + warn!( + "This node failed to provide artifact with type {} and hash {}. 
Error: {:?}", + artifact_type, artifact_hash, error + ); + } } pyrsia::network_central::event_loop::PyrsiaEvent::IdleMetricRequest { channel } => { - handlers::handle_request_idle_metric(p2p_client.clone(), channel).await + if let Err(error) = + handlers::handle_request_idle_metric(p2p_client.clone(), channel).await + { + warn!( + "This node failed to provide idle metrics. Error: {:?}", + error + ); + } } } } @@ -124,5 +137,10 @@ async fn setup_p2p(mut p2p_client: Client, args: PyrsiaNodeArgs) { } debug!("Provide local artifacts"); - handlers::provide_artifacts(p2p_client.clone()).await; + if let Err(error) = handlers::provide_artifacts(p2p_client.clone()).await { + warn!( + "An error occured while providing local artifacts. Error: {:?}", + error + ); + } } diff --git a/pyrsia_central_node/src/network/handlers.rs b/pyrsia_central_node/src/network/handlers.rs index 0cc53b3d0..fc1f3651b 100644 --- a/pyrsia_central_node/src/network/handlers.rs +++ b/pyrsia_central_node/src/network/handlers.rs @@ -32,7 +32,7 @@ pub async fn dial_other_peer(mut p2p_client: Client, to_dial: &Multiaddr) { } /// Provide all known artifacts on the p2p network -pub async fn provide_artifacts(mut p2p_client: Client) { +pub async fn provide_artifacts(mut p2p_client: Client) -> anyhow::Result<()> { if let Ok(package_versions) = node_manager::handlers::METADATA_MGR.list_package_versions() { debug!( "Start providing {} package versions", @@ -41,7 +41,7 @@ pub async fn provide_artifacts(mut p2p_client: Client) { for package_version in package_versions.iter() { p2p_client .provide(ArtifactType::PackageVersion, package_version.into()) - .await; + .await?; } } @@ -50,47 +50,43 @@ pub async fn provide_artifacts(mut p2p_client: Client) { for artifact_hash in artifact_hashes.iter() { p2p_client .provide(ArtifactType::Artifact, artifact_hash.into()) - .await; + .await?; } } + + Ok(()) } /// Respond to a RequestArtifact event by getting the artifact /// based on the provided artifact type and hash. 
pub async fn handle_request_artifact( mut p2p_client: Client, - artifact_type: ArtifactType, + artifact_type: &ArtifactType, artifact_hash: &str, channel: ResponseChannel, -) { +) -> anyhow::Result<()> { debug!( "Handling request artifact: {:?}={:?}", artifact_type, artifact_hash ); let content = match artifact_type { - ArtifactType::Artifact => get_artifact(artifact_hash), - ArtifactType::PackageVersion => get_package_version(artifact_hash), + ArtifactType::Artifact => get_artifact(artifact_hash)?, + ArtifactType::PackageVersion => get_package_version(artifact_hash)?, }; - match content { - Ok(content) => p2p_client.respond_artifact(content, channel).await, - Err(error) => info!( - "This node does not provide artifact with type {} and hash {}. Error: {:?}", - artifact_type, artifact_hash, error - ), - } + p2p_client.respond_artifact(content, channel).await } //Respond to the IdleMetricRequest event pub async fn handle_request_idle_metric( mut p2p_client: Client, channel: ResponseChannel, -) { +) -> anyhow::Result<()> { let metric = node_manager::handlers::get_quality_metric(); let peer_metrics: PeerMetrics = PeerMetrics { idle_metric: metric.to_le_bytes(), }; - p2p_client.respond_idle_metric(peer_metrics, channel).await; + p2p_client.respond_idle_metric(peer_metrics, channel).await } /// Get the artifact with the provided hash from the artifact manager. 
diff --git a/pyrsia_node/src/main.rs b/pyrsia_node/src/main.rs index 16cd91c75..486fc4042 100644 --- a/pyrsia_node/src/main.rs +++ b/pyrsia_node/src/main.rs @@ -28,7 +28,7 @@ use pyrsia::node_api::routes::make_node_routes; use clap::Parser; use futures::StreamExt; -use log::{debug, info}; +use log::{debug, info, warn}; use std::error::Error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use warp::Filter; @@ -62,16 +62,29 @@ async fn main() -> Result<(), Box> { artifact_hash, channel, } => { - handlers::handle_request_artifact( + if let Err(error) = handlers::handle_request_artifact( p2p_client.clone(), - artifact_type, + &artifact_type, &artifact_hash, channel, ) .await + { + warn!( + "This node failed to provide artifact with type {} and hash {}. Error: {:?}", + artifact_type, artifact_hash, error + ); + } } pyrsia::network::event_loop::PyrsiaEvent::IdleMetricRequest { channel } => { - handlers::handle_request_idle_metric(p2p_client.clone(), channel).await + if let Err(error) = + handlers::handle_request_idle_metric(p2p_client.clone(), channel).await + { + warn!( + "This node failed to provide idle metrics. Error: {:?}", + error + ); + } } } } @@ -123,5 +136,10 @@ async fn setup_p2p(mut p2p_client: Client, args: PyrsiaNodeArgs) { handlers::dial_other_peer(p2p_client.clone(), &to_dial).await; } debug!("Provide local artifacts"); - handlers::provide_artifacts(p2p_client.clone()).await; + if let Err(error) = handlers::provide_artifacts(p2p_client.clone()).await { + warn!( + "An error occured while providing local artifacts. 
Error: {:?}", + error + ); + } } diff --git a/pyrsia_node/src/network/handlers.rs b/pyrsia_node/src/network/handlers.rs index 0cc53b3d0..209786dfa 100644 --- a/pyrsia_node/src/network/handlers.rs +++ b/pyrsia_node/src/network/handlers.rs @@ -32,7 +32,7 @@ pub async fn dial_other_peer(mut p2p_client: Client, to_dial: &Multiaddr) { } /// Provide all known artifacts on the p2p network -pub async fn provide_artifacts(mut p2p_client: Client) { +pub async fn provide_artifacts(mut p2p_client: Client) -> anyhow::Result<()> { if let Ok(package_versions) = node_manager::handlers::METADATA_MGR.list_package_versions() { debug!( "Start providing {} package versions", @@ -41,7 +41,7 @@ pub async fn provide_artifacts(mut p2p_client: Client) { for package_version in package_versions.iter() { p2p_client .provide(ArtifactType::PackageVersion, package_version.into()) - .await; + .await?; } } @@ -50,47 +50,43 @@ pub async fn provide_artifacts(mut p2p_client: Client) { for artifact_hash in artifact_hashes.iter() { p2p_client .provide(ArtifactType::Artifact, artifact_hash.into()) - .await; + .await?; } } + + Ok(()) } /// Respond to a RequestArtifact event by getting the artifact /// based on the provided artifact type and hash. pub async fn handle_request_artifact( mut p2p_client: Client, - artifact_type: ArtifactType, + artifact_type: &ArtifactType, artifact_hash: &str, channel: ResponseChannel, -) { +) -> anyhow::Result<()> { debug!( "Handling request artifact: {:?}={:?}", artifact_type, artifact_hash ); let content = match artifact_type { - ArtifactType::Artifact => get_artifact(artifact_hash), - ArtifactType::PackageVersion => get_package_version(artifact_hash), + ArtifactType::Artifact => get_artifact(artifact_hash)?, + ArtifactType::PackageVersion => get_package_version(artifact_hash)?, }; - match content { - Ok(content) => p2p_client.respond_artifact(content, channel).await, - Err(error) => info!( - "This node does not provide artifact with type {} and hash {}. 
Error: {:?}", - artifact_type, artifact_hash, error - ), - } + p2p_client.respond_artifact(content, channel).await } //Respond to the IdleMetricRequest event pub async fn handle_request_idle_metric( mut p2p_client: Client, channel: ResponseChannel, -) { +) -> anyhow::Result<()> { let metric = node_manager::handlers::get_quality_metric(); let peer_metrics: PeerMetrics = PeerMetrics { idle_metric: metric.to_le_bytes(), }; - p2p_client.respond_idle_metric(peer_metrics, channel).await; + p2p_client.respond_idle_metric(peer_metrics, channel).await } /// Get the artifact with the provided hash from the artifact manager. @@ -109,7 +105,7 @@ fn get_artifact(artifact_hash: &str) -> anyhow::Result> { /// * version /// /// This is an example of an identifier: `4658011310974e1bb5c46fd4df7e78b9/alpine/3.15.4` -fn get_package_version(package_version_identifier: &str) -> Result, anyhow::Error> { +fn get_package_version(package_version_identifier: &str) -> anyhow::Result> { let decoded_hash: Vec<&str> = package_version_identifier.split('/').collect(); if let Some(package_version) = node_manager::handlers::METADATA_MGR.get_package_version( decoded_hash[0], diff --git a/src/docker/v2/handlers/blobs.rs b/src/docker/v2/handlers/blobs.rs index 281fe1ea6..5080853c6 100644 --- a/src/docker/v2/handlers/blobs.rs +++ b/src/docker/v2/handlers/blobs.rs @@ -62,7 +62,8 @@ pub async fn handle_get_blobs( p2p_client .provide(ArtifactType::Artifact, hash.clone().into()) - .await; + .await + .map_err(RegistryError::from)?; debug!("Final Step: {:?} successfully retrieved!", hash); Ok(warp::http::response::Builder::new() @@ -150,13 +151,13 @@ async fn get_blob_from_network( ) -> Result<(), RegistryError> { let providers = p2p_client .list_providers(ArtifactType::Artifact, hash.into()) - .await; + .await?; debug!( "Step 2: Does {:?} exist in the Pyrsia network? Providers: {:?}", hash, providers ); - match p2p_client.get_idle_peer(providers).await { + match p2p_client.get_idle_peer(providers).await? 
{ Some(peer) => { debug!( "Step 2: YES, {:?} exists in the Pyrsia network, fetching from peer {:?}.", diff --git a/src/docker/v2/handlers/manifests.rs b/src/docker/v2/handlers/manifests.rs index 7e4ae26d7..9fb54dad9 100644 --- a/src/docker/v2/handlers/manifests.rs +++ b/src/docker/v2/handlers/manifests.rs @@ -210,7 +210,8 @@ async fn get_manifest_from_network( ); let providers = p2p_client .list_providers(ArtifactType::PackageVersion, (&package_version).into()) - .await; + .await + .map_err(RegistryError::from)?; debug!( "Step 2: Does manifest for {} with tag {} exist in the pyrsia network? Providers: {:?}", name, tag, providers @@ -284,14 +285,14 @@ async fn get_manifest_from_other_peer( async fn save_package_version( mut p2p_client: Client, package_version: PackageVersion, -) -> Result<(), anyhow::Error> { +) -> anyhow::Result<()> { let pv_json = serde_json::to_string(&package_version) .unwrap_or_else(|_| "*** missing JSON ***".to_string()); match METADATA_MGR.create_package_version(&package_version)? { MetadataCreationStatus::Created => { p2p_client .provide(ArtifactType::PackageVersion, package_version.into()) - .await; + .await?; info!("Saved package version from docker manifest: {}", pv_json) } MetadataCreationStatus::Duplicate { json } => info!( diff --git a/src/network/client.rs b/src/network/client.rs index 6358ee93d..310157a21 100644 --- a/src/network/client.rs +++ b/src/network/client.rs @@ -26,7 +26,6 @@ use libp2p::core::{Multiaddr, PeerId}; use libp2p::request_response::ResponseChannel; use log::debug; use std::collections::HashSet; -use std::error; /* peer metrics support */ const PEER_METRIC_THRESHOLD: f64 = 0.5_f64; @@ -110,7 +109,7 @@ pub struct Client { impl Client { /// Instruct the swarm to start listening on the specified address. 
- pub async fn listen(&mut self, addr: &Multiaddr) -> Result<(), Box> { + pub async fn listen(&mut self, addr: &Multiaddr) -> anyhow::Result<()> { debug!("p2p::Client::listen {:?}", addr); let (sender, receiver) = oneshot::channel(); @@ -119,16 +118,12 @@ impl Client { addr: addr.clone(), sender, }) - .await - .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not to be dropped.") + .await?; + receiver.await? } /// Dial a peer with the specified address. - pub async fn dial( - &mut self, - peer_addr: &Multiaddr, - ) -> Result<(), Box> { + pub async fn dial(&mut self, peer_addr: &Multiaddr) -> anyhow::Result<()> { debug!("p2p::Client::dial {:?}", peer_addr); let (sender, receiver) = oneshot::channel(); @@ -137,28 +132,30 @@ impl Client { peer_addr: peer_addr.clone(), sender, }) - .await - .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not to be dropped.") + .await?; + receiver.await? } /// List the peers that this node is connected to. - pub async fn list_peers(&mut self) -> HashSet { + pub async fn list_peers(&mut self) -> anyhow::Result> { let (sender, receiver) = oneshot::channel(); self.sender .send(Command::ListPeers { peer_id: self.local_peer_id, sender, }) - .await - .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not to be dropped.") + .await?; + Ok(receiver.await?) } /// Inform the swarm that this node is currently a /// provider of the artifact with the specified `type` /// and `hash`. 
- pub async fn provide(&mut self, artifact_type: ArtifactType, artifact_hash: ArtifactHash) { + pub async fn provide( + &mut self, + artifact_type: ArtifactType, + artifact_hash: ArtifactHash, + ) -> anyhow::Result<()> { debug!( "p2p::Client::provide {:?}={:?}", artifact_type, artifact_hash @@ -171,9 +168,8 @@ impl Client { artifact_hash, sender, }) - .await - .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not to be dropped.") + .await?; + Ok(receiver.await?) } /// List all peers in the swarm that are providing @@ -182,7 +178,7 @@ impl Client { &mut self, artifact_type: ArtifactType, artifact_hash: ArtifactHash, - ) -> HashSet { + ) -> anyhow::Result> { debug!( "p2p::Client::list_providers {:?}={:?}", artifact_type, artifact_hash @@ -195,9 +191,8 @@ impl Client { artifact_hash, sender, }) - .await - .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not to be dropped.") + .await?; + Ok(receiver.await?) } /// Request an artifact with the specified `type` and `hash` @@ -207,7 +202,7 @@ impl Client { peer: &PeerId, artifact_type: ArtifactType, artifact_hash: ArtifactHash, - ) -> Result, Box> { + ) -> anyhow::Result> { debug!( "p2p::Client::request_artifact {:?}: {:?}={:?}", peer, artifact_type, artifact_hash @@ -221,9 +216,8 @@ impl Client { peer: *peer, sender, }) - .await - .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not to be dropped.") + .await?; + receiver.await? 
} /// Put the artifact as a response to an incoming artifact @@ -232,23 +226,27 @@ impl Client { &mut self, artifact: Vec, channel: ResponseChannel, - ) { + ) -> anyhow::Result<()> { debug!("p2p::Client::respond_artifact size={:?}", artifact.len()); self.sender .send(Command::RespondArtifact { artifact, channel }) - .await - .expect("Command receiver not to be dropped."); + .await?; + + Ok(()) } //get a peer with a low enough work load to download artifact otherwise the lowest work load of the set //TODO: chunk the peers to some limit to keep from shotgunning the network - pub async fn get_idle_peer(&mut self, providers: HashSet) -> Option { + pub async fn get_idle_peer( + &mut self, + providers: HashSet, + ) -> anyhow::Result> { debug!( "p2p::Client::get_idle_peer() entered with {} peers", providers.len() ); - let mut metrics_array: Vec = Vec::new(); + let mut idle_metrics: Vec = Vec::new(); for peer in providers.iter() { let (sender, receiver) = oneshot::channel(); self.sender @@ -256,8 +254,7 @@ impl Client { peer: *peer, sender, }) - .await - .expect("Command receiver not to be dropped"); + .await?; match receiver.await.expect("Sender not to be dropped.") { Ok(peer_metric) => { @@ -271,13 +268,13 @@ impl Client { "p2p::Client::get_idle_peer() Found peer with a below threshold idle value {}", metric ); - return Some(idle_metric.peer); + return Ok(Some(idle_metric.peer)); } else { debug!( "p2p::Client::get_idle_peer() Pushing idle peer with value {}", metric ); - metrics_array.push(idle_metric); + idle_metrics.push(idle_metric); } } Err(e) => { @@ -290,19 +287,15 @@ impl Client { } //sort the peers in ascending order according to their idle metric and return top of list - metrics_array.sort_by(|a, b| a.metric.partial_cmp(&b.metric).unwrap()); - if !metrics_array.is_empty() { - Some(metrics_array[0].peer) - } else { - None - } + idle_metrics.sort_by(|a, b| a.metric.partial_cmp(&b.metric).unwrap()); + Ok(idle_metrics.first().map(|idle_metric| idle_metric.peer)) } 
pub async fn respond_idle_metric( &mut self, metric: PeerMetrics, channel: ResponseChannel, - ) { + ) -> anyhow::Result<()> { debug!( "p2p::Client::respond_idle_metric PeerMetrics metric ={:?}", metric @@ -310,8 +303,9 @@ impl Client { self.sender .send(Command::RespondIdleMetric { metric, channel }) - .await - .expect("Command receiver not to be dropped."); + .await?; + + Ok(()) } } diff --git a/src/network/client/command.rs b/src/network/client/command.rs index d36fe2abd..4c0d04768 100644 --- a/src/network/client/command.rs +++ b/src/network/client/command.rs @@ -21,7 +21,6 @@ use futures::channel::oneshot; use libp2p::core::{Multiaddr, PeerId}; use libp2p::request_response::ResponseChannel; use std::collections::HashSet; -use std::error::Error; use strum_macros::Display; /// Commands are sent by the [`Client`] to the [`PyrsiaEventLoop`]. @@ -31,11 +30,11 @@ use strum_macros::Display; pub enum Command { Listen { addr: Multiaddr, - sender: oneshot::Sender>>, + sender: oneshot::Sender>, }, Dial { peer_addr: Multiaddr, - sender: oneshot::Sender>>, + sender: oneshot::Sender>, }, ListPeers { peer_id: PeerId, @@ -55,7 +54,7 @@ pub enum Command { artifact_type: ArtifactType, artifact_hash: ArtifactHash, peer: PeerId, - sender: oneshot::Sender, Box>>, + sender: oneshot::Sender>>, }, RespondArtifact { artifact: Vec, @@ -63,7 +62,7 @@ pub enum Command { }, RequestIdleMetric { peer: PeerId, - sender: oneshot::Sender>>, + sender: oneshot::Sender>, }, RespondIdleMetric { metric: PeerMetrics, diff --git a/src/network/event_loop.rs b/src/network/event_loop.rs index 1df8fd4b2..a07bc58c1 100644 --- a/src/network/event_loop.rs +++ b/src/network/event_loop.rs @@ -35,13 +35,11 @@ use std::collections::hash_map::Entry::Vacant; use std::collections::{HashMap, HashSet}; use std::error::Error; -type PendingDialMap = HashMap>>>; +type PendingDialMap = HashMap>>; type PendingListPeersMap = HashMap>>; type PendingStartProvidingMap = HashMap>; -type PendingRequestArtifactMap = - HashMap, 
Box>>>; -type PendingRequestIdleMetricMap = - HashMap>>>; +type PendingRequestArtifactMap = HashMap>>>; +type PendingRequestIdleMetricMap = HashMap>>; /// The `PyrsiaEventLoop` is responsible for taking care of incoming /// events from the libp2p [`Swarm`] itself, the different network @@ -215,7 +213,7 @@ impl PyrsiaEventLoop { .pending_request_artifact .remove(&request_id) .expect("Request to still be pending.") - .send(Err(Box::new(error))); + .send(Err(From::from(error))); } RequestResponseEvent::ResponseSent { .. } => {} } @@ -255,7 +253,7 @@ impl PyrsiaEventLoop { .pending_idle_metric_requests .remove(&request_id) .expect("Request to still be pending.") - .send(Err(Box::new(error))); + .send(Err(From::from(error))); } RequestResponseEvent::ResponseSent { .. } => {} } @@ -300,7 +298,7 @@ impl PyrsiaEventLoop { Command::Listen { addr, sender } => { let _ = match self.swarm.listen_on(addr) { Ok(_) => sender.send(Ok(())), - Err(e) => sender.send(Err(Box::new(e))), + Err(e) => sender.send(Err(From::from(e))), }; } Command::Dial { peer_addr, sender } => { @@ -310,7 +308,7 @@ impl PyrsiaEventLoop { self.pending_dial.insert(peer_addr, sender); } Err(e) => { - let _ = sender.send(Err(Box::new(e))); + let _ = sender.send(Err(From::from(e))); } } } diff --git a/src/network/p2p.rs b/src/network/p2p.rs index 534e74600..50468b950 100644 --- a/src/network/p2p.rs +++ b/src/network/p2p.rs @@ -58,7 +58,7 @@ use std::iter; /// the [`FileExchangeProtocol`] /// /// The maximum number of provided keys for the memory store that is used by -/// Kademlia can be provided with the `max_provider_keys` parameter. This number +/// Kademlia can be provided with the `max_provided_keys` parameter. This number /// should be equal to or higher than the total number of artifacts and manifests /// that the pyrsia node will be providing. 
/// @@ -104,11 +104,11 @@ use std::iter; /// * the receiver part of the event channel /// * the PyrsiaEventLoop pub fn setup_libp2p_swarm( - max_provider_keys: usize, + max_provided_keys: usize, ) -> Result<(Client, impl Stream, PyrsiaEventLoop), Box> { let local_keypair = keypair_util::load_or_generate_ed25519(); - let (swarm, local_peer_id) = create_swarm(local_keypair, max_provider_keys)?; + let (swarm, local_peer_id) = create_swarm(local_keypair, max_provided_keys)?; let (command_sender, command_receiver) = mpsc::channel(32); let (event_sender, event_receiver) = mpsc::channel(32); diff --git a/src/network_central/event_loop.rs b/src/network_central/event_loop.rs index 5db7c936b..7a51946c3 100644 --- a/src/network_central/event_loop.rs +++ b/src/network_central/event_loop.rs @@ -37,13 +37,11 @@ use std::collections::hash_map::Entry::Vacant; use std::collections::{HashMap, HashSet}; use std::error::Error; -type PendingDialMap = HashMap>>>; +type PendingDialMap = HashMap>>; type PendingListPeersMap = HashMap>>; type PendingStartProvidingMap = HashMap>; -type PendingRequestArtifactMap = - HashMap, Box>>>; -type PendingRequestIdleMetricMap = - HashMap>>>; +type PendingRequestArtifactMap = HashMap>>>; +type PendingRequestIdleMetricMap = HashMap>>; /// The `PyrsiaEventLoop` is responsible for taking care of incoming /// events from the libp2p [`Swarm`] itself, the different network @@ -218,7 +216,7 @@ impl PyrsiaEventLoop { .pending_request_artifact .remove(&request_id) .expect("Request to still be pending.") - .send(Err(Box::new(error))); + .send(Err(From::from(error))); } RequestResponseEvent::ResponseSent { .. } => {} } @@ -298,7 +296,7 @@ impl PyrsiaEventLoop { .pending_idle_metric_requests .remove(&request_id) .expect("Request to still be pending.") - .send(Err(Box::new(error))); + .send(Err(From::from(error))); } RequestResponseEvent::ResponseSent { .. 
} => {} } @@ -344,7 +342,7 @@ impl PyrsiaEventLoop { Command::Listen { addr, sender } => { let _ = match self.swarm.listen_on(addr) { Ok(_) => sender.send(Ok(())), - Err(e) => sender.send(Err(Box::new(e))), + Err(e) => sender.send(Err(From::from(e))), }; } Command::Dial { peer_addr, sender } => { @@ -354,7 +352,7 @@ impl PyrsiaEventLoop { self.pending_dial.insert(peer_addr, sender); } Err(e) => { - let _ = sender.send(Err(Box::new(e))); + let _ = sender.send(Err(From::from(e))); } } } diff --git a/src/network_central/p2p.rs b/src/network_central/p2p.rs index 71359f09b..2d7874841 100644 --- a/src/network_central/p2p.rs +++ b/src/network_central/p2p.rs @@ -26,7 +26,7 @@ use futures::prelude::*; use libp2p::core; use libp2p::dns; use libp2p::kad; -use libp2p::kad::record::store::MemoryStore; +use libp2p::kad::record::store::{MemoryStore, MemoryStoreConfig}; use libp2p::mplex; use libp2p::noise; use libp2p::relay::v2::relay::{self, Relay}; @@ -58,6 +58,11 @@ use std::time::Duration; /// * RequestResponse: a generic request/response protocol implementation for /// the [`FileExchangeProtocol`] /// +/// The maximum number of provided keys for the memory store that is used by +/// Kademlia can be provided with the `max_provided_keys` parameter. This number +/// should be equal to or higher than the total number of artifacts and manifests +/// that the pyrsia node will be providing. +/// /// The Client uses the command channel to send commands that interact with the libp2p /// network. This is the main entry point for an application to perform actions on the /// libp2p network, i.e. dialing other peers, listing available providers, ... 
@@ -101,10 +106,11 @@ use std::time::Duration; /// * the PyrsiaEventLoop pub fn setup_libp2p_swarm( + max_provided_keys: usize, ) -> Result<(Client, impl Stream, PyrsiaEventLoop), Box> { let local_keypair = keypair_util::load_or_generate_ed25519(); - let (swarm, local_peer_id) = create_swarm(local_keypair)?; + let (swarm, local_peer_id) = create_swarm(local_keypair, max_provided_keys)?; let (command_sender, command_receiver) = mpsc::channel(32); let (event_sender, event_receiver) = mpsc::channel(32); @@ -144,12 +150,18 @@ fn create_transport( // create the libp2p swarm fn create_swarm( keypair: identity::Keypair, + max_provided_keys: usize, ) -> Result<(Swarm, core::PeerId), Box> { let peer_id = keypair.public().to_peer_id(); let identify_config = identify::IdentifyConfig::new(String::from("ipfs/1.0.0"), keypair.public()); + let memory_store_config = MemoryStoreConfig { + max_provided_keys, + ..Default::default() + }; + Ok(( SwarmBuilder::new( create_transport(keypair)?, @@ -162,7 +174,10 @@ fn create_swarm( ..Default::default() }, ), - kademlia: kad::Kademlia::new(peer_id, MemoryStore::new(peer_id)), + kademlia: kad::Kademlia::new( + peer_id, + MemoryStore::with_config(peer_id, memory_store_config), + ), request_response: RequestResponse::new( ArtifactExchangeCodec(), iter::once((ArtifactExchangeProtocol(), ProtocolSupport::Full)), diff --git a/src/node_api/handlers/swarm.rs b/src/node_api/handlers/swarm.rs index b68cc28e1..fe87438cc 100644 --- a/src/node_api/handlers/swarm.rs +++ b/src/node_api/handlers/swarm.rs @@ -23,7 +23,7 @@ use std::collections::HashMap; use warp::{http::StatusCode, Rejection, Reply}; pub async fn handle_get_peers(mut p2p_client: Client) -> Result { - let peers = p2p_client.list_peers().await; + let peers = p2p_client.list_peers().await.map_err(RegistryError::from)?; debug!("Got received_peers: {:?}", peers); let str_peers: Vec = peers.into_iter().map(|p| p.to_string()).collect(); @@ -37,7 +37,7 @@ pub async fn handle_get_peers(mut 
p2p_client: Client) -> Result Result { - let peers = p2p_client.list_peers().await; + let peers = p2p_client.list_peers().await.map_err(RegistryError::from)?; let art_count_result = get_arts_summary(); if art_count_result.is_err() {