Fix shard response processing loop by samdealy · Pull Request #2946 · mobilecoinfoundation/mobilecoin · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Fix shard response processing loop #2946

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 61 additions & 23 deletions fog/view/server/src/router_request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
use futures::{future::try_join_all, SinkExt, TryStreamExt};
use grpcio::{ChannelBuilder, DuplexSink, RequestStream, RpcStatus, WriteFlags};
use mc_attest_api::attest;
use mc_attest_enclave_api::SealedClientMessage;
use mc_common::logger::Logger;
use mc_fog_api::{
view::{FogViewRouterRequest, FogViewRouterResponse, MultiViewStoreQueryRequest},
Expand Down Expand Up @@ -102,9 +103,6 @@ pub async fn handle_query_request<E>(
where
E: ViewEnclaveProxy,
{
let mut query_responses: Vec<MultiViewStoreQueryResponse> =
Vec::with_capacity(shard_clients.len());
let mut shard_clients = shard_clients.clone();
let sealed_query = enclave
.decrypt_and_seal_query(query.into())
.map_err(|err| {
Expand All @@ -114,8 +112,43 @@ where
logger.clone(),
)
})?;
// TODO: use retry crate?
for _ in 0..RETRY_COUNT {

let query_responses = get_query_responses(
sealed_query.clone(),
enclave.clone(),
shard_clients.clone(),
logger.clone(),
)
.await?;

let query_response = enclave
.collate_shard_query_responses(sealed_query, query_responses)
.map_err(|err| {
router_server_err_to_rpc_status(
"Query: shard response collation",
RouterServerError::Enclave(err),
logger.clone(),
)
})?;

let mut response = FogViewRouterResponse::new();
response.set_query(query_response.into());
Ok(response)
}

async fn get_query_responses<E>(
sealed_query: SealedClientMessage,
enclave: E,
mut shard_clients: Vec<Arc<FogViewStoreApiClient>>,
logger: Logger,
) -> Result<Vec<MultiViewStoreQueryResponse>, RpcStatus>
where
E: ViewEnclaveProxy,
{
let mut query_responses: Vec<MultiViewStoreQueryResponse> =
Vec::with_capacity(shard_clients.len());
let mut remaining_tries = RETRY_COUNT;
while remaining_tries > 0 {
let multi_view_store_query_request = enclave
.create_multi_view_store_query_data(sealed_query.clone())
.map_err(|err| {
Expand Down Expand Up @@ -161,27 +194,32 @@ where
break;
}

authenticate_view_stores(
enclave.clone(),
processed_shard_response_data.view_store_uris_for_authentication,
logger.clone(),
)
.await?;
}

let query_response = enclave
.collate_shard_query_responses(sealed_query, query_responses)
.map_err(|err| {
router_server_err_to_rpc_status(
"Query: shard response collation",
RouterServerError::Enclave(err),
let view_store_uris_for_authentication =
processed_shard_response_data.view_store_uris_for_authentication;
if !view_store_uris_for_authentication.is_empty() {
authenticate_view_stores(
enclave.clone(),
view_store_uris_for_authentication,
logger.clone(),
)
})?;
.await?;
} else {
remaining_tries -= 1;
}
}

let mut response = FogViewRouterResponse::new();
response.set_query(query_response.into());
Ok(response)
if remaining_tries == 0 {
return Err(router_server_err_to_rpc_status(
"Query: timed out connecting to view stores",
RouterServerError::ViewStoreError(format!(
"Received {} responses which failed to advance the MultiViewStoreRequest",
RETRY_COUNT
)),
logger.clone(),
));
}

Ok(query_responses)
}

/// Sends a client's query request to all of the Fog View shards.
Expand Down
1 change: 1 addition & 0 deletions fog/view/server/src/shard_responses_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub fn process_shard_responses(
"Received a response with status 'unknown' from store{}",
FogViewStoreUri::from_str(&response.store_uri)?
);
shard_clients_for_retry.push(shard_client);
}
mc_fog_types::view::MultiViewStoreQueryResponseStatus::Success => {
new_query_responses.push(response.clone());
Expand Down
0