HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. #2732
Conversation
@@ -362,6 +362,7 @@ Command Line Options
| `-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B | |
| `-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option. |
| `-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store |
| `-useIterator` | Uses single threaded listStatusIterator to build listing | Useful for saving memory at the client side. |
Does it implicitly void `-numListstatusThreads`? Sounds like bad news for running distcp on cloud storage, where latency is big.
Thanx @jojochuang for having a look.
Yes, it indeed isn't meant for object stores. I am trying a multi-threaded approach for object stores too as part of HADOOP-17558; that won't be as memory efficient, but it should strike a balance between speed and memory. I have a WIP patch for that as well and will share it on the jira.
This is basically for HDFS, or any FS where listing is not slow but there are memory constraints. My scenario is basically DR, where it is in general HDFS->HDFS or HDFS->S3.
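For readers following along, a minimal sketch of the contrast the flag is about: eager listStatus materialises every child status at once, while an iterator-based walk only holds one batch at a time. Names here are illustrative, not the actual SimpleCopyListing code.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

class ListingSketch {

  // Default path: the whole child array is materialised in memory at once.
  static void listEager(FileSystem fs, Path dir) throws IOException {
    FileStatus[] children = fs.listStatus(dir);    // O(children) memory
    for (FileStatus child : children) {
      process(child);
    }
  }

  // -useIterator path: children are fetched batch by batch, so the client
  // only holds one page of results at a time.
  static void listLazy(FileSystem fs, Path dir) throws IOException {
    RemoteIterator<FileStatus> it = fs.listStatusIterator(dir);
    while (it.hasNext()) {
      process(it.next());                          // O(batch) memory
    }
  }

  private static void process(FileStatus status) {
    System.out.println(status.getPath());
  }
}
```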
I was thinking we should update the doc to mention it will disable `-numListstatusThreads`. But if we can merge that WIP patch soon then it's fine.
I have updated the document. Let me know if something more can be improved.
  }

  @SuppressWarnings("checkstyle:parameternumber")
  private void prepareListing(Path path, SequenceFile.Writer fileListWriter,
Sorry for coming back late.
Can we refactor this method a bit to use fewer parameters?
maybe we should refactor traverseDirectory() into a class, since we pass the parameters around here and there.
Done, refactored TraverseDirectory into a class.
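Roughly, the refactor the reviewers asked for looks like the sketch below: a helper class holding what used to be a long parameter list. Field and method names are hypothetical, not the exact code in the PR.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

class TraverseDirectorySketch {
  // parameters formerly threaded through every prepareListing() call
  private final SequenceFile.Writer fileListWriter;
  private final FileSystem sourceFS;
  private final Path sourcePathRoot;

  TraverseDirectorySketch(SequenceFile.Writer fileListWriter,
      FileSystem sourceFS, Path sourcePathRoot) {
    this.fileListWriter = fileListWriter;
    this.sourceFS = sourceFS;
    this.sourcePathRoot = sourcePathRoot;
  }

  void traverseDirectory() throws IOException {
    // choose the single-threaded iterator walk or the multi-threaded walk
    // here, reading configuration from the fields instead of parameters
  }
}
```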
Thanx @jojochuang for the review, I have addressed the review comments. Please have a look. :-)
Added suggestions about:
- logging duration of the list
- logging IOStatistics of the iterator
- moving the test to AbstractContractDistCpTest so it is run by the object stores (note: that test isn't executed against HDFS)
+ " target location, avoiding temporary file rename.")), | ||
|
||
USE_ITERATOR(DistCpConstants.CONF_LABEL_USE_ITERATOR, | ||
new Option("useIterator", false, |
Could we have a non-mixed-case arg? I always get confused here.
@@ -18,6 +18,7 @@
package org.apache.hadoop.tools;

import org.apache.hadoop.fs.RemoteIterator;
needs to go into the "real hadoop imports" block; your IDE is getting confused. Putting it in the right place makes backporting way easier.
  public void traverseDirectoryMultiThreaded() throws IOException {
    assert numListstatusThreads > 0;
    if (LOG.isDebugEnabled()) {
you can switch to SLF4J logging here; this is all commons-logging era (distcp lagged).
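A small sketch of the suggested switch, assuming the standard SLF4J API: parameterized messages make the explicit isDebugEnabled() guard unnecessary for cheap arguments.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ListingLogSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(ListingLogSketch.class);

  void logListing(int numListstatusThreads) {
    // commons-logging era:
    //   if (LOG.isDebugEnabled()) {
    //     LOG.debug("Starting listing with " + numListstatusThreads + " threads");
    //   }
    // SLF4J: the message is only formatted if debug is actually enabled.
    LOG.debug("Starting listing with {} threads", numListstatusThreads);
  }
}
```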
        if (workResult.getSuccess()) {
          LinkedList<CopyListingFileStatus> childCopyListingStatus =
              DistCpUtils.toCopyListingFileStatus(sourceFS, child,
                  preserveAcls && child.isDirectory(),
child.isDirectory() is called often enough that it could go into a variable.
          LOG.error("Could not get item from childQueue. Retrying...");
        }
      }
      workers.shutdown();
should this be in a finally clause?
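For illustration, the pattern the question points at might look like this (names hypothetical): the executor shutdown sits in a finally block so it runs even when the drain loop throws.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ShutdownSketch {
  void runWorkers() {
    ExecutorService workers = Executors.newFixedThreadPool(4);
    try {
      // submit and drain the listing work here
    } finally {
      workers.shutdown();   // always reached, even if the loop above throws
    }
  }
}
```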
        prepareListing(child.getPath());
      }
    }
  }
Can you add an IOStatisticsLogging.logIOStatisticsAtDebug(LOG, listStatus) call here? That way, at debug level, you get a log from s3a (and soon abfs) of what IO took place for the list, performance etc. Really interesting.
Yeps, that is something cool. I extracted a part of it to hadoop-common; let me know if you have objections to doing that. I wanted to move the whole of it to common, but left it because of the class CallableSupplier: I thought moving that might cause some incompatibility problems, as it is being used in the prod code as well.
happy for you to take what's merged up. That CallableSupplier can be moved if you need to.
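For reference, a hedged sketch of the suggested IOStatistics logging call, assuming the IOStatisticsLogging helper from org.apache.hadoop.fs.statistics; the exact overload available on the target branch may differ.

```java
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class IOStatsLogSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(IOStatsLogSketch.class);

  void afterListing(RemoteIterator<FileStatus> listStatusIterator) {
    // At debug level this prints whatever IOStatistics the iterator collected
    // (S3A today, ABFS soon): request counts, list latencies, etc.
    IOStatisticsLogging.logIOStatisticsAtDebug(LOG,
        "listing statistics", listStatusIterator);
  }
}
```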
    fs.mkdirs(source);
    // Create 10 dirs inside.
    for (int i = 0; i < 10; i++) {
      fs.mkdirs(new Path("/src/sub" + i));
skip this and just delegate to the children; saves 10 calls
        for (int k = 0; k < 10; k++) {
          Path parentPath = new Path("/src/sub" + i + "/subsub" + j);
          Path filePath = new Path(parentPath, "file" + k);
          DFSTestUtil.createFile(fs, filePath, 1024L, (short) 3, 1024L);
actually, you can go straight to the createFile without doing any mkdirs. It is still going to take 10^3 calls on an object store, though. If you do move something of this size there, then (see the sketch below):
- the file creates should be done in an executor pool (see ITestPartialRenamesDeletes.createDirsAndFiles())
- the parameters should be configurable; just a subclass getWidth() would be enough
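A rough sketch of the executor-pool approach for test data creation, relying on FileSystem.create() building missing parent directories; class and method names are made up for illustration.

```java
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class ParallelFileCreator {

  static void createFiles(FileSystem fs, Path base, int width) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(16);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < width; i++) {
        for (int j = 0; j < width; j++) {
          for (int k = 0; k < width; k++) {
            Path file = new Path(base, "sub" + i + "/subsub" + j + "/file" + k);
            // create() builds any missing parent directories, so no mkdirs needed
            futures.add(pool.submit(() -> {
              try (OutputStream out = fs.create(file, true)) {
                out.write(new byte[1024]);
              }
              return null;
            }));
          }
        }
      }
      for (Future<?> f : futures) {
        f.get();   // surface any creation failure
      }
    } finally {
      pool.shutdown();
    }
  }
}
```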
    // Check that all 1000 files got copied.
    RemoteIterator<LocatedFileStatus> destFileItr = fs.listFiles(dest, true);
    int numFiles = 0;
Assertions.assertThat(RemoteIterators.toList(fs.listFiles(dest, true)))
    .describedAs("files").hasSize(...)
That way, if the size isn't met, the error includes the list of all the files which were found.
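Filled out, the suggested assertion might look roughly like this, using the 1000-file count the test expects and assuming RemoteIterators from org.apache.hadoop.util.functional:

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.assertj.core.api.Assertions;

class ListingAssertSketch {
  static void assertAllFilesCopied(FileSystem fs, Path dest) throws IOException {
    List<LocatedFileStatus> copied =
        RemoteIterators.toList(fs.listFiles(dest, true));
    // On failure AssertJ prints the whole list, so missing files are visible.
    Assertions.assertThat(copied)
        .describedAs("files found under %s", dest)
        .hasSize(1000);
  }
}
```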
Thanx @steveloughran for the review. I have addressed the review comments. Please have a look.
LGTM, some minor comments. Have you had a chance to test the s3a distcp client through this yet?
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
recent trunk changes #2522 will have broken this; just use direct references to BlockingThreadPoolExecutorService
@@ -64,12 +71,22 @@
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
import org.slf4j.LoggerFactory;
probably needs to go somewhere else in the imports
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
now, these imports we are trying to leave where they were, because when cherry-picking we're trying to stay on the older versions. It's a PITA, I know.
Thanx @steveloughran for the review, I have addressed the comments. Please give it a check.
Force-pushed from d9779f6 to 9d9a4f7.
💔 -1 overall
This message was automatically generated.
💔 -1 overall
This message was automatically generated.
Have removed the timeout. Created a test PR without any changes as well; still, the recent test failure isn't related.
yes, last failure is an OOM, so unrelated
@@ -53,8 +53,10 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
these need to go into the "other imports" block. Yes, it's a PITA, but its goal is to manage backporting. And guess what I have to do a lot of?
codewise, all looks good and the test failures seem spurious. I think what I'd like to do now is check out and test the s3a and abfs distcp suites on my system.
S3A test failure:
And abfs:
So -1 on the tests there. And ideally, it's time to see if you can sort out some AWS and/or Azure credentials. Even testing against a minio docker image would be a good starting point.
Thanx @steveloughran for trying that out. I will figure out a way to run that UT. I will sort out the cred stuff and try following this doc; let me know if this isn't the best or correct doc to follow. I added an HDFS contract test as well, and that fetched me the same exception as S3A:
I fixed it, so I suppose S3A should work. But the ABFS stuff I need to check; I feel it doesn't have a filesystem check.
Hey, I've realised that my auditing PR #2675 is going to clash on the CallableSupplier changes, as it's taking auditing spans in, activating them before and after. Can you restore the S3A callable stuff, alongside the copy you've made in fs.impl? That will stop the PRs conflicting. Thanks.
Do you mean to revert all the AWS changes and move these classes back to the aws module, to the state before this commit? That shouldn't be a problem. Just to know: will it be OK if I keep a parent class in hadoop-common and a child class in S3A?
I'm wrapping each of the ops in an enter/exit of auditing; the changes are pretty traumatic. Unless I can rework how the invocation happens, we'll need to keep them separate. Here's what I'd like to propose:
Your patch can go into trunk and I can co-exist my dev with it. If I can see a way to move I'll adopt it, but it will allow us to diverge, with the hadoop-common CallableSupplier more broadly used.
The only reason we don't have one of those already is that it slowed down HDFS test runs, and all the other distcp tests used mini HDFS clusters. But we clearly need it to regression test things. Maybe we should add it, but give it a different name from Test* (and comment in the parent class) so that it's only run when explicitly asked for. Some of the S3 tests are like that: test cases you have to run by hand or from the IDE, but which maven skips.
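A hypothetical sketch of such a suite: it extends the shared contract test but is deliberately not named Test*, so maven's default surefire pattern skips it and it only runs when invoked explicitly. The class name and the HDFSContract wiring are assumptions, not the code in this PR.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.hdfs.HDFSContract;
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;

// Not named Test*, so maven skips it by default; run it by hand or from the IDE.
public class ManualHDFSContractDistCp extends AbstractContractDistCpTest {

  @BeforeClass
  public static void createCluster() throws IOException {
    HDFSContract.createCluster();      // spin up the shared MiniDFSCluster
  }

  @AfterClass
  public static void teardownCluster() throws IOException {
    HDFSContract.destroyCluster();
  }

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new HDFSContract(conf);
  }
}
```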
🎊 +1 overall
This message was automatically generated.
Thanx @steveloughran for the review.
Region: ap-south-1. The newly added HDFS contract test:
@steveloughran any further comments?
💔 -1 overall
This message was automatically generated.
LGTM. I'm going to make one final change, but +1 this PR pending that change anyway. Can you put the CommonCallableSupplier into the package where I'm trying to unify the functional APIs in hadoop with IOE support? This is clearly part of it. Nothing else, just a move of the class. +1 pending that change.
💔 -1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
Thanx @steveloughran for the review. I have moved the class.
OK, +1 from me. Merge to trunk and, after a test run, to branch-3.3; let's wait and see what surprises surface there.
HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (#2808). Contributed by Ayush Saxena.
* HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (#2732).
* HADOOP-17531. Addendum: DistCp: Reduce memory usage on copying huge directories. (#2820)
Signed-off-by: Steve Loughran <stevel@apache.org>

HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (apache#2732). Contributed by Ayush Saxena.
Signed-off-by: Steve Loughran <stevel@apache.org>

HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (apache#2808). Contributed by Ayush Saxena.
* HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (apache#2732).
* HADOOP-17531. Addendum: DistCp: Reduce memory usage on copying huge directories. (apache#2820)
Signed-off-by: Steve Loughran <stevel@apache.org>
Conflicts:
  hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
  hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
(cherry picked from commit d86f94d18bd8b33cfc324b5638f12d9018c95d29)
Signed-off-by: Arpit Agarwal <aagarwal@cloudera.com>
Change-Id: Ieec8dbd96444dead3cd115f076a65444ca212a35
https://issues.apache.org/jira/browse/HADOOP-17531