From fd880fe7390b724c3bcc6452fcfb45834c4f51fc Mon Sep 17 00:00:00 2001 From: Andrew Wygle Date: Tue, 22 Nov 2022 20:39:21 -0800 Subject: [PATCH 01/11] Key Image Router Service --- fog/ledger/server/src/error.rs | 85 +++++ .../server/src/key_image_router_service.rs | 83 +++++ fog/ledger/server/src/lib.rs | 4 + fog/ledger/server/src/router_handlers.rs | 346 ++++++++++++++++++ 4 files changed, 518 insertions(+) create mode 100644 fog/ledger/server/src/error.rs create mode 100644 fog/ledger/server/src/key_image_router_service.rs create mode 100644 fog/ledger/server/src/router_handlers.rs diff --git a/fog/ledger/server/src/error.rs b/fog/ledger/server/src/error.rs new file mode 100644 index 0000000000..d741cf0e1c --- /dev/null +++ b/fog/ledger/server/src/error.rs @@ -0,0 +1,85 @@ +// Copyright (c) 2018-2022 The MobileCoin Foundation + +use displaydoc::Display; +use grpcio::RpcStatus; +use mc_common::logger::Logger; +use mc_fog_ledger_enclave_api::Error as LedgerEnclaveError; +use mc_sgx_report_cache_untrusted::Error as ReportCacheError; +use mc_util_grpc::{rpc_internal_error, rpc_permissions_error}; + +#[derive(Debug, Display)] +pub enum RouterServerError { + /// Error related to contacting Fog Ledger Store: {0} + LedgerStoreError(String), + /// Ledger Enclave error: {0} + Enclave(LedgerEnclaveError), +} + +impl From for RouterServerError { + fn from(src: grpcio::Error) -> Self { + RouterServerError::LedgerStoreError(format!("{}", src)) + } +} + +impl From for RouterServerError { + fn from(src: mc_common::ResponderIdParseError) -> Self { + RouterServerError::LedgerStoreError(format!("{}", src)) + } +} + +impl From for RouterServerError { + fn from(src: mc_util_uri::UriParseError) -> Self { + RouterServerError::LedgerStoreError(format!("{}", src)) + } +} + +impl From for RouterServerError { + fn from(src: mc_util_uri::UriConversionError) -> Self { + RouterServerError::LedgerStoreError(format!("{}", src)) + } +} + +pub fn router_server_err_to_rpc_status( + context: &str, + src: RouterServerError, + logger: Logger, +) -> RpcStatus { + match src { + RouterServerError::LedgerStoreError(_) => { + rpc_internal_error(context, format!("{}", src), &logger) + } + RouterServerError::Enclave(_) => { + rpc_permissions_error(context, format!("{}", src), &logger) + } + } +} + +impl From for RouterServerError { + fn from(src: LedgerEnclaveError) -> Self { + RouterServerError::Enclave(src) + } +} + +#[derive(Display)] +pub enum LedgerServerError { + /// Ledger Enclave error: {0} + Enclave(LedgerEnclaveError), + // Failed to join thread: {0} + //ThreadJoin(String), + // RPC shutdown failure: {0} + //RpcShutdown(String), + /// Report cache error: {0} + ReportCache(ReportCacheError), +} + +impl From for LedgerServerError { + fn from(src: LedgerEnclaveError) -> Self { + LedgerServerError::Enclave(src) + } +} + +impl From for LedgerServerError { + fn from(src: ReportCacheError) -> Self { + Self::ReportCache(src) + } +} diff --git a/fog/ledger/server/src/key_image_router_service.rs b/fog/ledger/server/src/key_image_router_service.rs new file mode 100644 index 0000000000..dad10a08fc --- /dev/null +++ b/fog/ledger/server/src/key_image_router_service.rs @@ -0,0 +1,83 @@ +// Copyright (c) 2018-2022 The MobileCoin Foundation + +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +use futures::{FutureExt, TryFutureExt}; +use grpcio::{DuplexSink, RequestStream, RpcContext}; +use mc_common::logger::{log, Logger}; +use mc_fog_api::{ + ledger::{LedgerRequest, LedgerResponse}, + ledger_grpc::{self, LedgerApi}, +}; +use mc_fog_ledger_enclave::LedgerEnclaveProxy; +use mc_fog_uri::KeyImageStoreUri; +use mc_util_grpc::rpc_logger; +use mc_util_metrics::SVC_COUNTERS; + +use crate::router_handlers; + +#[derive(Clone)] +pub struct KeyImageRouterService +where + E: LedgerEnclaveProxy, +{ + enclave: E, + shards: Arc>>>, + logger: Logger, +} + +impl KeyImageRouterService { + /// Creates a new LedgerRouterService that can be used by a gRPC server to + /// fulfill gRPC requests. + #[allow(dead_code)] // FIXME + pub fn new( + enclave: E, + shards: Arc>>>, + logger: Logger, + ) -> Self { + Self { + enclave, + shards, + logger, + } + } +} + +impl LedgerApi for KeyImageRouterService +where + E: LedgerEnclaveProxy, +{ + fn request( + &mut self, + ctx: RpcContext, + requests: RequestStream, + responses: DuplexSink, + ) { + log::info!(self.logger, "Request received in request fn"); + let _timer = SVC_COUNTERS.req(&ctx); + mc_common::logger::scoped_global_logger(&rpc_logger(&ctx, &self.logger), |logger| { + log::warn!( + self.logger, + "Streaming GRPC Ledger API only partially implemented." + ); + let logger = logger.clone(); + + let shards = self.shards.read().expect("RwLock poisoned"); + let future = router_handlers::handle_requests( + shards.values().cloned().collect(), + self.enclave.clone(), + requests, + responses, + logger.clone(), + ) + .map_err(move |err: grpcio::Error| log::error!(&logger, "failed to reply: {}", err)) + // TODO: Do more with the error than just push it to the log. + .map(|_| ()); + + ctx.spawn(future) + }); + } +} diff --git a/fog/ledger/server/src/lib.rs b/fog/ledger/server/src/lib.rs index 049c2a090e..c876354690 100644 --- a/fog/ledger/server/src/lib.rs +++ b/fog/ledger/server/src/lib.rs @@ -4,11 +4,15 @@ mod block_service; mod config; mod counters; mod db_fetcher; +mod error; mod key_image_service; mod merkle_proof_service; mod server; mod untrusted_tx_out_service; +mod key_image_router_service; +mod router_handlers; + pub use block_service::BlockService; pub use config::LedgerServerConfig; pub use key_image_service::KeyImageService; diff --git a/fog/ledger/server/src/router_handlers.rs b/fog/ledger/server/src/router_handlers.rs new file mode 100644 index 0000000000..739d13341f --- /dev/null +++ b/fog/ledger/server/src/router_handlers.rs @@ -0,0 +1,346 @@ +// Copyright (c) 2018-2022 The MobileCoin Foundation + +use crate::error::{router_server_err_to_rpc_status, RouterServerError}; +use futures::{future::try_join_all, SinkExt, TryStreamExt}; +use grpcio::{ChannelBuilder, DuplexSink, RequestStream, RpcStatus, WriteFlags}; +use mc_attest_api::attest; +use mc_attest_enclave_api::{EnclaveMessage, NonceSession}; +use mc_common::{logger::Logger, ResponderId}; +use mc_fog_api::{ + ledger::{ + LedgerRequest, LedgerResponse, MultiKeyImageStoreRequest, MultiKeyImageStoreResponse, + MultiKeyImageStoreResponseStatus, + }, + ledger_grpc::KeyImageStoreApiClient, +}; +use mc_fog_ledger_enclave::LedgerEnclaveProxy; +use mc_fog_uri::{ConnectionUri, KeyImageStoreUri}; +//use mc_fog_ledger_enclave_api::LedgerEnclaveProxy; +use mc_util_grpc::{rpc_invalid_arg_error, ConnectionUriGrpcioChannel}; +use std::{collections::BTreeMap, str::FromStr, sync::Arc}; + +#[allow(dead_code)] //FIXME +const RETRY_COUNT: usize = 3; + +/// Handles a series of requests sent by the Fog Ledger Router client, +/// routing them out to shards. +pub async fn handle_requests( + shard_clients: Vec>, + enclave: E, + mut requests: RequestStream, + mut responses: DuplexSink, + logger: Logger, +) -> Result<(), grpcio::Error> +where + E: LedgerEnclaveProxy, +{ + while let Some(request) = requests.try_next().await? { + let result = handle_request( + request, + shard_clients.clone(), + enclave.clone(), + logger.clone(), + ) + .await; + match result { + Ok(response) => responses.send((response, WriteFlags::default())).await?, + Err(rpc_status) => return responses.fail(rpc_status).await, + } + } + responses.close().await?; + Ok(()) +} + +/// Handles a client's request by performing either an authentication or a +/// query. +pub async fn handle_request( + mut request: LedgerRequest, + shard_clients: Vec>, + enclave: E, + logger: Logger, +) -> Result +where + E: LedgerEnclaveProxy, +{ + if request.has_auth() { + handle_auth_request(enclave, request.take_auth(), logger) + } else if request.has_check_key_images() { + handle_query_request( + request.take_check_key_images(), + enclave, + shard_clients, + logger, + ) + .await + // TODO: Handle other cases here as they are added, such as the merkele + // proof service. + } else { + let rpc_status = rpc_invalid_arg_error( + "Inavlid LedgerRequest request", + "Neither the query nor auth fields were set".to_string(), + &logger, + ); + Err(rpc_status) + } +} + +/// The result of processing the MultiLedgerStoreQueryResponse from each Fog +/// Ledger Shard. +pub struct ProcessedShardResponseData { + /// gRPC clients for Shards that need to be retried for a successful + /// response. + pub shard_clients_for_retry: Vec>, + + /// Uris for individual Fog Ledger Stores that need to be authenticated with + /// by the Fog Router. It should only have entries if + /// `shard_clients_for_retry` has entries. + pub store_uris_for_authentication: Vec, + + /// New, successfully processed query responses. + pub new_query_responses: Vec<(ResponderId, attest::NonceMessage)>, +} + +impl ProcessedShardResponseData { + pub fn new( + shard_clients_for_retry: Vec>, + store_uris_for_authentication: Vec, + new_query_responses: Vec<(ResponderId, attest::NonceMessage)>, + ) -> Self { + ProcessedShardResponseData { + shard_clients_for_retry, + store_uris_for_authentication, + new_query_responses, + } + } +} + +/// Processes the MultiLedgerStoreQueryResponses returned by each Ledger Shard. +pub fn process_shard_responses( + clients_and_responses: Vec<(Arc, MultiKeyImageStoreResponse)>, +) -> Result { + let mut shard_clients_for_retry = Vec::new(); + let mut store_uris_for_authentication = Vec::new(); + let mut new_query_responses = Vec::new(); + + for (shard_client, mut response) in clients_and_responses { + // We did not receive a query_response for this shard.Therefore, we need to: + // (a) retry the query + // (b) authenticate with the Ledger Store that returned the decryption_error + let store_uri = KeyImageStoreUri::from_str(response.get_fog_ledger_store_uri())?; + match response.get_status() { + MultiKeyImageStoreResponseStatus::SUCCESS => { + let store_responder_id = store_uri.responder_id()?; + new_query_responses.push((store_responder_id, response.take_query_response())); + } + MultiKeyImageStoreResponseStatus::AUTHENTICATION_ERROR => { + shard_clients_for_retry.push(shard_client); + store_uris_for_authentication.push(store_uri); + } + // This call will be retried as part of the larger retry logic + MultiKeyImageStoreResponseStatus::NOT_READY => (), + } + } + + Ok(ProcessedShardResponseData::new( + shard_clients_for_retry, + store_uris_for_authentication, + new_query_responses, + )) +} + +/// Handles a client's authentication request. +fn handle_auth_request( + enclave: E, + auth_message: attest::AuthMessage, + logger: Logger, +) -> Result +where + E: LedgerEnclaveProxy, +{ + let (client_auth_response, _) = enclave.client_accept(auth_message.into()).map_err(|err| { + router_server_err_to_rpc_status("Auth: e client accept", err.into(), logger) + })?; + + let mut response = LedgerResponse::new(); + response.mut_auth().set_data(client_auth_response.into()); + Ok(response) +} + +#[allow(unused_variables)] +/// Handles a client's query request. +async fn handle_query_request( + query: attest::Message, + enclave: E, + shard_clients: Vec>, + logger: Logger, +) -> Result +where + E: LedgerEnclaveProxy, +{ + let mut query_responses: BTreeMap> = BTreeMap::new(); + let mut shard_clients = shard_clients.clone(); + let sealed_query = enclave + .decrypt_and_seal_query(query.into()) + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal encryption error", + err.into(), + logger.clone(), + ) + })?; + + // The retry logic here is: + // Set retries remaining to RETRY_COUNT + // Send query and process responses + // If there's a response from every shard, we're done + // If there's a new store, repeat + // If there's no new store and we don't have enough responses, decrement + // RETRY_COUNT and loop + let mut remaining_retries = RETRY_COUNT; + while remaining_retries > 0 { + let multi_ledger_store_query_request = enclave + .create_multi_key_image_store_query_data(sealed_query.clone()) + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal encryption error", + err.into(), + logger.clone(), + ) + })? + .into(); + let clients_and_responses = + route_query(&multi_ledger_store_query_request, shard_clients.clone()) + .await + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal query routing error", + err, + logger.clone(), + ) + })?; + + let processed_shard_response_data = process_shard_responses(clients_and_responses) + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: internal query response processing", + err, + logger.clone(), + ) + })?; + + for (store_responder_id, new_query_response) in processed_shard_response_data + .new_query_responses + .into_iter() + { + query_responses.insert(store_responder_id, new_query_response.into()); + } + + if query_responses.len() >= shard_clients.len() { + break; + } + + shard_clients = processed_shard_response_data.shard_clients_for_retry; + if !shard_clients.is_empty() { + authenticate_ledger_stores( + enclave.clone(), + processed_shard_response_data.store_uris_for_authentication, + logger.clone(), + ) + .await?; + } else { + remaining_retries -= 1; + } + } + + if remaining_retries == 0 { + return Err(router_server_err_to_rpc_status( + "Query: timed out connecting to key image stores", + RouterServerError::LedgerStoreError(format!( + "Received {} responses which failed to advance the MultiKeyImageStoreRequest", + RETRY_COUNT + )), + logger.clone(), + )); + } + + let query_response = enclave + .collate_shard_query_responses(sealed_query, query_responses) + .map_err(|err| { + router_server_err_to_rpc_status( + "Query: shard response collation", + RouterServerError::Enclave(err), + logger.clone(), + ) + })?; + + let mut response = LedgerResponse::new(); + response.set_check_key_image_response(query_response.into()); + Ok(response) +} + +/// Sends a client's query request to all of the Fog Ledger shards. +async fn route_query( + request: &MultiKeyImageStoreRequest, + shard_clients: Vec>, +) -> Result, MultiKeyImageStoreResponse)>, RouterServerError> { + let responses = shard_clients + .into_iter() + .map(|shard_client| query_shard(request, shard_client)); + try_join_all(responses).await +} + +/// Sends a client's query request to one of the Fog Ledger shards. +async fn query_shard( + request: &MultiKeyImageStoreRequest, + shard_client: Arc, +) -> Result<(Arc, MultiKeyImageStoreResponse), RouterServerError> { + let client_unary_receiver = shard_client.multi_key_image_store_query_async(request)?; + let response = client_unary_receiver.await?; + + Ok((shard_client, response)) +} + +// Authenticates Fog Ledger Stores that have previously not been authenticated. +async fn authenticate_ledger_stores( + enclave: E, + ledger_store_uris: Vec, + logger: Logger, +) -> Result, RpcStatus> { + let pending_auth_requests = ledger_store_uris + .into_iter() + .map(|store_uri| authenticate_ledger_store(enclave.clone(), store_uri, logger.clone())); + + try_join_all(pending_auth_requests).await.map_err(|err| { + router_server_err_to_rpc_status( + "Query: cannot authenticate with each Fog Ledger Store:", + err, + logger.clone(), + ) + }) +} + +// Authenticates a Fog Ledger Store that has previously not been authenticated. +async fn authenticate_ledger_store( + enclave: E, + ledger_store_url: KeyImageStoreUri, + logger: Logger, +) -> Result<(), RouterServerError> { + let ledger_store_id = ResponderId::from_str(&ledger_store_url.to_string())?; + let client_auth_request = enclave.ledger_store_init(ledger_store_id.clone())?; + let grpc_env = Arc::new( + grpcio::EnvBuilder::new() + .name_prefix("authenticate-ledger-store".to_string()) + .build(), + ); + let ledger_store_client = KeyImageStoreApiClient::new( + ChannelBuilder::default_channel_builder(grpc_env) + .connect_to_uri(&ledger_store_url, &logger), + ); + + let auth_unary_receiver = ledger_store_client.auth_async(&client_auth_request.into())?; + let auth_response = auth_unary_receiver.await?; + + let result = enclave.ledger_store_connect(ledger_store_id, auth_response.into())?; + + Ok(result) +} From 66d011e2b5e5490665f6013d4135c6038dcea55b Mon Sep 17 00:00:00 2001 From: Andrew Wygle Date: Tue, 22 Nov 2022 20:47:53 -0800 Subject: [PATCH 02/11] Remove unneeded dead_code annotation --- fog/ledger/server/src/router_handlers.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/fog/ledger/server/src/router_handlers.rs b/fog/ledger/server/src/router_handlers.rs index 739d13341f..5c4a26b43f 100644 --- a/fog/ledger/server/src/router_handlers.rs +++ b/fog/ledger/server/src/router_handlers.rs @@ -19,7 +19,6 @@ use mc_fog_uri::{ConnectionUri, KeyImageStoreUri}; use mc_util_grpc::{rpc_invalid_arg_error, ConnectionUriGrpcioChannel}; use std::{collections::BTreeMap, str::FromStr, sync::Arc}; -#[allow(dead_code)] //FIXME const RETRY_COUNT: usize = 3; /// Handles a series of requests sent by the Fog Ledger Router client, From e55b8dc2f212cb186c2a650827f64de913dd6af3 Mon Sep 17 00:00:00 2001 From: NotGyro Date: Wed, 23 Nov 2022 19:21:55 -0500 Subject: [PATCH 03/11] Update fog/ledger/server/src/error.rs Co-authored-by: Nick Santana --- fog/ledger/server/src/error.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/fog/ledger/server/src/error.rs b/fog/ledger/server/src/error.rs index d741cf0e1c..917a703869 100644 --- a/fog/ledger/server/src/error.rs +++ b/fog/ledger/server/src/error.rs @@ -64,10 +64,6 @@ impl From for RouterServerError { pub enum LedgerServerError { /// Ledger Enclave error: {0} Enclave(LedgerEnclaveError), - // Failed to join thread: {0} - //ThreadJoin(String), - // RPC shutdown failure: {0} - //RpcShutdown(String), /// Report cache error: {0} ReportCache(ReportCacheError), } From 62a2d4e56f665929b88111b436c7077b3e33b863 Mon Sep 17 00:00:00 2001 From: NotGyro Date: Wed, 23 Nov 2022 19:26:06 -0500 Subject: [PATCH 04/11] Update fog/ledger/server/src/key_image_router_service.rs Co-authored-by: Nick Santana --- fog/ledger/server/src/key_image_router_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fog/ledger/server/src/key_image_router_service.rs b/fog/ledger/server/src/key_image_router_service.rs index dad10a08fc..fb8ebc8439 100644 --- a/fog/ledger/server/src/key_image_router_service.rs +++ b/fog/ledger/server/src/key_image_router_service.rs @@ -73,7 +73,7 @@ where responses, logger.clone(), ) - .map_err(move |err: grpcio::Error| log::error!(&logger, "failed to reply: {}", err)) + .map_err(move |err| log::error!(&logger, "failed to reply: {}", err)) // TODO: Do more with the error than just push it to the log. .map(|_| ()); From 7ed4a8492cbe9a4dcaa5c27e04101b44b7696ab8 Mon Sep 17 00:00:00 2001 From: NotGyro Date: Wed, 23 Nov 2022 19:26:33 -0500 Subject: [PATCH 05/11] Clean up commented-out code Co-authored-by: Nick Santana --- fog/ledger/server/src/router_handlers.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/fog/ledger/server/src/router_handlers.rs b/fog/ledger/server/src/router_handlers.rs index 5c4a26b43f..8a6b0c4989 100644 --- a/fog/ledger/server/src/router_handlers.rs +++ b/fog/ledger/server/src/router_handlers.rs @@ -15,7 +15,6 @@ use mc_fog_api::{ }; use mc_fog_ledger_enclave::LedgerEnclaveProxy; use mc_fog_uri::{ConnectionUri, KeyImageStoreUri}; -//use mc_fog_ledger_enclave_api::LedgerEnclaveProxy; use mc_util_grpc::{rpc_invalid_arg_error, ConnectionUriGrpcioChannel}; use std::{collections::BTreeMap, str::FromStr, sync::Arc}; From aab7d3cb36ef2dc6b332e27673a149490ba544f4 Mon Sep 17 00:00:00 2001 From: NotGyro Date: Wed, 23 Nov 2022 19:30:19 -0500 Subject: [PATCH 06/11] Fix misnamed type in a comment Co-authored-by: Nick Santana --- fog/ledger/server/src/router_handlers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fog/ledger/server/src/router_handlers.rs b/fog/ledger/server/src/router_handlers.rs index 8a6b0c4989..e3f8b05a6b 100644 --- a/fog/ledger/server/src/router_handlers.rs +++ b/fog/ledger/server/src/router_handlers.rs @@ -112,7 +112,7 @@ impl ProcessedShardResponseData { } } -/// Processes the MultiLedgerStoreQueryResponses returned by each Ledger Shard. +/// Processes the MultiKeyImageStoreResponses returned by each Ledger Shard. pub fn process_shard_responses( clients_and_responses: Vec<(Arc, MultiKeyImageStoreResponse)>, ) -> Result { From ca1065107694f5e594536e766bccf8c558dd8e4e Mon Sep 17 00:00:00 2001 From: Andrew Wygle Date: Thu, 1 Dec 2022 20:04:04 -0800 Subject: [PATCH 07/11] Address PR feedback around logging and comments. --- .../server/src/key_image_router_service.rs | 1 - fog/ledger/server/src/router_handlers.rs | 22 +++++++++---------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/fog/ledger/server/src/key_image_router_service.rs b/fog/ledger/server/src/key_image_router_service.rs index fb8ebc8439..ae1d3280c2 100644 --- a/fog/ledger/server/src/key_image_router_service.rs +++ b/fog/ledger/server/src/key_image_router_service.rs @@ -56,7 +56,6 @@ where requests: RequestStream, responses: DuplexSink, ) { - log::info!(self.logger, "Request received in request fn"); let _timer = SVC_COUNTERS.req(&ctx); mc_common::logger::scoped_global_logger(&rpc_logger(&ctx, &self.logger), |logger| { log::warn!( diff --git a/fog/ledger/server/src/router_handlers.rs b/fog/ledger/server/src/router_handlers.rs index e3f8b05a6b..75b8937184 100644 --- a/fog/ledger/server/src/router_handlers.rs +++ b/fog/ledger/server/src/router_handlers.rs @@ -75,7 +75,7 @@ where } else { let rpc_status = rpc_invalid_arg_error( "Inavlid LedgerRequest request", - "Neither the query nor auth fields were set".to_string(), + "Neither the check_key_images nor auth fields were set".to_string(), &logger, ); Err(rpc_status) @@ -121,9 +121,6 @@ pub fn process_shard_responses( let mut new_query_responses = Vec::new(); for (shard_client, mut response) in clients_and_responses { - // We did not receive a query_response for this shard.Therefore, we need to: - // (a) retry the query - // (b) authenticate with the Ledger Store that returned the decryption_error let store_uri = KeyImageStoreUri::from_str(response.get_fog_ledger_store_uri())?; match response.get_status() { MultiKeyImageStoreResponseStatus::SUCCESS => { @@ -131,6 +128,9 @@ pub fn process_shard_responses( new_query_responses.push((store_responder_id, response.take_query_response())); } MultiKeyImageStoreResponseStatus::AUTHENTICATION_ERROR => { + // We did not receive a query response for this shard.Therefore, we need to: + // (a) retry the query + // (b) authenticate with the Ledger Store that returned the decryption_error shard_clients_for_retry.push(shard_client); store_uris_for_authentication.push(store_uri); } @@ -181,7 +181,7 @@ where .decrypt_and_seal_query(query.into()) .map_err(|err| { router_server_err_to_rpc_status( - "Query: internal encryption error", + "Key Images Query: internal encryption error", err.into(), logger.clone(), ) @@ -200,7 +200,7 @@ where .create_multi_key_image_store_query_data(sealed_query.clone()) .map_err(|err| { router_server_err_to_rpc_status( - "Query: internal encryption error", + "Key Images Query: internal encryption error", err.into(), logger.clone(), ) @@ -211,7 +211,7 @@ where .await .map_err(|err| { router_server_err_to_rpc_status( - "Query: internal query routing error", + "Key Images Query: internal query routing error", err, logger.clone(), ) @@ -220,7 +220,7 @@ where let processed_shard_response_data = process_shard_responses(clients_and_responses) .map_err(|err| { router_server_err_to_rpc_status( - "Query: internal query response processing", + "Key Images Query: internal query response processing", err, logger.clone(), ) @@ -252,7 +252,7 @@ where if remaining_retries == 0 { return Err(router_server_err_to_rpc_status( - "Query: timed out connecting to key image stores", + "Key Images Query: timed out connecting to key image stores", RouterServerError::LedgerStoreError(format!( "Received {} responses which failed to advance the MultiKeyImageStoreRequest", RETRY_COUNT @@ -265,7 +265,7 @@ where .collate_shard_query_responses(sealed_query, query_responses) .map_err(|err| { router_server_err_to_rpc_status( - "Query: shard response collation", + "Key Images Query: shard response collation error", RouterServerError::Enclave(err), logger.clone(), ) @@ -310,7 +310,7 @@ async fn authenticate_ledger_stores( try_join_all(pending_auth_requests).await.map_err(|err| { router_server_err_to_rpc_status( - "Query: cannot authenticate with each Fog Ledger Store:", + "Key Images Query: cannot authenticate with each Fog Ledger Store:", err, logger.clone(), ) From 7b1ebdd4ab7a78d43612498d3daab6f1573c809c Mon Sep 17 00:00:00 2001 From: Andrew Wygle Date: Thu, 1 Dec 2022 20:29:07 -0800 Subject: [PATCH 08/11] Address error in loop termination logic. --- fog/ledger/server/src/router_handlers.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fog/ledger/server/src/router_handlers.rs b/fog/ledger/server/src/router_handlers.rs index 75b8937184..369e4fd501 100644 --- a/fog/ledger/server/src/router_handlers.rs +++ b/fog/ledger/server/src/router_handlers.rs @@ -176,7 +176,7 @@ where E: LedgerEnclaveProxy, { let mut query_responses: BTreeMap> = BTreeMap::new(); - let mut shard_clients = shard_clients.clone(); + let mut shards_to_query = shard_clients.clone(); let sealed_query = enclave .decrypt_and_seal_query(query.into()) .map_err(|err| { @@ -207,7 +207,7 @@ where })? .into(); let clients_and_responses = - route_query(&multi_ledger_store_query_request, shard_clients.clone()) + route_query(&multi_ledger_store_query_request, shards_to_query.clone()) .await .map_err(|err| { router_server_err_to_rpc_status( @@ -237,8 +237,8 @@ where break; } - shard_clients = processed_shard_response_data.shard_clients_for_retry; - if !shard_clients.is_empty() { + shards_to_query = processed_shard_response_data.shard_clients_for_retry; + if !shards_to_query.is_empty() { authenticate_ledger_stores( enclave.clone(), processed_shard_response_data.store_uris_for_authentication, From caaff3e1ddb6dc7905efb3f588e8e1fd5b05ef07 Mon Sep 17 00:00:00 2001 From: Andrew Wygle Date: Fri, 2 Dec 2022 13:38:04 -0800 Subject: [PATCH 09/11] Parameterize allowed number of retries for query loop --- fog/ledger/server/src/key_image_router_service.rs | 4 ++++ fog/ledger/server/src/router_handlers.rs | 15 +++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/fog/ledger/server/src/key_image_router_service.rs b/fog/ledger/server/src/key_image_router_service.rs index ae1d3280c2..707674547a 100644 --- a/fog/ledger/server/src/key_image_router_service.rs +++ b/fog/ledger/server/src/key_image_router_service.rs @@ -26,6 +26,7 @@ where { enclave: E, shards: Arc>>>, + query_retries: usize, logger: Logger, } @@ -36,11 +37,13 @@ impl KeyImageRouterService { pub fn new( enclave: E, shards: Arc>>>, + query_retries: usize, logger: Logger, ) -> Self { Self { enclave, shards, + query_retries, logger, } } @@ -70,6 +73,7 @@ where self.enclave.clone(), requests, responses, + self.query_retries, logger.clone(), ) .map_err(move |err| log::error!(&logger, "failed to reply: {}", err)) diff --git a/fog/ledger/server/src/router_handlers.rs b/fog/ledger/server/src/router_handlers.rs index 369e4fd501..670b813676 100644 --- a/fog/ledger/server/src/router_handlers.rs +++ b/fog/ledger/server/src/router_handlers.rs @@ -18,8 +18,6 @@ use mc_fog_uri::{ConnectionUri, KeyImageStoreUri}; use mc_util_grpc::{rpc_invalid_arg_error, ConnectionUriGrpcioChannel}; use std::{collections::BTreeMap, str::FromStr, sync::Arc}; -const RETRY_COUNT: usize = 3; - /// Handles a series of requests sent by the Fog Ledger Router client, /// routing them out to shards. pub async fn handle_requests( @@ -27,6 +25,7 @@ pub async fn handle_requests( enclave: E, mut requests: RequestStream, mut responses: DuplexSink, + query_retries: usize, logger: Logger, ) -> Result<(), grpcio::Error> where @@ -37,6 +36,7 @@ where request, shard_clients.clone(), enclave.clone(), + query_retries, logger.clone(), ) .await; @@ -55,6 +55,7 @@ pub async fn handle_request( mut request: LedgerRequest, shard_clients: Vec>, enclave: E, + query_retries: usize, logger: Logger, ) -> Result where @@ -67,6 +68,7 @@ where request.take_check_key_images(), enclave, shard_clients, + query_retries, logger, ) .await @@ -170,6 +172,7 @@ async fn handle_query_request( query: attest::Message, enclave: E, shard_clients: Vec>, + query_retries: usize, logger: Logger, ) -> Result where @@ -188,13 +191,13 @@ where })?; // The retry logic here is: - // Set retries remaining to RETRY_COUNT + // Set retries remaining to query_retries // Send query and process responses // If there's a response from every shard, we're done // If there's a new store, repeat // If there's no new store and we don't have enough responses, decrement - // RETRY_COUNT and loop - let mut remaining_retries = RETRY_COUNT; + // remaining_retries and loop + let mut remaining_retries = query_retries; while remaining_retries > 0 { let multi_ledger_store_query_request = enclave .create_multi_key_image_store_query_data(sealed_query.clone()) @@ -255,7 +258,7 @@ where "Key Images Query: timed out connecting to key image stores", RouterServerError::LedgerStoreError(format!( "Received {} responses which failed to advance the MultiKeyImageStoreRequest", - RETRY_COUNT + query_retries )), logger.clone(), )); From 1d42c76d0ae2a4a3f11b8e553977dc9cbf38f52e Mon Sep 17 00:00:00 2001 From: Andrew Wygle Date: Mon, 12 Dec 2022 14:19:16 -0800 Subject: [PATCH 10/11] Update based on changes from previous PRs --- fog/ledger/server/src/router_handlers.rs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/fog/ledger/server/src/router_handlers.rs b/fog/ledger/server/src/router_handlers.rs index 670b813676..21a8bc3d05 100644 --- a/fog/ledger/server/src/router_handlers.rs +++ b/fog/ledger/server/src/router_handlers.rs @@ -5,7 +5,10 @@ use futures::{future::try_join_all, SinkExt, TryStreamExt}; use grpcio::{ChannelBuilder, DuplexSink, RequestStream, RpcStatus, WriteFlags}; use mc_attest_api::attest; use mc_attest_enclave_api::{EnclaveMessage, NonceSession}; -use mc_common::{logger::Logger, ResponderId}; +use mc_common::{ + logger::{log, Logger}, + ResponderId, +}; use mc_fog_api::{ ledger::{ LedgerRequest, LedgerResponse, MultiKeyImageStoreRequest, MultiKeyImageStoreResponse, @@ -117,13 +120,14 @@ impl ProcessedShardResponseData { /// Processes the MultiKeyImageStoreResponses returned by each Ledger Shard. pub fn process_shard_responses( clients_and_responses: Vec<(Arc, MultiKeyImageStoreResponse)>, + logger: Logger, ) -> Result { let mut shard_clients_for_retry = Vec::new(); let mut store_uris_for_authentication = Vec::new(); let mut new_query_responses = Vec::new(); for (shard_client, mut response) in clients_and_responses { - let store_uri = KeyImageStoreUri::from_str(response.get_fog_ledger_store_uri())?; + let store_uri = KeyImageStoreUri::from_str(response.get_store_uri())?; match response.get_status() { MultiKeyImageStoreResponseStatus::SUCCESS => { let store_responder_id = store_uri.responder_id()?; @@ -138,6 +142,14 @@ pub fn process_shard_responses( } // This call will be retried as part of the larger retry logic MultiKeyImageStoreResponseStatus::NOT_READY => (), + // This is an unexpected error - we should never see this + MultiKeyImageStoreResponseStatus::UNKNOWN => { + log::error!( + logger, + "Received a response with status 'UNKNOWN' from store {}", + KeyImageStoreUri::from_str(&response.store_uri)? + ); + } } } @@ -220,8 +232,8 @@ where ) })?; - let processed_shard_response_data = process_shard_responses(clients_and_responses) - .map_err(|err| { + let processed_shard_response_data = + process_shard_responses(clients_and_responses, logger.clone()).map_err(|err| { router_server_err_to_rpc_status( "Key Images Query: internal query response processing", err, From e918a6659e0a0e965f36d9341cffa5f7a8b9d436 Mon Sep 17 00:00:00 2001 From: Andrew Wygle Date: Fri, 16 Dec 2022 15:23:26 -0800 Subject: [PATCH 11/11] Don't create 'groups' in `mod` or `use` declarations. --- fog/ledger/server/src/key_image_router_service.rs | 12 +++++------- fog/ledger/server/src/lib.rs | 5 ++--- fog/ledger/server/src/router_handlers.rs | 1 - 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/fog/ledger/server/src/key_image_router_service.rs b/fog/ledger/server/src/key_image_router_service.rs index 707674547a..65f979701d 100644 --- a/fog/ledger/server/src/key_image_router_service.rs +++ b/fog/ledger/server/src/key_image_router_service.rs @@ -1,10 +1,6 @@ // Copyright (c) 2018-2022 The MobileCoin Foundation -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; - +use crate::router_handlers; use futures::{FutureExt, TryFutureExt}; use grpcio::{DuplexSink, RequestStream, RpcContext}; use mc_common::logger::{log, Logger}; @@ -16,8 +12,10 @@ use mc_fog_ledger_enclave::LedgerEnclaveProxy; use mc_fog_uri::KeyImageStoreUri; use mc_util_grpc::rpc_logger; use mc_util_metrics::SVC_COUNTERS; - -use crate::router_handlers; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; #[derive(Clone)] pub struct KeyImageRouterService diff --git a/fog/ledger/server/src/lib.rs b/fog/ledger/server/src/lib.rs index c876354690..177379e852 100644 --- a/fog/ledger/server/src/lib.rs +++ b/fog/ledger/server/src/lib.rs @@ -5,14 +5,13 @@ mod config; mod counters; mod db_fetcher; mod error; +mod key_image_router_service; mod key_image_service; mod merkle_proof_service; +mod router_handlers; mod server; mod untrusted_tx_out_service; -mod key_image_router_service; -mod router_handlers; - pub use block_service::BlockService; pub use config::LedgerServerConfig; pub use key_image_service::KeyImageService; diff --git a/fog/ledger/server/src/router_handlers.rs b/fog/ledger/server/src/router_handlers.rs index 21a8bc3d05..2bf217de08 100644 --- a/fog/ledger/server/src/router_handlers.rs +++ b/fog/ledger/server/src/router_handlers.rs @@ -178,7 +178,6 @@ where Ok(response) } -#[allow(unused_variables)] /// Handles a client's query request. async fn handle_query_request( query: attest::Message,