HDFS-16911. Distcp with snapshot diff to support Ozone filesystem. by sadanand48 · Pull Request #5364 · apache/hadoop · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

HDFS-16911. Distcp with snapshot diff to support Ozone filesystem. #5364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,19 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.tools.CopyListing.InvalidInputException;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
Expand Down Expand Up @@ -106,20 +107,7 @@ private boolean preSyncCheck() throws IOException {
final FileSystem snapshotDiffFs = isRdiff() ? tgtFs : srcFs;
final Path snapshotDiffDir = isRdiff() ? targetDir : sourceDir;

// currently we require both the source and the target file system are
// DistributedFileSystem or (S)WebHdfsFileSystem.
if (!(srcFs instanceof DistributedFileSystem
|| srcFs instanceof WebHdfsFileSystem)) {
throw new IllegalArgumentException("Unsupported source file system: "
+ srcFs.getScheme() + "://. " +
"Supported file systems: hdfs://, webhdfs:// and swebhdfs://.");
}
if (!(tgtFs instanceof DistributedFileSystem
|| tgtFs instanceof WebHdfsFileSystem)) {
throw new IllegalArgumentException("Unsupported target file system: "
+ tgtFs.getScheme() + "://. " +
"Supported file systems: hdfs://, webhdfs:// and swebhdfs://.");
}
checkFilesystemSupport(sourceDir,targetDir,srcFs, tgtFs);

// make sure targetFS has no change between from and the current states
if (!checkNoChange(tgtFs, targetDir)) {
Expand Down Expand Up @@ -165,6 +153,42 @@ private boolean preSyncCheck() throws IOException {
return true;
}

/**
 * Check that both the source and the target file systems can take part in a
 * snapshot-diff based sync.
 *
 * <p>Each file system must advertise
 * {@link CommonPathCapabilities#FS_SNAPSHOTS} for its directory and must
 * expose a public {@code getSnapshotDiffReport(Path, String, String)} method,
 * which is later invoked reflectively. Checks run in this order: source
 * capability, target capability, source method, target method. The first
 * failing check determines the reported error.
 *
 * @param sourceDir directory on the source file system
 * @param targetDir directory on the target file system
 * @param srcFs source file system
 * @param tgtFs target file system
 * @throws IOException if a capability probe fails
 * @throws UnsupportedOperationException if either file system lacks snapshot
 *     support or the {@code getSnapshotDiffReport} method
 */
private void checkFilesystemSupport(Path sourceDir, Path targetDir,
    FileSystem srcFs, FileSystem tgtFs) throws IOException {
  requireSnapshotCapability(srcFs, sourceDir, "The source file system ");
  requireSnapshotCapability(tgtFs, targetDir, "The target file system ");
  // The diff report is obtained via reflection, so the method must be
  // resolvable on both ends before the sync starts.
  requireSnapshotDiffReportMethod(srcFs, "The source file system ");
  requireSnapshotDiffReportMethod(tgtFs, "The target file system ");
}

/**
 * Fail with {@link UnsupportedOperationException} unless {@code fs} reports
 * the snapshot path capability for {@code dir}.
 *
 * @param messagePrefix leading text of the error message, naming the role
 */
private static void requireSnapshotCapability(FileSystem fs, Path dir,
    String messagePrefix) throws IOException {
  if (!fs.hasPathCapability(dir, CommonPathCapabilities.FS_SNAPSHOTS)) {
    throw new UnsupportedOperationException(
        messagePrefix + fs.getScheme()
            + " does not support snapshot.");
  }
}

/**
 * Fail with {@link UnsupportedOperationException} (keeping the lookup
 * failure as cause) unless {@code fs} exposes a public
 * {@code getSnapshotDiffReport(Path, String, String)} method.
 *
 * @param messagePrefix leading text of the error message, naming the role
 */
private static void requireSnapshotDiffReportMethod(FileSystem fs,
    String messagePrefix) {
  try {
    getSnapshotDiffReportMethod(fs);
  } catch (NoSuchMethodException e) {
    throw new UnsupportedOperationException(
        messagePrefix + fs.getScheme()
            + " does not support getSnapshotDiffReport",
        e);
  }
}

public boolean sync() throws IOException {
if (!preSyncCheck()) {
return false;
Expand Down Expand Up @@ -211,21 +235,10 @@ private boolean getAllDiffs() throws IOException {
context.getTargetPath() : context.getSourcePaths().get(0);

try {
SnapshotDiffReport report = null;
FileSystem fs = ssDir.getFileSystem(conf);
final String from = getSnapshotName(context.getFromSnapshot());
final String to = getSnapshotName(context.getToSnapshot());
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem)fs;
report = dfs.getSnapshotDiffReport(ssDir, from, to);
} else if (fs instanceof WebHdfsFileSystem) {
WebHdfsFileSystem webHdfs = (WebHdfsFileSystem)fs;
report = webHdfs.getSnapshotDiffReport(ssDir, from, to);
} else {
throw new IllegalArgumentException("Unsupported file system: " +
fs.getScheme() + "://. " +
"Supported file systems: hdfs://, webhdfs:// and swebhdfs://.");
}
SnapshotDiffReport report =
getSnapshotDiffReport(ssDir.getFileSystem(conf), ssDir, from, to);

this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class);
for (SnapshotDiffReport.DiffType type :
Expand Down Expand Up @@ -286,6 +299,36 @@ private boolean getAllDiffs() throws IOException {
return false;
}

/**
 * Resolve the public {@code getSnapshotDiffReport(Path, String, String)}
 * method on the runtime class of the given file system. Inherited public
 * methods are included.
 *
 * @param fs the file system to inspect
 * @return the resolved method
 * @throws NoSuchMethodException if no such public method exists
 */
private static Method getSnapshotDiffReportMethod(FileSystem fs)
    throws NoSuchMethodException {
  final Class<?>[] signature = {Path.class, String.class, String.class};
  final Class<? extends FileSystem> fsClass = fs.getClass();
  return fsClass.getMethod("getSnapshotDiffReport", signature);
}

/**
 * Get the snapshotDiff b/w the fromSnapshot & toSnapshot for the given
 * filesystem.
 *
 * <p>The report is obtained by reflectively invoking the file system's public
 * {@code getSnapshotDiffReport(Path, String, String)} method. Any
 * implementation exposing that method can therefore be used, not only
 * DistributedFileSystem or WebHdfsFileSystem.
 *
 * @param fs the file system whose getSnapshotDiffReport method is invoked
 * @param snapshotDir the directory passed as the first argument
 * @param fromSnapshot the starting snapshot name
 * @param toSnapshot the ending snapshot name (checkNoChange passes "")
 * @return the invoked method's result, cast to SnapshotDiffReport
 * @throws IOException wrapping the cause of any exception thrown by the
 *     invoked method
 * @throws IllegalArgumentException if the method cannot be found or accessed
 */
private static SnapshotDiffReport getSnapshotDiffReport(
final FileSystem fs,
final Path snapshotDir,
final String fromSnapshot,
final String toSnapshot) throws IOException {
try {
return (SnapshotDiffReport) getSnapshotDiffReportMethod(fs).invoke(
fs, snapshotDir, fromSnapshot, toSnapshot);
} catch (InvocationTargetException e) {
// The invoked method itself threw; surface its cause as an IOException.
// NOTE(review): an IOException cause is wrapped rather than rethrown, so
// callers lose its specific subtype; consider unwrapping it.
throw new IOException(e.getCause());
} catch (NoSuchMethodException|IllegalAccessException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NoSuchMethod sanity check already covered looks like before calling this API. IllegalAccess should noyt happen ideally. So runtime should be ok.
Tiny Nit: Message: "Failed to ...."

Copy link
Contributor Author
@sadanand48 sadanand48 Mar 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @umamaheswararao for the review.

Done. Yes the exception will likely be detected earlier but these are checked exceptions so it requires either to define in catch block or throws.

// Presumably already ruled out by checkFilesystemSupport; both are checked
// exceptions, so they are converted to an unchecked one here.
throw new IllegalArgumentException(
"Failed to invoke getSnapshotDiffReport.", e);
}
}

/**
 * Normalize a snapshot name before passing it to the diff-report call.
 * {@code Path.CUR_DIR} becomes the empty string; every other value,
 * including null, is returned unchanged.
 */
private String getSnapshotName(String name) {
  if (Path.CUR_DIR.equals(name)) {
    return "";
  }
  return name;
}
Expand Down Expand Up @@ -327,14 +370,7 @@ private void deleteTargetTmpDir(FileSystem targetFs,
private boolean checkNoChange(FileSystem fs, Path path) {
try {
final String from = getSnapshotName(context.getFromSnapshot());
SnapshotDiffReport targetDiff = null;
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem)fs;
targetDiff = dfs.getSnapshotDiffReport(path, from, "");
} else {
WebHdfsFileSystem webHdfs = (WebHdfsFileSystem)fs;
targetDiff = webHdfs.getSnapshotDiffReport(path, from, "");
}
SnapshotDiffReport targetDiff = getSnapshotDiffReport(fs, path, from, "");
if (!targetDiff.getDiffList().isEmpty()) {
DistCp.LOG.warn("The target has been modified since snapshot "
+ context.getFromSnapshot());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
Expand All @@ -38,6 +40,7 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.junit.After;
import org.junit.Assert;
Expand All @@ -47,6 +50,7 @@
import java.io.IOException;
import java.io.FileWriter;
import java.io.BufferedWriter;
import java.net.URI;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.ArrayList;
Expand All @@ -56,6 +60,9 @@
import java.util.Map;
import java.util.regex.Pattern;

import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

public class TestDistCpSync {
private MiniDFSCluster cluster;
private final Configuration conf = new HdfsConfiguration();
Expand Down Expand Up @@ -89,6 +96,7 @@ public void setUp() throws Exception {

conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString());
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString());
conf.setClass("fs.dummy.impl", DummyFs.class, FileSystem.class);
}

@After
Expand Down Expand Up @@ -1276,4 +1284,63 @@ private void snapshotDiffWithPaths(Path sourceFSPath,
verifyCopyByFs(sourceFS, targetFS, sourceFS.getFileStatus(sourceFSPath),
targetFS.getFileStatus(targetFSPath), false);
}

/**
 * Run DistCp with -diff between two local (file://) paths. Expects an
 * UnsupportedOperationException whose message reports that the source file
 * system "file" does not support snapshots.
 */
@Test
public void testSyncSnapshotDiffWithLocalFileSystem() throws Exception {
  final String[] localArgs = {
      "-update", "-diff", "s1", "s2", "file:///source", "file:///target"};
  // Option parsing stays inside the lambda so that any failure it raises
  // is also subject to the intercept.
  LambdaTestUtils.intercept(UnsupportedOperationException.class,
      "The source file system file does not support snapshot",
      () -> new DistCp(conf, OptionsParser.parse(localArgs)).execute());
}

/**
 * Run DistCp with -diff between two paths on the "dummy" scheme.
 *
 * <p>The scheme is mapped to DummyFs in setUp. DummyFs advertises snapshot
 * support and exposes getSnapshotDiffReport. An
 * UnsupportedOperationException is rethrown and fails the test. Any other
 * exception is tolerated because the source and target paths are never
 * created.
 *
 * <p>NOTE(review): the method name below appears corrupted by page
 * extraction ("AE8F" inserted). It presumably should read
 * testSyncSnapshotDiffWithDummyFileSystem.
 */
@Test
public void testSy AE8F ncSnapshotDiffWithDummyFileSystem() {
String[] args =
new String[] { "-update", "-diff", "s1", "s2", "dummy:///source",
"dummy:///target" };
try {
// Confirm the "dummy" scheme really resolves to DummyFs.
FileSystem dummyFs = FileSystem.get(URI.create("dummy:///"), conf);
assertThat(dummyFs).isInstanceOf(DummyFs.class);
new DistCp(conf, OptionsParser.parse(args)).execute();
} catch (UnsupportedOperationException e) {
// This is the type the file-system support checks throw on rejection,
// so it must propagate and fail the test.
throw e;
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. catch UnsupportedOperationException after tightening the exception raised

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// can expect other exceptions as source and target paths
// are not created.
}
}

/**
 * A RawLocalFileSystem subclass reported under the "dummy" scheme that
 * claims snapshot support.
 *
 * <p>Used to check that the snapshot-diff preconditions accept any file
 * system that advertises {@link CommonPathCapabilities#FS_SNAPSHOTS} and
 * exposes a public {@code getSnapshotDiffReport(Path, String, String)}
 * method.
 */
public static class DummyFs extends RawLocalFileSystem {
  /** URI returned by {@link #getUri()}; java.net.URI is immutable. */
  private static final URI DUMMY_URI = URI.create("dummy:///");

  public DummyFs() {
    super();
  }

  @Override
  public URI getUri() {
    return DUMMY_URI;
  }

  /**
   * Report FS_SNAPSHOTS as supported for any path. Every other capability is
   * answered by RawLocalFileSystem.
   */
  @Override
  public boolean hasPathCapability(Path path, String capability)
      throws IOException {
    switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
    case CommonPathCapabilities.FS_SNAPSHOTS:
      return true;
    default:
      return super.hasPathCapability(path, capability);
    }
  }

  /** Return a default-constructed FileStatus for any path. */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    return new FileStatus();
  }

  /**
   * Return an empty diff report between the two snapshots.
   *
   * <p>The copy side resolves this method with Class#getMethod by name and
   * (Path, String, String) signature. It must therefore stay public and keep
   * this exact signature.
   */
  public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
      final String fromSnapshot, final String toSnapshot) {
    return new SnapshotDiffReport(snapshotDir.getName(), fromSnapshot,
        toSnapshot, new ArrayList<SnapshotDiffReport.DiffReportEntry>());
  }
}
}
0