From f6c2b64bfb5082551e9d0b73e9fb035bcc6415d5 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Tue, 6 Aug 2024 20:46:15 +0800 Subject: [PATCH 1/8] HDFS-17595. [ARR] ErasureCoding supports asynchronous rpc. --- .../federation/router/AsyncErasureCoding.java | 181 ++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java new file mode 100644 index 0000000000000..b14d5a6d97ee2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; +import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; +import org.apache.hadoop.hdfs.server.namenode.NameNode; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; + +public class AsyncErasureCoding extends ErasureCoding{ + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + /** Interface to identify the active NN for a nameservice or blockpool ID. */ + private final ActiveNamenodeResolver namenodeResolver; + + public AsyncErasureCoding(RouterRpcServer server) { + super(server); + this.rpcServer = server; + this.rpcClient = this.rpcServer.getRPCClient(); + this.namenodeResolver = this.rpcClient.getNamenodeResolver(); + } + + public ErasureCodingPolicyInfo[] getErasureCodingPolicies() + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getErasureCodingPolicies"); + Set nss = namenodeResolver.getNamespaces(); + + // Map ret = + rpcClient.invokeConcurrent( + nss, method, true, false, ErasureCodingPolicyInfo[].class); + asyncApply((ApplyFunction, ErasureCodingPolicyInfo[]>) ret -> { + return merge(ret, ErasureCodingPolicyInfo.class); + }); + + return asyncReturn(ErasureCodingPolicyInfo[].class); + } + + @Override + public Map getErasureCodingCodecs() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getErasureCodingCodecs"); + Set nss = namenodeResolver.getNamespaces(); + + rpcClient.invokeConcurrent( + nss, method, true, false, Map.class); + + asyncApply((ApplyFunction, Map>) retCodecs -> { + Map ret = new HashMap<>(); + Object obj = retCodecs; + @SuppressWarnings("unchecked") + Map> results = + (Map>)obj; + Collection> allCodecs = results.values(); + for (Map codecs : allCodecs) { + ret.putAll(codecs); + } + return ret; + }); + + return asyncReturn(Map.class); + } + + @Override + public AddErasureCodingPolicyResponse[] addErasureCodingPolicies( + ErasureCodingPolicy[] policies) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("addErasureCodingPolicies", + new Class[] {ErasureCodingPolicy[].class}, new Object[] {policies}); + Set nss = namenodeResolver.getNamespaces(); + + rpcClient.invokeConcurrent( + nss, method, true, false, AddErasureCodingPolicyResponse[].class); + + asyncApply( + (ApplyFunction + , + AddErasureCodingPolicyResponse[]>) ret -> { + return merge(ret, AddErasureCodingPolicyResponse.class); + }); + return asyncReturn(AddErasureCodingPolicyResponse[].class); + } + + @Override + public ErasureCodingPolicy getErasureCodingPolicy(String src) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final List locations = + rpcServer.getLocationsForPath(src, false, false); + RemoteMethod remoteMethod = new RemoteMethod("getErasureCodingPolicy", + new Class[] {String.class}, new RemoteParam()); + rpcClient.invokeSequential( + locations, remoteMethod, null, null); + + asyncApply(ret -> { + return (ErasureCodingPolicy) ret; + }); + + return asyncReturn(ErasureCodingPolicy.class); + } + + + @Override + public ECTopologyVerifierResult getECTopologyResultForPolicies( + String[] policyNames) throws IOException { + RemoteMethod method = new RemoteMethod("getECTopologyResultForPolicies", + new Class[] {String[].class}, new Object[] {policyNames}); + Set nss = namenodeResolver.getNamespaces(); + if (nss.isEmpty()) { + throw new IOException("No namespace availaible."); + } + + // Map ret + rpcClient.invokeConcurrent(nss, method, true, false, + ECTopologyVerifierResult.class); + asyncApply((ApplyFunction, ECTopologyVerifierResult>) ret -> { + for (Map.Entry entry : ret + .entrySet()) { + if (!entry.getValue().isSupported()) { + return entry.getValue(); + } + } + // If no negative result, return the result from the first namespace. + return ret.get(nss.iterator().next()); + }); + return asyncReturn(ECTopologyVerifierResult.class); + } + + @Override + public ECBlockGroupStats getECBlockGroupStats() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getECBlockGroupStats"); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent( + nss, method, true, false, ECBlockGroupStats.class); + + asyncApply((ApplyFunction, ECBlockGroupStats>) allStats -> { + return ECBlockGroupStats.merge(allStats.values()); + }); + return asyncReturn(ECBlockGroupStats.class); + } +} \ No newline at end of file From 60bc89055f150e3ebac4508c9204e97b4398a627 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Wed, 7 Aug 2024 09:39:46 +0800 Subject: [PATCH 2/8] remove comments and fix checkstyle. --- .../federation/router/AsyncErasureCoding.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java index b14d5a6d97ee2..d60e381c42452 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java @@ -61,12 +61,11 @@ public ErasureCodingPolicyInfo[] getErasureCodingPolicies() RemoteMethod method = new RemoteMethod("getErasureCodingPolicies"); Set nss = namenodeResolver.getNamespaces(); - // Map ret = rpcClient.invokeConcurrent( nss, method, true, false, ErasureCodingPolicyInfo[].class); - asyncApply((ApplyFunction, ErasureCodingPolicyInfo[]>) ret -> { - return merge(ret, ErasureCodingPolicyInfo.class); - }); + asyncApply( + (ApplyFunction, ErasureCodingPolicyInfo[]>) + ret -> merge(ret, ErasureCodingPolicyInfo.class)); return asyncReturn(ErasureCodingPolicyInfo[].class); } @@ -110,9 +109,8 @@ public AddErasureCodingPolicyResponse[] addErasureCodingPolicies( nss, method, true, false, AddErasureCodingPolicyResponse[].class); asyncApply( - (ApplyFunction - , - AddErasureCodingPolicyResponse[]>) ret -> { + (ApplyFunction, + AddErasureCodingPolicyResponse[]>) ret -> { return merge(ret, AddErasureCodingPolicyResponse.class); }); return asyncReturn(AddErasureCodingPolicyResponse[].class); @@ -148,7 +146,6 @@ public ECTopologyVerifierResult getECTopologyResultForPolicies( throw new IOException("No namespace availaible."); } - // Map ret rpcClient.invokeConcurrent(nss, method, true, false, ECTopologyVerifierResult.class); asyncApply((ApplyFunction, ECTopologyVerifierResult>) ret -> { From 8af0bbf982b71a14f549bc9490160f730cbd24e4 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Tue, 8 Oct 2024 17:33:27 +0800 Subject: [PATCH 3/8] fix checkstyle. --- .../hdfs/server/federation/router/AsyncErasureCoding.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java index d60e381c42452..3e47066787c5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java @@ -39,7 +39,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; -public class AsyncErasureCoding extends ErasureCoding{ +public class AsyncErasureCoding extends ErasureCoding { /** RPC server to receive client calls. */ private final RouterRpcServer rpcServer; /** RPC clients to connect to the Namenodes. */ From 746d3f257c9ef4ff59291760ca5d2c3802398839 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Wed, 9 Oct 2024 21:06:41 +0800 Subject: [PATCH 4/8] add UT. --- .../federation/router/AsyncErasureCoding.java | 4 +- .../router/TestRouterAsyncErasureCoding.java | 204 ++++++++++++++++++ 2 files changed, 206 insertions(+), 2 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java index 3e47066787c5b..e4d34ecfb29e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java @@ -71,7 +71,7 @@ public ErasureCodingPolicyInfo[] getErasureCodingPolicies() } @Override - public Map getErasureCodingCodecs() throws IOException { + public Map getErasureCodingCodecs() throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); RemoteMethod method = new RemoteMethod("getErasureCodingCodecs"); @@ -80,7 +80,7 @@ public Map getErasureCodingCodecs() throws IOException { rpcClient.invokeConcurrent( nss, method, true, false, Map.class); - asyncApply((ApplyFunction, Map>) retCodecs -> { + asyncApply((ApplyFunction>, Map>) retCodecs -> { Map ret = new HashMap<>(); Object obj = retCodecs; @SuppressWarnings("unchecked") diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java new file mode 100644 index 0000000000000..0d1acfa95e790 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.StripedFileTestUtil; +import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.ipc.CallerContext; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestRouterAsyncErasureCoding { + private static Configuration routerConf; + /** Federated HDFS cluster. */ + private static MiniRouterDFSCluster cluster; + private static String ns0; + + /** Random Router for this federated cluster. */ + private MiniRouterDFSCluster.RouterContext router; + private FileSystem routerFs; + private RouterRpcServer routerRpcServer; + private AsyncErasureCoding asyncErasureCoding; + + private final String testfilePath = "/testdir/testAsyncErasureCoding.file"; + + @BeforeClass + public static void setUpCluster() throws Exception { + cluster = new MiniRouterDFSCluster(true, 1, 2, + DEFAULT_HEARTBEAT_INTERVAL_MS, 1000); + cluster.setNumDatanodesPerNameservice(3); + cluster.setRacks( + new String[] {"/rack1", "/rack2", "/rack3"}); + cluster.startCluster(); + + // Making one Namenode active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + } + } + // Start routers with only an RPC service + routerConf = new RouterConfigBuilder() + .rpc() + .build(); + + // Reduce the number of RPC clients threads to overload the Router easy + routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + cluster.addRouterOverrides(routerConf); + // Start routers with only an RPC service + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + cluster.waitActiveNamespaces(); + ns0 = cluster.getNameservices().get(0); + } + + @AfterClass + public static void shutdownCluster() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Before + public void setUp() throws IOException { + router = cluster.getRandomRouter(); + routerFs = router.getFileSystem(); + routerRpcServer = router.getRouterRpcServer(); + routerRpcServer.initAsyncThreadPool(); + RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( + routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), + routerRpcServer.getRPCMonitor(), + routerRpcServer.getRouterStateIdContext()); + RouterRpcServer spy = Mockito.spy(routerRpcServer); + Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient); + asyncErasureCoding = new AsyncErasureCoding(spy); + + // Create mock locations + MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver(); + resolver.addLocation("/", ns0, "/"); + FsPermission permission = new FsPermission("705"); + routerFs.mkdirs(new Path("/testdir"), permission); + FSDataOutputStream fsDataOutputStream = routerFs.create( + new Path(testfilePath), true); + fsDataOutputStream.write(new byte[1024]); + fsDataOutputStream.close(); + } + + @After + public void tearDown() throws IOException { + // clear client context + CallerContext.setCurrent(null); + boolean delete = routerFs.delete(new Path("/testdir")); + assertTrue(delete); + if (routerFs != null) { + routerFs.close(); + } + } + + @Test + public void testRouterAsyncErasureCoding() throws Exception { + String ecPolicyName = StripedFileTestUtil.getDefaultECPolicy().getName(); + HdfsFileStatus fileInfo = cluster.getNamenodes().get(0).getClient().getFileInfo(testfilePath); + assertNotNull(fileInfo); + + asyncErasureCoding.setErasureCodingPolicy("/testdir", ecPolicyName); + syncReturn(null); + + asyncErasureCoding.getErasureCodingPolicy("/testdir"); + ErasureCodingPolicy ecPolicy = syncReturn(ErasureCodingPolicy.class); + assertEquals(StripedFileTestUtil.getDefaultECPolicy().getName(), ecPolicy.getName()); + + asyncErasureCoding.getErasureCodingPolicies(); + ErasureCodingPolicyInfo[] erasureCodingPolicies = syncReturn(ErasureCodingPolicyInfo[].class); + int numECPolicies = erasureCodingPolicies.length; + ErasureCodingPolicyInfo[] erasureCodingPoliciesFromNameNode = + cluster.getNamenodes().get(0).getClient().getErasureCodingPolicies(); + + assertArrayEquals(erasureCodingPoliciesFromNameNode, erasureCodingPolicies); + + asyncErasureCoding.getErasureCodingCodecs(); + Map erasureCodingCodecs = syncReturn(Map.class); + Map erasureCodingCodecsFromNameNode = + cluster.getNamenodes().get(0).getClient().getErasureCodingCodecs(); + + assertEquals(erasureCodingCodecs, erasureCodingCodecsFromNameNode); + + // RS-12-4-1024k + final ECSchema schema = new ECSchema("rs", 12, 4); + ErasureCodingPolicy erasureCodingPolicy = new ErasureCodingPolicy(schema, 1024 * 1024); + asyncErasureCoding.addErasureCodingPolicies(new ErasureCodingPolicy[]{erasureCodingPolicy}); + AddErasureCodingPolicyResponse[] response = syncReturn(AddErasureCodingPolicyResponse[].class); + assertEquals(response[0].isSucceed(), true); + + asyncErasureCoding.getErasureCodingPolicies(); + ErasureCodingPolicyInfo[] newErasureCodingPolicies = syncReturn(ErasureCodingPolicyInfo[].class); + int numNewECPolicies = newErasureCodingPolicies.length; + assertEquals(numECPolicies + 1, numNewECPolicies); + + asyncErasureCoding.getECTopologyResultForPolicies( + new String[]{"RS-6-3-1024k", "RS-12-4-1024k"}); + ECTopologyVerifierResult ecTopologyResultForPolicies = syncReturn(ECTopologyVerifierResult.class); + assertEquals(false, ecTopologyResultForPolicies.isSupported()); + + asyncErasureCoding.getECTopologyResultForPolicies( + new String[]{"XOR-2-1-1024k"}); + ECTopologyVerifierResult ecTopologyResultForPolicies1 = syncReturn(ECTopologyVerifierResult.class); + assertEquals(true, ecTopologyResultForPolicies1.isSupported()); + } +} From d633f0466ec6f81c91910d28988d1b4da47e6065 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Thu, 10 Oct 2024 09:50:26 +0800 Subject: [PATCH 5/8] fix checkstyles. --- .../federation/router/AsyncErasureCoding.java | 12 ++++++---- .../router/TestRouterAsyncErasureCoding.java | 24 +++++++++---------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java index e4d34ecfb29e1..9cff67c6ef37c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java @@ -64,7 +64,8 @@ public ErasureCodingPolicyInfo[] getErasureCodingPolicies() rpcClient.invokeConcurrent( nss, method, true, false, ErasureCodingPolicyInfo[].class); asyncApply( - (ApplyFunction, ErasureCodingPolicyInfo[]>) + (ApplyFunction, + ErasureCodingPolicyInfo[]>) ret -> merge(ret, ErasureCodingPolicyInfo.class)); return asyncReturn(ErasureCodingPolicyInfo[].class); @@ -80,7 +81,8 @@ public Map getErasureCodingCodecs() throws IOException { rpcClient.invokeConcurrent( nss, method, true, false, Map.class); - asyncApply((ApplyFunction>, Map>) retCodecs -> { + asyncApply((ApplyFunction>, Map>) retCodecs -> { Map ret = new HashMap<>(); Object obj = retCodecs; @SuppressWarnings("unchecked") @@ -148,7 +150,8 @@ public ECTopologyVerifierResult getECTopologyResultForPolicies( rpcClient.invokeConcurrent(nss, method, true, false, ECTopologyVerifierResult.class); - asyncApply((ApplyFunction, ECTopologyVerifierResult>) ret -> { + asyncApply((ApplyFunction, + ECTopologyVerifierResult>) ret -> { for (Map.Entry entry : ret .entrySet()) { if (!entry.getValue().isSupported()) { @@ -170,7 +173,8 @@ public ECBlockGroupStats getECBlockGroupStats() throws IOException { rpcClient.invokeConcurrent( nss, method, true, false, ECBlockGroupStats.class); - asyncApply((ApplyFunction, ECBlockGroupStats>) allStats -> { + asyncApply((ApplyFunction, + ECBlockGroupStats>) allStats -> { return ECBlockGroupStats.merge(allStats.values()); }); return asyncReturn(ECBlockGroupStats.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java index 0d1acfa95e790..047cf6bdb55ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncErasureCoding.java @@ -66,7 +66,7 @@ public class TestRouterAsyncErasureCoding { private FileSystem routerFs; private RouterRpcServer routerRpcServer; private AsyncErasureCoding asyncErasureCoding; - + private final String testfilePath = "/testdir/testAsyncErasureCoding.file"; @BeforeClass @@ -159,11 +159,11 @@ public void testRouterAsyncErasureCoding() throws Exception { asyncErasureCoding.setErasureCodingPolicy("/testdir", ecPolicyName); syncReturn(null); - + asyncErasureCoding.getErasureCodingPolicy("/testdir"); ErasureCodingPolicy ecPolicy = syncReturn(ErasureCodingPolicy.class); assertEquals(StripedFileTestUtil.getDefaultECPolicy().getName(), ecPolicy.getName()); - + asyncErasureCoding.getErasureCodingPolicies(); ErasureCodingPolicyInfo[] erasureCodingPolicies = syncReturn(ErasureCodingPolicyInfo[].class); int numECPolicies = erasureCodingPolicies.length; @@ -171,14 +171,14 @@ public void testRouterAsyncErasureCoding() throws Exception { cluster.getNamenodes().get(0).getClient().getErasureCodingPolicies(); assertArrayEquals(erasureCodingPoliciesFromNameNode, erasureCodingPolicies); - + asyncErasureCoding.getErasureCodingCodecs(); - Map erasureCodingCodecs = syncReturn(Map.class); + Map erasureCodingCodecs = syncReturn(Map.class); Map erasureCodingCodecsFromNameNode = cluster.getNamenodes().get(0).getClient().getErasureCodingCodecs(); assertEquals(erasureCodingCodecs, erasureCodingCodecsFromNameNode); - + // RS-12-4-1024k final ECSchema schema = new ECSchema("rs", 12, 4); ErasureCodingPolicy erasureCodingPolicy = new ErasureCodingPolicy(schema, 1024 * 1024); @@ -187,18 +187,18 @@ public void testRouterAsyncErasureCoding() throws Exception { assertEquals(response[0].isSucceed(), true); asyncErasureCoding.getErasureCodingPolicies(); - ErasureCodingPolicyInfo[] newErasureCodingPolicies = syncReturn(ErasureCodingPolicyInfo[].class); - int numNewECPolicies = newErasureCodingPolicies.length; + ErasureCodingPolicyInfo[] erasureCodingPolicies2 = syncReturn(ErasureCodingPolicyInfo[].class); + int numNewECPolicies = erasureCodingPolicies2.length; assertEquals(numECPolicies + 1, numNewECPolicies); asyncErasureCoding.getECTopologyResultForPolicies( new String[]{"RS-6-3-1024k", "RS-12-4-1024k"}); - ECTopologyVerifierResult ecTopologyResultForPolicies = syncReturn(ECTopologyVerifierResult.class); - assertEquals(false, ecTopologyResultForPolicies.isSupported()); + ECTopologyVerifierResult ecTResultForPolicies = syncReturn(ECTopologyVerifierResult.class); + assertEquals(false, ecTResultForPolicies.isSupported()); asyncErasureCoding.getECTopologyResultForPolicies( new String[]{"XOR-2-1-1024k"}); - ECTopologyVerifierResult ecTopologyResultForPolicies1 = syncReturn(ECTopologyVerifierResult.class); - assertEquals(true, ecTopologyResultForPolicies1.isSupported()); + ECTopologyVerifierResult ecTResultForPolicies2 = syncReturn(ECTopologyVerifierResult.class); + assertEquals(true, ecTResultForPolicies2.isSupported()); } } From 87979c502df96f9ae97c67bba06e07927b6bb47f Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Thu, 10 Oct 2024 14:55:15 +0800 Subject: [PATCH 6/8] fix checkstyle indent --- .../federation/router/AsyncErasureCoding.java | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java index 9cff67c6ef37c..5f26039ea7e68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java @@ -65,8 +65,7 @@ public ErasureCodingPolicyInfo[] getErasureCodingPolicies() nss, method, true, false, ErasureCodingPolicyInfo[].class); asyncApply( (ApplyFunction, - ErasureCodingPolicyInfo[]>) - ret -> merge(ret, ErasureCodingPolicyInfo.class)); + ErasureCodingPolicyInfo[]>) ret -> merge(ret, ErasureCodingPolicyInfo.class)); return asyncReturn(ErasureCodingPolicyInfo[].class); } @@ -83,17 +82,17 @@ public Map getErasureCodingCodecs() throws IOException { asyncApply((ApplyFunction>, Map>) retCodecs -> { - Map ret = new HashMap<>(); - Object obj = retCodecs; - @SuppressWarnings("unchecked") - Map> results = - (Map>)obj; - Collection> allCodecs = results.values(); - for (Map codecs : allCodecs) { - ret.putAll(codecs); - } - return ret; - }); + Map ret = new HashMap<>(); + Object obj = retCodecs; + @SuppressWarnings("unchecked") + Map> results = + (Map>)obj; + Collection> allCodecs = results.values(); + for (Map codecs : allCodecs) { + ret.putAll(codecs); + } + return ret; + }); return asyncReturn(Map.class); } @@ -152,15 +151,15 @@ public ECTopologyVerifierResult getECTopologyResultForPolicies( ECTopologyVerifierResult.class); asyncApply((ApplyFunction, ECTopologyVerifierResult>) ret -> { - for (Map.Entry entry : ret - .entrySet()) { - if (!entry.getValue().isSupported()) { - return entry.getValue(); - } - } - // If no negative result, return the result from the first namespace. - return ret.get(nss.iterator().next()); - }); + for (Map.Entry entry : + ret.entrySet()) { + if (!entry.getValue().isSupported()) { + return entry.getValue(); + } + } + // If no negative result, return the result from the first namespace. + return ret.get(nss.iterator().next()); + }); return asyncReturn(ECTopologyVerifierResult.class); } @@ -175,8 +174,8 @@ public ECBlockGroupStats getECBlockGroupStats() throws IOException { asyncApply((ApplyFunction, ECBlockGroupStats>) allStats -> { - return ECBlockGroupStats.merge(allStats.values()); - }); + return ECBlockGroupStats.merge(allStats.values()); + }); return asyncReturn(ECBlockGroupStats.class); } } \ No newline at end of file From f9edfec6e4ea6070de5697c818d0aca75555b348 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Thu, 10 Oct 2024 20:51:39 +0800 Subject: [PATCH 7/8] trigger yetus. From 648668401eac5ad798b575bb0d7c5415bb75411a Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Tue, 15 Oct 2024 17:37:16 +0800 Subject: [PATCH 8/8] fix extra line --- .../hadoop/hdfs/server/federation/router/AsyncErasureCoding.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java index 5f26039ea7e68..9f1dbe5f2c402 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncErasureCoding.java @@ -136,7 +136,6 @@ public ErasureCodingPolicy getErasureCodingPolicy(String src) return asyncReturn(ErasureCodingPolicy.class); } - @Override public ECTopologyVerifierResult getECTopologyResultForPolicies( String[] policyNames) throws IOException {