MAPREDUCE-7341. Intermediate Manifest Committer by steveloughran · Pull Request #2971 · apache/hadoop · GitHub

MAPREDUCE-7341. Intermediate Manifest Committer #2971


Conversation

steveloughran (Contributor) commented May 3, 2021

A file output committer which uses a manifest file to pass the lists of directories to create and files to rename from task attempts to the job committer; job commit performs directory creation and file rename across a pool of threads. Based on lessons and code from the S3A committer.

It is faster than the v1 algorithm and does not require atomic or O(1) directory rename. It will be slower than v2, as files are renamed in job commit; but since the lists of files to copy are already prepared, directory creation and file rename (with deletes of destination artifacts) are all that is needed.

This committer works with any consistent filesystem/object store for which file rename is fast. It does not perform any directory renames, so it has no requirements there. It uses listStatusIterator() to walk the tree of files to commit, which makes it straightforward to also build up the list of dirs to create, but it does mean it will underperform against any store for which a recursive listStatus(path, true) is significantly faster. That is true for S3A, but this committer is not for use there: rename() takes too long.

It is targeted at Apache Spark and does not support job recovery. There's no fundamental reason why this could not be added by a sufficiently motivated individual.
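
A minimal sketch of the commit flow described above; class and method names here are illustrative, not the PR's actual API:

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch only; not the PR's actual classes. */
final class ManifestCommitSketch {

  /** A (source, destination) pair a task attempt recorded in its manifest. */
  record Rename(Path source, Path dest) { }

  /**
   * Job commit: create every destination directory once, then rename all
   * files across a thread pool. No directory is ever renamed, so the store
   * needs neither atomic nor O(1) directory rename.
   */
  static void commitJob(FileSystem fs, List<Rename> renames, int threads)
      throws Exception {
    // Dedupe parent dirs; TreeSet ordering yields parents before children.
    Set<Path> dirs = new TreeSet<>();
    renames.forEach(r -> dirs.add(r.dest().getParent()));
    for (Path dir : dirs) {
      fs.mkdirs(dir);
    }
    // Rename every file in parallel; each rename is an O(1) metadata
    // operation on a real filesystem and fast on abfs/gcs.
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      pool.invokeAll(renames.stream()
          .map(r -> (Callable<Boolean>) () -> fs.rename(r.source(), r.dest()))
          .toList());
    } finally {
      pool.shutdown();
    }
  }
}
```

Task commit only writes the manifest; all data movement is deferred to job commit, which pays only for metadata operations.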

Status:

  • file formats.
  • implementation architecture "stages you can chain" done
  • Fork of S3A ITest contract test done, with implementations for local FS and ABFS.
  • ABFS Terasort ITest
  • Basic spark tests with validation of _SUCCESS output

Features

  • build stage config from job/task config
  • directory prepare/cleanup (parallelised)
  • wire up committer
  • progress callbacks
  • delete all task attempts in parallel for performance/scale on gcs and abfs when oauth authenticated
  • Always save _SUCCESS results to a report dir, even on failure
  • each task attempt to save its preparation time to the iostats in task manifest; job stats to aggregate these for ease of measuring task commit performance.

Improvements

  • improve performance of PrepareDirectoriesStage
  • RateLimiting class in hadoop-common to wrap the guava one, hiding the origin of the limiter from the modules. This could automatically update an IOStats source. (It will need a test too; a sketch follows below.)
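
A sketch of what that wrapper could look like; the interface shape and names here are assumptions based on the item above, not the final hadoop-common API:

```java
import java.time.Duration;

import com.google.common.util.concurrent.RateLimiter;

/** Hypothetical facade hiding the guava rate limiter from committer modules. */
interface RateLimitingSketch {

  /** Acquire capacity, returning the time spent blocked. */
  Duration acquire(int requestedCapacity);

  /** @param capacityPerSecond permits per second; 0 means unlimited. */
  static RateLimitingSketch create(int capacityPerSecond) {
    if (capacityPerSecond == 0) {
      // guava's RateLimiter rejects a rate of 0, so special-case "no limit".
      return permits -> Duration.ZERO;
    }
    RateLimiter limiter = RateLimiter.create(capacityPerSecond);
    // guava returns the seconds slept; returning a Duration lets the caller
    // feed the wait time straight into an IOStatistics duration tracker.
    return permits -> Duration.ofMillis((long) (limiter.acquire(permits) * 1000));
  }
}
```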

Tests

Functional tests are on a par with the S3A committers; I want to add better fault injection (the design lines up nicely for this, as each stage can be tested in isolation) and some scale tests.

I'm adding Spark integration tests in https://github.com/hortonworks-spark/cloud-integration; already working as of July 2021.

  • job/task conf to manifest committer conf and on to stage conf; paths valid etc.
  • round trip of manifest
  • _SUCCESS
  • individual stages with large fake dir tree
  • directory merge
  • fail fast if job or task attempt directory exists
  • abstract protocol test
  • Source File missing on rename
  • Dest File is file/dir
  • Parent entry of a directory to create is actually a file.
  • other job overwrites parent dir with file
  • two jobs overwrite same file (will only surface in validate)
  • two task attempts writing to same dest
  • multi-task job commit with failure in one of the tasks while others are active
  • SPARK_WRITE_UUID picked up from job to _SUCCESS
  • overwrite an existing tree with new data
  • cleanup stage: rename to trash option
  • cleanup stage: parallel TA delete. Use iostats to count # of deletes, or return the count in the return value.
  • many task attempts in a task dir
  • wrong task ID in a loaded TA? actually, we don't check.
  • rate limited job commit on local fs with triggering

@steveloughran steveloughran force-pushed the mr/MAPREDUCE-7341-manifest-committer branch from 4631327 to 7d65947 Compare May 10, 2021 12:55
@steveloughran steveloughran marked this pull request as draft May 13, 2021 19:13
@steveloughran steveloughran added enhancement fs/azure changes related to azure; submitter must declare test endpoint MapReduce labels May 13, 2021
@steveloughran steveloughran force-pushed the mr/MAPREDUCE-7341-manifest-committer branch from 5e8cdd3 to 4f34112 Compare May 25, 2021 15:08
@steveloughran steveloughran force-pushed the mr/MAPREDUCE-7341-manifest-committer branch from 7872e5f to efad653 Compare June 5, 2021 11:41
mukund-thakur (Contributor) left a comment:

Went through the prod code. Looks really promising.
Glad to see the use of the state design pattern (not exactly, but similar).

@steveloughran steveloughran force-pushed the mr/MAPREDUCE-7341-manifest-committer branch from efad653 to 3f07572 Compare June 9, 2021 17:12
steveloughran (Contributor Author) commented:

OK.

I'm going to say "sorry, no" to the idea of using diff to validate JSON files; I'll think a bit about dest file validation.

JSON is there to be parsed: the bundled diagnostics and iostats change, and the file paths will differ between local, abfs and gcs.

The way to validate it is to read it in and make assertions on it.

Alongside this PR, I have a private fork of the Google GCS connector which subclasses all the tests and runs them against Google Cloud,

and end-to-end tests through Spark standalone:
https://github.com/hortonworks-spark/cloud-integration

These tests verify the committer works for dataframes and Spark SQL, for ORC, Parquet and CSV:
https://github.com/hortonworks-spark/cloud-integration/blob/master/cloud-examples/src/test/scala/com/cloudera/spark/cloud/abfs/commit/AbfsCommitDataframeSuite.scala#L83
https://github.com/hortonworks-spark/cloud-integration/tree/master/cloud-examples/src/test/scala/org/apache/spark/sql/hive/orc/abfs
https://github.com/hortonworks-spark/cloud-integration/tree/master/cloud-examples/src/test/scala/org/apache/spark/sql/hive/orc/gs

These tests load and validate the success file (and its truncated list of generated files) against the filesystem:
https://github.com/hortonworks-spark/cloud-integration/blob/master/cloud-examples/src/main/scala/com/cloudera/spark/cloud/s3/S3AOperations.scala#L54

This is all an evolution of the existing suites for the S3A committers, which is where the success file came from.

I would rather do the detailed tests here, as they are full integration tests. It is fairly tricky to get them building, however; a full compile takes an hour or more and needs to be repeated every morning (-SNAPSHOT artifacts, you see).

What I can do in the hadoop tests is add a test to load a success file, validate it against the output, and check that there are no unknown files there.

I'd love some suggestions for improvements to the Spark ones too. It's a mix of my own tests and some I moved from the Apache Spark SQL suites and reworked to be targetable at different filesystems. One thing I don't test there is writing data over existing files in a complex partition tree... I should do that, which I can do after this patch is in.
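
To make "read it in and make assertions on it" concrete, here is a sketch of such a check; the ManifestSuccessData loader and accessors are assumed to mirror the S3A SuccessData API, and the stored filenames are assumed to be full paths:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// assumption: the PR's success-data class, mirroring S3A's SuccessData
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;

final class SuccessFileChecks {

  /** Load _SUCCESS and assert on parsed fields, never on raw JSON text. */
  static void validate(FileSystem fs, Path outputDir) throws IOException {
    ManifestSuccessData success =
        ManifestSuccessData.load(fs, new Path(outputDir, "_SUCCESS"));
    // Diagnostics, iostats and path schemes legitimately differ between
    // local, abfs and gcs runs, so only assert on stable fields.
    for (String name : success.getFilenames()) {
      if (!fs.exists(new Path(name))) {
        throw new AssertionError("file listed in _SUCCESS is missing: " + name);
      }
    }
  }
}
```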

* properly marshall filenames with spaces.
  this is only for testing, but as our test paths have spaces in..
* remove all remaining references to move to trash in committer code and docs.

Change-Id: I3098c175d386de6f9768b08f7399af0de075b17e
logs even on the normal route, so that we can see when that is
still being picked up.

Change-Id: Ia022beb0131720c105110ab4334ba1627d6e6bb6
steveloughran (Contributor Author) commented:

Just pushed an update with:

  • PathOutputCommitter logs at factory
  • a bit more validation of the manifest summary data (which showed the testing-only path list wasn't marshalling spaces properly, a bug which must still be in the s3a code)

There's a hardcoded limit on the number of files which get listed in that success data (100), so that on big jobs the time to write the success file doesn't itself slow the job down.

Is that too big a number? If paths are long, you could still have 50 KiB of data or more.
It could be cut down to something minimal, say 20: enough for basic tests, but not enough to cause performance issues.
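
For reference, the capping amounts to something like the following; the constant and method names are illustrative:

```java
import java.util.List;

import org.apache.hadoop.fs.Path;

final class SuccessDataTruncation {

  /** Illustrative; the real limit lives in the committer's constants. */
  static final int SUCCESS_FILE_LIMIT = 100;

  /** Keep _SUCCESS O(1) in size: record at most the first N file paths. */
  static List<String> filesForSuccessData(List<Path> committedFiles) {
    return committedFiles.stream()
        .limit(SUCCESS_FILE_LIMIT)   // truncate; never fail the job over this
        .map(Path::toString)
        .toList();
  }
}
```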

Tested: azure cardiff.

Change-Id: I4a111f862c1ea725367c28b6c74377fec658824b
/**
* Extension of StoreOperationsThroughFileSystem with ABFS awareness.
* Purely for use by jobs committing work through the manifest committer.
* The {@link AzureManifestCommitterFactory} will configure the
Contributor:

typo: will configure twice.

* Constructor.
* @param capacity capacity in permits/second.
*/
private RestrictedRateLimiting(int capacity) {
Contributor:

Should we change the name to maxCapacity?

Contributor Author:

I will call it capacityPerSecond to make it clear that it uses time as part of the process.

*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface RateLimiting {
Contributor:

I think we need to experiment and test more with this before moving it into all operations in the FS itself.

-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
Contributor Author:

scope "provided" means used to compile but not included in the dependency list of the published artifact

there are no new runtime dependencies unless you want to use the manifest committer

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>test</scope>
Contributor Author:

This does mean that the abfs test JAR does need it on the classpath.

If I declare this as provided, that will not be the case,

but it will then be a compile-time dependency of the production code.

<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
Contributor Author:

Scope is test only; we have lots of other test-scope-only dependencies already and it is not an issue.

Note also that the hadoop-azure test JAR will not publish any of its transitive dependencies at all, so nobody will notice. Which is good, as we already pull in things like distcp.

all trivial changes and mostly markdown and javadocs.

Change-Id: Ideec4e63d345b0ecb967aa75da8a82c1ff01ccda
steveloughran (Contributor Author) commented Mar 15, 2022

@mukund-thakur,

I have addressed all the little nits in the code; thank you for reviewing the text and javadocs so thoroughly.

I have also improved the rejection of filesystems by scheme, adding wasb to the set of unsupported stores, and a test for it.

Once we add a better rename API to the filesystem/filecontext, we can add path capabilities for rename support to probe for.

I have also tried to clarify in the Java docs and comments places where there was ambiguity.

This includes comments in the hadoop-azure pom where the references to the mapreduce jars are imported with scopes of provided and test.

Everyone reviewing this patch needs to understand what these scopes mean, so they will understand why I have no intention of changing those declarations or adding anything to hadoop-common.

| scope | on the classpath of | transitive? |
|---|---|---|
| provided | src/main and src/test builds | no |
| test | src/test builds | no |

It is not a requirement to use the filesystem, nor is it exported as a new dependency. I am 100% confident of this because these same dependencies were added to hadoop-aws in HADOOP-13786 (Add S3A committers for zero-rename commits to S3 endpoints), and nobody has ever reported the filesystem failing to instantiate.

Note: cloudstore's storediag doesn't put these dependencies on the classpath when invoked via hadoop jar cloudstore.jar storediag, so verifying my statements is straightforward.

Tested: azure cardiff.

steveloughran (Contributor Author) commented:

use of set code upset checkstyle. lets try again


Change-Id: I963fc4ddff55e896a4c0f806ef4336bee3a2c0c7
mukund-thakur (Contributor) left a comment:

+1, LGTM
I have reviewed the main algorithm, all stage execution and the abfs changes. Impressive stage-wise implementation and manifest committer integration.
Ready to go in, with the following things to track for the future:

  1. There are pending questions from others. I have tried to clarify some of them based on my understanding.
  2. We made the RateLimiting feature experimental as it is in an initial phase. It will require more testing from the QE and performance teams.
  3. Not moving ResilientCommitByRename to hadoop-common, as that would make it public and require a commitment to maintenance in future. We plan to add a new rename builder public API which can take parameters like etags.
  4. Haven't reviewed the iostats integration in detail, but overall it is nice to see so many stats added related to the committer.
  5. TaskPool is already-reviewed old code, so not reviewing it right now because of limited time.
  6. Haven't been able to review the test code.
  7. CreateOutputDirectoriesStage seemed a bit complex to me. Maybe we can do a session to understand it later.


/**
* Callback on stage entry.
* Sets the sactiveStage and updates the
Contributor:

nit: typo sactiveStage

Contributor Author:

thanks, fixed

// update the manifest list in a synchronized block.

synchronized (manifests) {
manifests.add(m);
Contributor:

One thing to notice here is manifests can grow in size and cause OOM.

Contributor Author:

This surfaced with the S3A committer only on multi-TB terasorts... but there each file entry included a list of etags, so it was much bigger.

Merging all the manifests' dir lists lets us optimise dir creation; to do that and still commit each manifest's work independently would require loading the manifests twice, once for dir setup and again for renaming. I did start off with some parallel work here, but it is both complex (it needs two thread pools to avoid deadlocks) and didn't seem tangibly more efficient.

If it surfaces in real-world jobs then I can worry about it.

leaves.put(path, entry);

// if it is a file to delete, record this.
if (entry.getStatus() == EntryStatus.file) {
Contributor:

When will this case happen, given we are only getting the directories?

Contributor Author:

That list of dirs is built up in task commit, where each task checks the status of the equivalent dir in the dest path. So we know which target dirs have files, as well as which don't exist.
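
Roughly, the probe described here looks like the sketch below; the enum is a minimal stand-in for the PR's EntryStatus:

```java
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class DirProbeSketch {

  /** Minimal stand-in for the PR's EntryStatus enum. */
  enum EntryStatus { not_found, file, dir }

  /** During task commit: what is at the destination of a needed directory? */
  static EntryStatus probe(FileSystem fs, Path destDir) throws IOException {
    try {
      FileStatus st = fs.getFileStatus(destDir);
      // a file at the dir's path must be recorded so job commit deletes it
      return st.isDirectory() ? EntryStatus.dir : EntryStatus.file;
    } catch (FileNotFoundException e) {
      return EntryStatus.not_found;
    }
  }
}
```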

OP_COMMIT_FILE_RENAME, () ->
operations.renameFile(source, dest));
}
return new CommitOutcome();
Contributor:

nit: CommitOutcome is empty only. Why not just void?

Contributor Author:

Leaving it open for future extension.

* @return status or null
* @throws IOException IO Failure.
*/
protected Boolean delete(
Contributor:

I think it is just refactoring and delete is called from multiple Stages.

* @param <IN> Type of arguments to the stage.
* @param <OUT> Type of result.
*/
public abstract class AbstractJobCommitStage<IN, OUT>
Contributor:

I agree with the name change, as there are methods here which get called from multiple stages.

trackDurationOfInvocation(getIOStatistics(), OP_LOAD_ALL_MANIFESTS, () ->
TaskPool.foreach(manifestFiles)
.executeWith(getIOProcessors())
.stopOnFailure()
Contributor:

I think an exception in any thread will stop the execution and be thrown from the method, and finally a stage failure is reported.

// list the directory. This may block until the listing is complete,
// or, if the FS does incremental or asynchronous fetching, until the
// first page of results is ready.
final RemoteIterator<FileStatus> listing = listStatusIterator(srcDir);
Contributor:

This is an async, paged list request, and non-recursive.
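
For readers unfamiliar with the API, a minimal consumer of such a listing; on a paged store, later pages can be fetched lazily inside hasNext():

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

final class ListingSketch {

  /** Count the files directly under a directory, one page at a time. */
  static long countFiles(FileSystem fs, Path srcDir) throws IOException {
    long files = 0;
    RemoteIterator<FileStatus> it = fs.listStatusIterator(srcDir);
    while (it.hasNext()) {   // may block while the next page is fetched
      if (it.next().isFile()) {
        files++;
      }
    }
    return files;
  }
}
```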

mehakmeet (Contributor) left a comment:

Looks good, some minor comments. Ran the whole Azure test suite with the new tests; it took 2.5 hours to complete. Maybe I have to tune down the configs in ITestAbfsTerasort for better performance?

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  02:39 h
[INFO] Finished at: 2022-03-16T13:50:48+05:30
[INFO] ------------------------------------------------------------------------

return result.getStatusCode() == HttpURLConnection.HTTP_OK
&& sourceEtag.equals(extractEtagHeader(result));
} catch (AzureBlobFileSystemException ignored) {
// GetFileStatus on the destination failed, the rename did not take place
Contributor:

Can we add Debug logs here for this scenario?

/**
* Prepare the test configuration.
* @param contractTestBinding test binding
* @return an extraced and patched configuration.
Contributor:

typo: "extracted"

* the HDFS cluster binding is implicitly propagated to YARN.
* If one is not requested, the local filesystem is used as the cluster FS.
* @param conf configuration to start with.
* @param useHDFS should an HDFS cluster be instantiated.
Contributor:

Not a param

Contributor Author:

aah, relic of the s3a code

}


protected Job createJob(Configuration jobConf) throws IOException {
Contributor:

We never use this method

Contributor Author:

cut

steveloughran (Contributor Author) commented:

@mehakmeet thanks for your comments. Make sure you aren't connected to a VPN which redirects all azure/aws/gcs IO, otherwise all your requests go round the world. I have hit this on GCS.

renaming base stage class JobOrTaskStage, fix javadocs

Change-Id: Icefbdfb81ca5b6f77640408d5ff395265444972a
Labels
enhancement fs/azure changes related to azure; submitter must declare test endpoint MapReduce
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants