YARN-11238. Optimizing FederationClientInterceptor Call with Parallelism. by slfan1989 · Pull Request #4904 · apache/hadoop · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

YARN-11238. Optimizing FederationClientInterceptor Call with Parallelism. #4904

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Oct 4, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -497,4 +497,14 @@ private static DelegationKey getDelegationKeyByMasterKey(RouterMasterKey masterK
public RouterRMDTSecretManagerState getRouterRMSecretMan 10000 agerState() {
return routerRMSecretManagerState;
}

/**
 * Returns the sub-cluster membership map held by this store.
 * Annotated {@code @VisibleForTesting}: exposed so tests can inspect the map.
 *
 * @return the {@code membership} field itself (the same reference, not a copy)
 */
@VisibleForTesting
public Map<SubClusterId, SubClusterInfo> getMembership() {
return membership;
}

/**
 * Replaces the sub-cluster membership map held by this store.
 * Annotated {@code @VisibleForTesting}: exposed so tests can inject a map.
 *
 * @param membership the map to store; the reference is kept as-is, not copied
 */
@VisibleForTesting
public void setMembership(Map<SubClusterId, SubClusterInfo> membership) {
this.membership = membership;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ public FederationStateStore getStateStore() {
return stateStore;
}

/**
/*
* The Router Supports Store NewMasterKey (RouterMasterKey{@link RouterMasterKey}).
*
* @param newKey Key used for generating and verifying delegation tokens
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@

package org.apache.hadoop.yarn.server.router.clientrm;

import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -39,7 +40,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
Expand Down Expand Up @@ -661,22 +661,19 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
RouterServerUtil.logAndThrowException("Missing getApplications request.", null);
}
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subclusters =
federationFacade.getSubClusters(true);
ClientMethod remoteMethod = new ClientMethod("getApplications",
new Class[] {GetApplicationsRequest.class}, new Object[] {request});
Map<SubClusterId, GetApplicationsResponse> applications = null;
Collection<GetApplicationsResponse> applications = null;
try {
applications = invokeConcurrent(subclusters.keySet(), remoteMethod,
GetApplicationsResponse.class);
applications = invokeConcurrent(remoteMethod, GetApplicationsResponse.class);
} catch (Exception ex) {
routerMetrics.incrMultipleAppsFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get applications due to exception.", ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
// Merge the Application Reports
return RouterYarnClientUtils.mergeApplications(applications.values(), returnPartialReport);
return RouterYarnClientUtils.mergeApplications(applications, returnPartialReport);
}

@Override
Expand All @@ -691,8 +688,7 @@ public GetClusterMetricsResponse getClusterMetrics(
new Class[] {GetClusterMetricsRequest.class}, new Object[] {request});
Collection<GetClusterMetricsResponse> clusterMetrics = null;
try {
clusterMetrics = invokeAppClientProtocolMethod(
true, remoteMethod, GetClusterMetricsResponse.class);
clusterMetrics = invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetClusterMetricsFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get cluster metrics due to exception.", ex);
Expand All @@ -702,67 +698,62 @@ public GetClusterMetricsResponse getClusterMetrics(
return RouterYarnClientUtils.merge(clusterMetrics);
}

<R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
ClientMethod request, Class<R> clazz) throws YarnException, IOException {
List<Callable<Object>> callables = new ArrayList<>();
List<Future<Object>> futures = new ArrayList<>();
Map<SubClusterId, IOException> exceptions = new TreeMap<>();
for (SubClusterId subClusterId : clusterIds) {
callables.add(new Callable<Object>() {
@Override
public Object call() throws Exception {
ApplicationClientProtocol protocol =
getClientRMProxyForSubCluster(subClusterId);
Method method = ApplicationClientProtocol.class
.getMethod(request.getMethodName(), request.getTypes());
return method.invoke(protocol, request.getParams());
}
<R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
throws YarnException {

// Get Active SubClusters
Map<SubClusterId, SubClusterInfo> subClusterInfo = federationFacade.getSubClusters(true);
Collection<SubClusterId> subClusterIds = subClusterInfo.keySet();

List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
Map<SubClusterId, Exception> exceptions = new TreeMap<>();

// Generate parallel Callable tasks
for (SubClusterId subClusterId : subClusterIds) {
callables.add(() -> {
ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId);
String methodName = request.getMethodName();
Class<?>[] types = request.getTypes();
Object[] params = request.getParams();
Method method = ApplicationClientProtocol.class.getMethod(methodName, types);
Object result = method.invoke(protocol, params);
return Pair.of(subClusterId, result);
});
}

// Get results from multiple threads
Map<SubClusterId, R> results = new TreeMap<>();
try {
futures.addAll(executorService.invokeAll(callables));
for (int i = 0; i < futures.size(); i++) {
SubClusterId subClusterId = clusterIds.get(i);
futures.stream().forEach(future -> {
SubClusterId subClusterId = null;
try {
Future<Object> future = futures.get(i);
Object result = future.get();
Pair<SubClusterId, Object> pair = future.get();
subClusterId = pair.getKey();
Object result = pair.getValue();
results.put(subClusterId, clazz.cast(result));
} catch (ExecutionException ex) {
Throwable cause = ex.getCause();
LOG.debug("Cannot execute {} on {}: {}", request.getMethodName(),
} catch (InterruptedException | ExecutionException e) {
Throwable cause = e.getCause();
LOG.error("Cannot execute {} on {}: {}", request.getMethodName(),
subClusterId.getId(), cause.getMessage());
IOException ioe;
if (cause instanceof IOException) {
ioe = (IOException) cause;
} else if (cause instanceof YarnException) {
throw (YarnException) cause;
} else {
ioe = new IOException(
"Unhandled exception while calling " + request.getMethodName()
+ ": " + cause.getMessage(), cause);
}
// Store the exceptions
exceptions.put(subClusterId, ioe);
}
}
if (results.isEmpty() && !clusterIds.isEmpty()) {
SubClusterId subClusterId = clusterIds.get(0);
IOException ioe = exceptions.get(subClusterId);
if (ioe != null) {
throw ioe;
exceptions.put(subClusterId, e);
}
}
});
} catch (InterruptedException e) {
throw new YarnException(e);
throw new YarnException("invokeConcurrent Failed.", e);
}

// All sub-clusters return results to be considered successful,
// otherwise an exception will be thrown.
if (exceptions != null && !exceptions.isEmpty()) {
Set<SubClusterId> subClusterIdSets = exceptions.keySet();
throw new YarnException("invokeConcurrent Failed, An exception occurred in subClusterIds = " +
StringUtils.join(subClusterIdSets, ","));
}
return results;
}

<R> Map<SubClusterId, R> invokeConcurrent(Collection<SubClusterId> clusterIds,
ClientMethod request, Class<R> clazz) throws YarnException, IOException {
ArrayList<SubClusterId> clusterIdList = new ArrayList<>(clusterIds);
return invokeConcurrent(clusterIdList, request, clazz);
// return result
return results.values();
}

@Override
Expand All @@ -773,24 +764,19 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
RouterServerUtil.logAndThrowException("Missing getClusterNodes request.", null);
}
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClusters =
federationFacade.getSubClusters(true);
Map<SubClusterId, GetClusterNodesResponse> clusterNodes = Maps.newHashMap();
for (SubClusterId subClusterId : subClusters.keySet()) {
ApplicationClientProtocol client;
try {
client = getClientRMProxyForSubCluster(subClusterId);
GetClusterNodesResponse response = client.getClusterNodes(request);
clusterNodes.put(subClusterId, response);
} catch (Exception ex) {
routerMetrics.incrClusterNodesFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get cluster nodes due to exception.", ex);
}
ClientMethod remoteMethod = new ClientMethod("getClusterNodes",
new Class[]{GetClusterNodesRequest.class}, new Object[]{request});
try {
Collection<GetClusterNodesResponse> clusterNodes =
invokeConcurrent(remoteMethod, GetClusterNodesResponse.class);
long stopTime = clock.getTime();
routerMetrics.succeededGetClusterNodesRetrieved(stopTime - startTime);
return RouterYarnClientUtils.mergeClusterNodesResponse(clusterNodes);
} catch (Exception ex) {
routerMetrics.incrClusterNodesFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get cluster nodes due to exception.", ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededGetClusterNodesRetrieved(stopTime - startTime);
// Merge the NodesResponse
return RouterYarnClientUtils.mergeClusterNodesResponse(clusterNodes.values());
throw new YarnException("Unable to get cluster nodes.");
}

@Override
Expand All @@ -806,8 +792,7 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
new Class[]{GetQueueInfoRequest.class}, new Object[]{request});
Collection<GetQueueInfoResponse> queues = null;
try {
queues = invokeAppClientProtocolMethod(true, remoteMethod,
GetQueueInfoResponse.class);
queues = invokeConcurrent(remoteMethod, GetQueueInfoResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetQueueInfoFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get queue [" +
Expand All @@ -831,8 +816,7 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
new Class[] {GetQueueUserAclsInfoRequest.class}, new Object[] {request});
Collection<GetQueueUserAclsInfoResponse> queueUserAcls = null;
try {
queueUserAcls = invokeAppClientProtocolMethod(true, remoteMethod,
GetQueueUserAclsInfoResponse.class);
queueUserAcls = invokeConcurrent(remoteMethod, GetQueueUserAclsInfoResponse.class);
} catch (Exception ex) {
routerMetrics.incrQueueUserAclsFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get queue user Acls due to exception.", ex);
Expand Down Expand Up @@ -992,8 +976,7 @@ public ReservationListResponse listReservations(
new Class[] {ReservationListRequest.class}, new Object[] {request});
Collection<ReservationListResponse> listResponses = null;
try {
listResponses = invokeAppClientProtocolMethod(true, remoteMethod,
ReservationListResponse.class);
listResponses = invokeConcurrent(remoteMethod, ReservationListResponse.class);
} catch (Exception ex) {
routerMetrics.incrListReservationsFailedRetrieved();
RouterServerUtil.logAndThrowException(
Expand Down Expand Up @@ -1072,24 +1055,6 @@ public ReservationDeleteResponse deleteReservation(
throw new YarnException(msg);
}

private <R> Collection<R> invokeAppClientProtocolMethod(
Boolean filterInactiveSubClusters, ClientMethod request, Class<R> clazz)
throws YarnException, RuntimeException {
Map<SubClusterId, SubClusterInfo> subClusters =
federationFacade.getSubClusters(filterInactiveSubClusters);
return subClusters.keySet().stream().map(subClusterId -> {
try {
ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId);
Method method = ApplicationClientProtocol.class.
getMethod(request.getMethodName(), request.getTypes());
return clazz.cast(method.invoke(protocol, request.getParams()));
} catch (YarnException | NoSuchMethodException |
IllegalAccessException | InvocationTargetException ex) {
throw new RuntimeException(ex);
}
}).collect(Collectors.toList());
}

@Override
public GetNodesToLabelsResponse getNodeToLabels(
GetNodesToLabelsRequest request) throws YarnException, IOException {
Expand All @@ -1102,8 +1067,7 @@ public GetNodesToLabelsResponse getNodeToLabels(
new Class[] {GetNodesToLabelsRequest.class}, new Object[] {request});
Collection<GetNodesToLabelsResponse> clusterNodes = null;
try {
clusterNodes = invokeAppClientProtocolMethod(true, remoteMethod,
GetNodesToLabelsResponse.class);
clusterNodes = invokeConcurrent(remoteMethod, GetNodesToLabelsResponse.class);
} catch (Exception ex) {
routerMetrics.incrNodeToLabelsFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get node label due to exception.", ex);
Expand All @@ -1126,8 +1090,7 @@ public GetLabelsToNodesResponse getLabelsToNodes(
new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
Collection<GetLabelsToNodesResponse> labelNodes = null;
try {
labelNodes = invokeAppClientProtocolMethod(true, remoteMethod,
GetLabelsToNodesResponse.class);
labelNodes = invokeConcurrent(remoteMethod, GetLabelsToNodesResponse.class);
} catch (Exception ex) {
routerMetrics.incrLabelsToNodesFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get label node due to exception.", ex);
Expand All @@ -1150,8 +1113,7 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels(
new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
Collection<GetClusterNodeLabelsResponse> nodeLabels = null;
try {
nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod,
GetClusterNodeLabelsResponse.class);
nodeLabels = invokeConcurrent(remoteMethod, GetClusterNodeLabelsResponse.class);
} catch (Exception ex) {
routerMetrics.incrClusterNodeLabelsFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get cluster nodeLabels due to exception.",
Expand Down Expand Up @@ -1563,8 +1525,7 @@ public GetAllResourceProfilesResponse getResourceProfiles(
new Class[] {GetAllResourceProfilesRequest.class}, new Object[] {request});
Collection<GetAllResourceProfilesResponse> resourceProfiles = null;
try {
resourceProfiles = invokeAppClientProtocolMethod(true, remoteMethod,
GetAllResourceProfilesResponse.class);
resourceProfiles = invokeConcurrent(remoteMethod, GetAllResourceProfilesResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetResourceProfilesFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get resource profiles due to exception.",
Expand All @@ -1588,8 +1549,7 @@ public GetResourceProfileResponse getResourceProfile(
new Class[] {GetResourceProfileRequest.class}, new Object[] {request});
Collection<GetResourceProfileResponse> resourceProfile = null;
try {
resourceProfile = invokeAppClientProtocolMethod(true, remoteMethod,
GetResourceProfileResponse.class);
resourceProfile = invokeConcurrent(remoteMethod, GetResourceProfileResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetResourceProfileFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get resource profile due to exception.",
Expand All @@ -1612,8 +1572,7 @@ public GetAllResourceTypeInfoResponse getResourceTypeInfo(
new Class[] {GetAllResourceTypeInfoRequest.class}, new Object[] {request});
Collection<GetAllResourceTypeInfoResponse> listResourceTypeInfo;
try {
listResourceTypeInfo = invokeAppClientProtocolMethod(true, remoteMethod,
GetAllResourceTypeInfoResponse.class);
listResourceTypeInfo = invokeConcurrent(remoteMethod, GetAllResourceTypeInfoResponse.class);
} catch (Exception ex) {
routerMetrics.incrResourceTypeInfoFailedRetrieved();
LOG.error("Unable to get all resource type info node due to exception.", ex);
Expand Down Expand Up @@ -1644,8 +1603,8 @@ public GetAttributesToNodesResponse getAttributesToNodes(
new Class[] {GetAttributesToNodesRequest.class}, new Object[] {request});
Collection<GetAttributesToNodesResponse> attributesToNodesResponses = null;
try {
attributesToNodesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
GetAttributesToNodesResponse.class);
attributesToNodesResponses =
invokeConcurrent(remoteMethod, GetAttributesToNodesResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetAttributesToNodesFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get attributes to nodes due to exception.",
Expand All @@ -1668,7 +1627,7 @@ public GetClusterNodeAttributesResponse getClusterNodeAttributes(
new Class[] {GetClusterNodeAttributesRequest.class}, new Object[] {request});
Collection<GetClusterNodeAttributesResponse> clusterNodeAttributesResponses = null;
try {
clusterNodeAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
clusterNodeAttributesResponses = invokeConcurrent(remoteMethod,
GetClusterNodeAttributesResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetClusterNodeAttributesFailedRetrieved();
Expand All @@ -1693,7 +1652,7 @@ public GetNodesToAttributesResponse getNodesToAttributes(
new Class[] {GetNodesToAttributesRequest.class}, new Object[] {request});
Collection<GetNodesToAttributesResponse> nodesToAttributesResponses = null;
try {
nodesToAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
nodesToAttributesResponses = invokeConcurrent(remoteMethod,
GetNodesToAttributesResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetNodesToAttributesFailedRetrieved();
Expand Down
Loading
0