YARN-11358. [Federation] Add FederationInterceptor#allow-partial-result config. by slfan1989 · Pull Request #5056 · apache/hadoop
Merged · 18 commits · Dec 14, 2022
@@ -4294,6 +4294,11 @@ public static boolean isAclEnabled(Configuration conf) {
ROUTER_PREFIX + "webapp.cross-origin.enabled";
public static final boolean DEFAULT_ROUTER_WEBAPP_ENABLE_CORS_FILTER = false;

/** Router Interceptor Allow Partial Result Enable. **/
public static final String ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED =
ROUTER_PREFIX + "interceptor.allow-partial-result.enable";
public static final boolean DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED = false;

////////////////////////////////
// CSI Volume configs
////////////////////////////////
@@ -5085,4 +5085,18 @@
</description>
</property>

<property>
<name>yarn.router.interceptor.allow-partial-result.enable</name>
<value>false</value>
<description>
This configuration indicates whether the Router interceptor is allowed
to return partial SubCluster results.
If true, exceptions from individual SubClusters during the call are ignored
and the results from the remaining SubClusters are returned.
If false, an exception from any SubCluster is rethrown immediately.
Default is false.
</description>
</property>

</configuration>
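As a quick usage sketch (not part of this patch), the same switch can be flipped programmatically on the Configuration handed to the Router or used by tests. The constant names below are the ones introduced in YarnConfiguration above; the class and main method are purely illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class AllowPartialResultExample {
  public static void main(String[] args) {
    // Start from the YARN defaults (yarn-default.xml / yarn-site.xml).
    Configuration conf = new YarnConfiguration();

    // Equivalent to yarn.router.interceptor.allow-partial-result.enable=true:
    // exceptions from individual SubClusters are ignored and partial results returned.
    conf.setBoolean(
        YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED, true);

    // Reading the flag back falls back to the default (false) when the key is unset.
    boolean allowPartial = conf.getBoolean(
        YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED,
        YarnConfiguration.DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED);
    System.out.println("allow-partial-result = " + allowPartial);
  }
}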
@@ -140,6 +140,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
private boolean returnPartialReport;
private boolean appInfosCacheEnabled;
private int appInfosCacheCount;
private boolean allowPartialResult;
private long submitIntervalTime;

private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
@@ -194,6 +195,10 @@ public void init(String user) {
appInfosCaches = new LRUCacheHashMap<>(appInfosCacheCount, true);
}

allowPartialResult = conf.getBoolean(
YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED,
YarnConfiguration.DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED);

submitIntervalTime = conf.getTimeDuration(
YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME,
YarnConfiguration.DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME, TimeUnit.MILLISECONDS);
@@ -975,10 +980,13 @@ public NodesInfo getNodes(String states) {
});
} catch (NotFoundException e) {
LOG.error("get all active sub cluster(s) error.", e);
throw e;
} catch (YarnException e) {
LOG.error("getNodes by states = {} error.", states, e);
throw new YarnRuntimeException(e);
} catch (IOException e) {
LOG.error("getNodes by states = {} error with io error.", states, e);
throw new YarnRuntimeException(e);
}

// Delete duplicate from all the node reports got from all the available
@@ -2070,9 +2078,10 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c

Map<SubClusterInfo, R> results = new HashMap<>();

// Send the requests in parallel
CompletionService<Pair<R, Exception>> compSvc =
new ExecutorCompletionService<>(this.threadpool);
// If a SubCluster access error occurs, whether to rethrow the exception
// is decided by the user configuration (allowPartialResult).
// Send the requests in parallel.
CompletionService<Pair<R, Exception>> compSvc = new ExecutorCompletionService<>(threadpool);

// This part of the code needs to expose the Exception information from the call.
// We use a Pair to store the related information. The left value of the Pair is the response,
@@ -2105,9 +2114,10 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
if (response != null) {
results.put(clusterId, response);
}

Exception exception = pair.getRight();
if (exception != null) {
Exception exception = pair.getValue();
// If allowPartialResult=false, an exception from any subCluster
// is rethrown directly.
if (!allowPartialResult && exception != null) {
throw exception;
}
} catch (Throwable e) {
@@ -2178,4 +2188,9 @@ private SubClusterInfo getHomeSubClusterInfoByReservationId(String resId)
public LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> getAppInfosCaches() {
return appInfosCaches;
}

@VisibleForTesting
public void setAllowPartialResult(boolean allowPartialResult) {
this.allowPartialResult = allowPartialResult;
}
}
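For readers following the invokeConcurrent change above, here is a simplified, self-contained sketch of the same pattern with hypothetical names (it is not the Router implementation): requests are fanned out through an ExecutorCompletionService, each task captures its result or exception in a Pair, and an exception is only rethrown when allowPartialResult is false.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.lang3.tuple.Pair;

public class PartialResultFanOut {

  /** A pretend per-SubCluster call; failures are captured, not thrown. */
  interface SubClusterCall<R> {
    R call(String subClusterId) throws Exception;
  }

  static <R> Map<String, R> invokeConcurrent(List<String> subClusters,
      SubClusterCall<R> call, boolean allowPartialResult) throws Exception {
    ExecutorService pool =
        Executors.newFixedThreadPool(Math.max(1, subClusters.size()));
    try {
      // Send the requests in parallel; each task reports (response, exception).
      CompletionService<Pair<R, Exception>> compSvc =
          new ExecutorCompletionService<>(pool);
      List<Future<Pair<R, Exception>>> futures = new ArrayList<>();
      for (String sc : subClusters) {
        futures.add(compSvc.submit(() -> {
          try {
            return Pair.of(call.call(sc), null);
          } catch (Exception e) {
            return Pair.of(null, e);
          }
        }));
      }

      Map<String, R> results = new HashMap<>();
      for (int i = 0; i < subClusters.size(); i++) {
        Pair<R, Exception> pair = futures.get(i).get();
        if (pair.getLeft() != null) {
          results.put(subClusters.get(i), pair.getLeft());
        }
        // allowPartialResult=false: surface the first SubCluster failure.
        if (!allowPartialResult && pair.getRight() != null) {
          throw pair.getRight();
        }
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }
}

Iterating the futures in submission order keeps the result-to-SubCluster mapping straightforward while the calls still run concurrently.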
@@ -749,6 +749,7 @@ public void testGetLabelsOnNode() throws Exception {
Assert.assertTrue(nodeLabelsName.contains("y"));

// null request
interceptor.setAllowPartialResult(false);
NodeLabelsInfo nodeLabelsInfo2 = interceptor.getLabelsOnNode(null, "node2");
Assert.assertNotNull(nodeLabelsInfo2);
Assert.assertEquals(0, nodeLabelsInfo2.getNodeLabelsName().size());
@@ -1183,6 +1184,8 @@ public void testWebAddressWithScheme() {
@Test
public void testCheckUserAccessToQueue() throws Exception {

interceptor.setAllowPartialResult(false);

// Case 1: Only queue admin user can access other user's information
HttpServletRequest mockHsr = mockHttpServletRequestByUserName("non-admin");
String errorMsg1 = "User=non-admin doesn't haven access to queue=queue " +
@@ -1212,6 +1215,8 @@ public void testCheckUserAccessToQueue() throws Exception {
// Case 5: get OK only for SUBMIT_APP acl for "yarn" user
checkUserAccessToQueueFailed("queue", "yarn", QueueACL.ADMINISTER_QUEUE, "admin");
checkUserAccessToQueueSuccess("queue", "yarn", QueueACL.SUBMIT_APPLICATIONS, "admin");

interceptor.setAllowPartialResult(true);
}

private void checkUserAccessToQueueSuccess(String queue, String userName,
@@ -25,6 +25,7 @@

import javax.ws.rs.core.Response;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -42,6 +43,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -81,10 +83,16 @@ public class TestFederationInterceptorRESTRetry
@Override
public void setUp() {
super.setUpConfig();

Configuration conf = this.getConf();

// For compatibility with the historical test cases, we disable allow-partial-result here.
conf.setBoolean(YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED, false);

interceptor = new TestableFederationInterceptorREST();

stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf());
stateStore.init(conf);
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
@@ -516,4 +524,58 @@ private void checkEmptyMetrics(ClusterMetricsInfo response) {
Assert.assertEquals(0, response.getActiveNodes());
Assert.assertEquals(0, response.getShutdownNodes());
}

@Test
public void testGetNodesOneBadSCAllowPartial() throws Exception {
// We set allowPartialResult to true.
// In this test case, we set up one subCluster whose status is bad,
// so no response can be obtained from it; because partial results
// are allowed, the call should still return instead of throwing.
interceptor.setAllowPartialResult(true);
setupCluster(Arrays.asList(bad2));

NodesInfo nodesInfo = interceptor.getNodes(null);
Assert.assertNotNull(nodesInfo);

// We need to set allowPartialResult=false
interceptor.setAllowPartialResult(false);
}

@Test
public void testGetNodesTwoBadSCsAllowPartial() throws Exception {
// We set allowPartialResult to true.
// In this test case, we set up 2 subClusters, both with bad status.
// Because partial results are allowed, the call should still return
// a result instead of throwing an exception.
interceptor.setAllowPartialResult(true);
setupCluster(Arrays.asList(bad1, bad2));

NodesInfo nodesInfo = interceptor.getNodes(null);
Assert.assertNotNull(nodesInfo);

// We need to set allowPartialResult=false
interceptor.setAllowPartialResult(false);
}

@Test
public void testGetNodesOneBadOneGoodAllowPartial() throws Exception {

// allowPartialResult = true,
// We tolerate exceptions and return normal results
interceptor.setAllowPartialResult(true);
setupCluster(Arrays.asList(good, bad2));

NodesInfo response = interceptor.getNodes(null);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getNodes().size());
// Check if the only node came from Good SubCluster
Assert.assertEquals(good.getId(),
Long.toString(response.getNodes().get(0).getLastHealthUpdate()));

// allowPartialResult = false,
// We do not tolerate exceptions and will throw exceptions directly
interceptor.setAllowPartialResult(false);

setupCluster(Arrays.asList(good, bad2));
}
}