MAPREDUCE-7341. Intermediate Manifest Committer #2971
Conversation
Went through the prod code. Looks really promising.
Glad to see the usage of the state design pattern (not exactly, but similar).
OK. I'm going to say "sorry, no" to the idea of using diff to validate JSON files; think a bit about dest file validation. JSON is there to be parsed; the bundled diagnostics and iostats change, and the file paths will differ between local, abfs and gcs. The way to validate it is to read it in and make assertions on it.

Alongside this PR, I have a private fork of google gcs which subclasses all the tests and runs them against google cloud, plus end-to-end tests through spark standalone. These tests verify the committer works for dataframes, and spark sql for orc/parquet and csv; they load the success file (and its truncated list of generated files) and validate it against the filesystem. This is all an evolution of the existing suites for the s3a committers, which is where the success file came from. I would rather do the detailed tests there, as they are full integration tests. It is fairly tricky to get them building, however; a full compile takes an hour+ and needs to be repeated every morning (-SNAPSHOT artifacts, see).

What I can do in the hadoop tests is add a test to load a success file, validate it against the output, and check that there are no unknown files there. I'd love some suggestions as improvements to the spark ones too; it's a mix of my own and some I moved from the apache spark sql suites and reworked to be targetable at different filesystems. One thing I don't test there is writing data over existing files in a complex partition tree... I should do that, which I can do after this patch is in...
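A minimal sketch of the read-and-assert approach described above, assuming the _SUCCESS marker is a JSON file containing a "filenames" array; the SuccessFileAssertions class and that field name are illustrative, not the committer's actual API.

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;

/** Illustrative helper: parse the JSON success marker and assert against the output. */
public final class SuccessFileAssertions {

  private SuccessFileAssertions() {
  }

  /**
   * Read the _SUCCESS file under the job output directory and assert that
   * every file it lists actually exists in the destination.
   */
  public static void assertSuccessFileMatchesOutput(Configuration conf, Path outputDir)
      throws Exception {
    FileSystem fs = outputDir.getFileSystem(conf);
    Path successFile = new Path(outputDir, "_SUCCESS");
    ObjectMapper mapper = new ObjectMapper();
    JsonNode root;
    try (InputStream in = fs.open(successFile)) {
      root = mapper.readTree(in);
    }
    // "filenames" is an assumed field name for the (truncated) list of created files.
    List<String> listed = new ArrayList<>();
    root.path("filenames").forEach(node -> listed.add(node.asText()));
    Assert.assertFalse("success file listed no files", listed.isEmpty());
    for (String name : listed) {
      Assert.assertTrue("missing file " + name,
          fs.exists(new Path(outputDir, name)));
    }
  }
}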
* properly marshall filenames with spaces. this is only for testing, but as our test paths have spaces in..
* remove all remaining references to move to trash in committer code and docs.
Change-Id: I3098c175d386de6f9768b08f7399af0de075b17e
logs even on the normal route, so that we can see when that is still being picked up.
Change-Id: Ia022beb0131720c105110ab4334ba1627d6e6bb6
just pushed an update with
there's a hardcoded limit on the number of files which get listed in that success data (100), so that on big jobs the time to write the success file doesn't itself slow the job down. Is that too big a number? If paths are long you could still have 50 KiB of data or more. Tested: azure cardiff.
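A sketch of how such a cap might be applied when building the success data; the constant and helper names are hypothetical, but with 100 entries of long paths (say around 500 bytes each) the marker stays in the tens-of-KiB range mentioned above.

import java.util.List;

/** Illustrative only: cap the number of filenames recorded in the success file. */
final class SuccessFileLimits {

  /** Assumed limit; the real constant lives in the committer code. */
  static final int SUCCESS_MARKER_FILE_LIMIT = 100;

  private SuccessFileLimits() {
  }

  /**
   * Truncate the list of created files before it is serialized into the
   * _SUCCESS marker, so very large jobs do not write an oversized marker file.
   */
  static List<String> filesForSuccessMarker(List<String> created) {
    return created.size() <= SUCCESS_MARKER_FILE_LIMIT
        ? created
        : created.subList(0, SUCCESS_MARKER_FILE_LIMIT);
  }
}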
Change-Id: I4a111f862c1ea725367c28b6c74377fec658824b
/**
 * Extension of StoreOperationsThroughFileSystem with ABFS awareness.
 * Purely for use by jobs committing work through the manifest committer.
 * The {@link AzureManifestCommitterFactory} will configure the
typo: will configure twice.
 * Constructor.
 * @param capacity capacity in permits/second.
 */
private RestrictedRateLimiting(int capacity) {
Should we change the name to maxCapacity?
I will call it capacityPerSecond to make it clear it uses time as part of the process.
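A minimal sketch of a capacity-per-second limiter in that spirit, built on Guava's RateLimiter; this is illustrative only, not the RateLimiting/RateLimitingFactory code added in this PR (which may differ in interface and uses the shaded Guava classes).

import java.time.Duration;

import com.google.common.util.concurrent.RateLimiter;

/** Illustrative capacity-per-second limiter, not the Hadoop implementation. */
final class SimpleRateLimiting {

  private final RateLimiter limiter;

  /**
   * @param capacityPerSecond permits issued per second.
   */
  SimpleRateLimiting(int capacityPerSecond) {
    this.limiter = RateLimiter.create(capacityPerSecond);
  }

  /**
   * Acquire the requested number of permits, blocking if necessary.
   * @return how long the caller was blocked.
   */
  Duration acquire(int permits) {
    double seconds = limiter.acquire(permits);
    return Duration.ofMillis((long) (seconds * 1000));
  }
}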
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface RateLimiting {
I think we need to experiment/test more with this before moving to rate limiting for all operations in the FS itself.
-->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
scope "provided" means used to compile but not included in the dependency list of the published artifact
there are no new runtime dependencies unless you want to use the manifest committer
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <scope>test</scope>
this does mean that the abfs-test jar does need it on the classpath. If I declare this as provided, that will not be the case, but it will then be a compile-time dependency of the production code.
  <type>test-jar</type>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
scope is test only. We have lots of other test-scope-only dependencies already and it is not an issue.
Note also the hadoop-azure test jar will not publish any of its transitive dependencies at all, so nobody will notice; which is good, as we already pull in things like distcp.
all trivial changes and mostly markdown and javadocs. Change-Id: Ideec4e63d345b0ecb967aa75da8a82c1ff01ccda
I have addressed all the little nits in the code; thank you for reviewing the text and javadocs so thoroughly. I have also improved the rejection of filesystems by scheme, adding wasb to the set of unsupported stores. Once we add a better rename API to the filesystem/filecontext, we can add path capabilities for renames. I have also tried to clarify, in the javadocs and comments, places where there was ambiguity. This includes comments in the hadoop-azure pom, where the references to the mapreduce jars are imported with scopes of provided and test. Everyone reviewing this patch needs to understand what these scopes mean, so they will understand why I have no intention of changing those declarations or adding anything to hadoop-common.

It is not a requirement to use the filesystem, nor is it exported as a new dependency. I am 100% confident of this because these same dependencies were added to hadoop-aws in HADOOP-13786, "Add S3A committers for zero-rename commits to S3 endpoints", and nobody has ever reported the filesystem not instantiating. Note: cloudstore storediag doesn't put these dependencies on the cp when invoked via
Tested: azure cardiff.
use of set code upset checkstyle. Let's try again.
Change-Id: I963fc4ddff55e896a4c0f806ef4336bee3a2c0c7
+1, LGTM
I have reviewed the main algorithm, all stage execution and the abfs changes. Impressive stage-wise implementation and manifest committer integration.
Ready to go in, with the following things to track for the future:
- There are pending questions from others. I have tried to clarify some of them based on my understanding.
- We made the RateLimiting feature experimental as it is in its initial phase. It will require more testing from the QE and performance teams.
- Not moving ResilientCommitByRename to hadoop-common, as that would make it public and require a commitment to maintenance in future. We plan to add a new rename builder public API which can take parameters like etags etc.
- Haven't reviewed the iostats integration in detail, but overall it is nice to see so many committer-related stats added.
- TaskPool is already-reviewed old code, so not reviewing it right now due to lack of time.
- Haven't been able to review the test code.
- CreateOutputDirectoriesStage seemed a bit complex to me. Maybe we can do a session to understand it later.
/**
 * Callback on stage entry.
 * Sets the sactiveStage and updates the
nit: typo sactiveStage
thanks, fixed
// update the manifest list in a synchronized block.

synchronized (manifests) {
  manifests.add(m);
One thing to notice here is that the manifests list can grow in size and cause an OOM.
surfaced on the s3a committer only on multi-TB terasorts... but there each file included a list of etags, so it was much bigger.
merging all manifest dir lists lets us optimise dir creation; to do that and still do independent manifest committing of work would require double loading of manifests, one pass for dir setup and another for renaming. I did start off with some parallel work here but it is both complex (needs two thread pools to avoid deadlocks) and didn't seem tangibly more efficient.
if it surfaces in real-world jobs then I can worry about it.
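For illustration, a sketch of the pattern in the quoted snippet: manifests loaded in a worker pool and appended to one shared list under synchronization, which is why heap use grows with manifest count. The Manifest class and loadAll helper are stand-ins, not the committer's TaskManifest handling.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative pattern only: parallel loads appending to one synchronized list. */
final class ManifestAggregationSketch {

  /** Stand-in for the committer's per-task manifest type. */
  static final class Manifest {
    final String source;

    Manifest(String source) {
      this.source = source;
    }
  }

  static List<Manifest> loadAll(List<String> manifestFiles, int threads)
      throws InterruptedException {
    List<Manifest> manifests = new ArrayList<>();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (String file : manifestFiles) {
      pool.execute(() -> {
        // real code would read and deserialize the manifest file here
        Manifest m = new Manifest(file);
        // every manifest is retained until job commit finishes, which is why
        // heap use grows with the number (and size) of task manifests
        synchronized (manifests) {
          manifests.add(m);
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);
    return manifests;
  }
}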
leaves.put(path, entry);

// if it is a file to delete, record this.
if (entry.getStatus() == EntryStatus.file) {
when will this case happen? as we are only getting the directories.
that list of dirs is built up in task commit, where each task checks the status of the equivalent dir in the dest path. so we know which target dirs have files, as well as which don't exist
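A hedged sketch of that probe: for each destination directory a task will write under, check its status in the destination tree and record whether it is missing, a directory, or an existing file that will need deleting. The DestDirProbeSketch class and its EntryStatus enum are illustrative, not the committer's actual types.

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative probe of destination directories during task commit. */
final class DestDirProbeSketch {

  enum EntryStatus { not_found, dir, file }

  private DestDirProbeSketch() {
  }

  /**
   * For every destination directory a task will write under, record whether it
   * is absent, already a directory, or an existing file; job commit can then
   * create the missing dirs and delete any files that are in the way.
   */
  static Map<Path, EntryStatus> probeDestinationDirs(FileSystem fs, Iterable<Path> destDirs)
      throws IOException {
    Map<Path, EntryStatus> statuses = new HashMap<>();
    for (Path dir : destDirs) {
      EntryStatus status;
      try {
        FileStatus st = fs.getFileStatus(dir);
        status = st.isDirectory() ? EntryStatus.dir : EntryStatus.file;
      } catch (FileNotFoundException e) {
        status = EntryStatus.not_found;
      }
      statuses.put(dir, status);
    }
    return statuses;
  }
}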
OP_COMMIT_FILE_RENAME, () ->
    operations.renameFile(source, dest));
}
return new CommitOutcome();
nit: CommitOutcome is empty. Why not just void?
leave open for others
 * @return status or null
 * @throws IOException IO Failure.
 */
protected Boolean delete(
I think it is just refactoring and delete is called from multiple Stages.
 * @param <IN> Type of arguments to the stage.
 * @param <OUT> Type of result.
 */
public abstract class AbstractJobCommitStage<IN, OUT>
I agree with the name change, as there are methods here which are called from multiple stages.
trackDurationOfInvocation(getIOStatistics(), OP_LOAD_ALL_MANIFESTS, () ->
    TaskPool.foreach(manifestFiles)
        .executeWith(getIOProcessors())
        .stopOnFailure()
I think any exception in any thread will stop the execution and will be thrown from the method, and finally a stage failure is reported.
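For illustration, a fail-fast parallel foreach with roughly those semantics, written against plain java.util.concurrent rather than TaskPool; it is a sketch of the behaviour described above, not the TaskPool implementation.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustrative fail-fast parallel foreach; not the TaskPool implementation. */
final class FailFastForeach {

  @FunctionalInterface
  interface Operation<T> {
    void apply(T item) throws Exception;
  }

  /**
   * Apply the operation to every item in parallel. The first exception cancels
   * outstanding work and is rethrown to the caller, which would then be
   * reported as a stage failure.
   */
  static <T> void foreach(List<T> items, int threads, Operation<T> op) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<Void>> futures = new ArrayList<>();
      for (T item : items) {
        futures.add(pool.submit((Callable<Void>) () -> {
          op.apply(item);
          return null;
        }));
      }
      for (Future<Void> future : futures) {
        try {
          future.get();
        } catch (ExecutionException e) {
          // stop-on-failure: cancel whatever has not started yet and propagate
          futures.forEach(f -> f.cancel(true));
          Throwable cause = e.getCause();
          if (cause instanceof Exception) {
            throw (Exception) cause;
          }
          throw e;
        }
      }
    } finally {
      pool.shutdownNow();
    }
  }
}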
// list the directory. This may block until the listing is complete,
// or, if the FS does incremental or asynchronous fetching, until the
// first page of results is ready.
final RemoteIterator<FileStatus> listing = listStatusIterator(srcDir);
This is an async paged list request, and it is non-recursive.
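A minimal sketch of consuming such an incremental listing through the public FileSystem API; the helper name is illustrative.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

/** Illustrative use of the incremental, non-recursive listing API. */
final class ListingSketch {

  private ListingSketch() {
  }

  /**
   * List the immediate children of a directory. On stores with paged listings,
   * later pages are fetched as the iterator is consumed.
   */
  static List<FileStatus> listChildren(FileSystem fs, Path srcDir) throws IOException {
    List<FileStatus> children = new ArrayList<>();
    RemoteIterator<FileStatus> listing = fs.listStatusIterator(srcDir);
    while (listing.hasNext()) {
      children.add(listing.next());
    }
    return children;
  }
}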
Looks good, some minor comments. Ran the whole Azure test suite with the new tests; it took 2.5 hours to complete. Maybe I have to tune down the configs in ITestAbfsTerasort for better performance?
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:39 h
[INFO] Finished at: 2022-03-16T13:50:48+05:30
[INFO] ------------------------------------------------------------------------
return result.getStatusCode() == HttpURLConnection.HTTP_OK
    && sourceEtag.equals(extractEtagHeader(result));
} catch (AzureBlobFileSystemException ignored) {
  // GetFileStatus on the destination failed, the rename did not take place
Can we add Debug logs here for this scenario?
/**
 * Prepare the test configuration.
 * @param contractTestBinding test binding
 * @return an extraced and patched configuration.
typo: "extracted"
 * the HDFS cluster binding is implicitly propagated to YARN.
 * If one is not requested, the local filesystem is used as the cluster FS.
 * @param conf configuration to start with.
 * @param useHDFS should an HDFS cluster be instantiated.
Not a param
aah, relic of the s3a code
}

protected Job createJob(Configuration jobConf) throws IOException {
We never use this method
cut
@mehakmeet thanks for your comments. Make sure you aren't connected to a VPN which redirects all azure/aws/gcs IO, otherwise all your requests go round the world. I have hit this on GCS.
renaming base stage class JobOrTaskStage, fix javadocs Change-Id: Icefbdfb81ca5b6f77640408d5ff395265444972a
A file output committer which uses a manifest file to pass the lists of directories to create and files to rename from task attempts to the job committer; job commit does directory creation and file rename across a pool of threads. Based on lessons and code from the S3A committer.
It is faster than the v1 algorithm and does not require atomic/O(1) directory rename. It will be slower than v2, as files are renamed in job commit, but since it has already prepared the lists of files to copy, directory creation and file rename (with deletes of destination artifacts) are all that is needed.
This committer works with any consistent filesystem/object store for which file rename is fast. It does not perform any directory renames, so has no requirements there. It uses
listStatusIterator()
to walk the tree of files to commit. This makes it straightforward to also build up the list of dirs to create, but it does mean it will underperform against any store for which a recursive listStatus(path, true)
is significantly faster. That is true for S3A, but this committer is not for use there: rename() takes too long. It is targeted at Apache Spark and does not support job recovery. There's no fundamental reason why this could not be added by a sufficiently motivated individual.
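As a rough illustration of how a job might be pointed at this committer on abfs, a hypothetical configuration snippet using the PathOutputCommitterFactory per-scheme binding; the key name and factory class here are assumptions and should be checked against the manifest committer documentation added in this PR.

import org.apache.hadoop.conf.Configuration;

/** Hypothetical wiring; verify keys against the committer documentation in this PR. */
final class CommitterConfigSketch {

  private CommitterConfigSketch() {
  }

  static Configuration bindManifestCommitterForAbfs(Configuration conf) {
    // PathOutputCommitterFactory binding by filesystem scheme (assumed key pattern).
    conf.set("mapreduce.outputcommitter.factory.scheme.abfs",
        "org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory");
    return conf;
  }
}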
Status:
Features
Improvements
Tests
Functional tests are on a par with the s3a committers; I want to add better fault injection (the design lines up nicely for this, as we can test each stage in isolation) and some scale tests.
I'm adding Spark integration tests in https://github.com/hortonworks-spark/cloud-integration; already (July 2021) working.
and on to stage conf. Paths valid etc.