MAPREDUCE-7403. manifest-committer dynamic partitioning support. #4728
Conversation
Declares its compatibility with the stream capability "mapreduce.job.committer.dynamic.partitioning". Spark will need to cast to StreamCapabilities and then probe. Change-Id: Iafcacc6d2491bb1e7fc2fc033c6d17d5b63b5b4f
…through Change-Id: Icc30bf6251977cfb76211bffcfc5796b1a44989b
* spark-side requirements
* why there is a risk if you use it at scale. That risk is low because Spark currently seems to rename sequentially; if/when it does parallel file renames, throttling may be triggered, with the consequent failure events.

Change-Id: I6e442bbdcaa007a3cd2e04ddf8b41d14c51057ff
Force-pushed from f62db61 to 82372d0.
Change-Id: I423f052ca48915502f182cb4f1c67cdf04838a99
🎊 +1 overall
This message was automatically generated.
Would be good to get some reviews here from @mukund-thakur, @mehakmeet and ideally @sunchao and @dongjoon-hyun, both of whom will be able to review the matching spark-side change, which is simply one of "don't reject attempts to use a PathOutputCommitter for dynamic partition overwrite if the instance created says it is OK".
I think the new assertions in TestManifestCommitProtocol.java are just defined but not executed (see the sketch after this exchange). Otherwise LGTM.
Assertions.assertThat(committer.hasCapability(
    ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING))
    .describedAs("dynamic partitioning capability in committer %s",
        committer);
Suggested change:
committer);
committer).isTrue();
did I just get my asserts wrong? that was bad. thanks!
Assertions.assertThat(bindingCommitter.hasCapability(
    ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING))
    .describedAs("dynamic partitioning capability in committer %s",
        bindingCommitter);
Suggested change:
bindingCommitter);
bindingCommitter).isTrue();
LGTM +1,
pending Attila's comments.
Change-Id: I29e98cf4ac607913d59e15babe6180434f665714
Thanks. Fixed tests, ran locally, and ran the abfs ITest subclass. All good.
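For anyone who hits the same AssertJ gotcha: `assertThat(...).describedAs(...)` only builds and names an assertion; nothing is evaluated until a terminal call such as `isTrue()`. A minimal standalone illustration (hypothetical values, not the PR's test code):

```java
import org.assertj.core.api.Assertions;

public class AssertJPitfall {
  public static void main(String[] args) {
    // Builds an assertion and attaches a description, but never checks it:
    // this statement passes silently even though the value is false.
    Assertions.assertThat(false)
        .describedAs("capability flag");

    // Only a terminal method actually asserts; this one throws
    // AssertionError, with the description above in its message.
    Assertions.assertThat(false)
        .describedAs("capability flag")
        .isTrue();
  }
}
```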
@@ -29,7 +29,7 @@
  * <li>Nothing else got through either.</li>
  * </ol>
  */
-public class AWSStatus500Exception extends AWSServiceIOException {
+public class jAWSStatus500Exception extends AWSServiceIOException {
I think this is a typo introduced in IntelliJ.
And I think Yetus failed only because of this.
aah
💔 -1 overall
This message was automatically generated.
Change-Id: Ifbe2d1012cbdf2e7467ce84a7d8d93a78e91dcf6
🎊 +1 overall
This message was automatically generated.
+1
Declares its compatibility with Spark's dynamic output partitioning by having the stream capability "mapreduce.job.committer.dynamic.partitioning".

Requires a Spark release with SPARK-40034, which does the probing before deciding whether to accept or reject instantiation with dynamic partition overwrite set.

This feature can be declared as supported by any other PathOutputCommitter implementations whose algorithm and destination filesystem are compatible. None of the S3A committers are compatible. The classic FileOutputCommitter is, but it does not declare itself as such out of our fear of changing that code; the Spark-side code will automatically infer compatibility if the created committer is of that class or a subclass.

Contributed by Steve Loughran.
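As an illustration of the mechanism described above, a minimal sketch of a PathOutputCommitter declaring the capability; the class name and no-op commit logic here are hypothetical, not the actual manifest committer code:

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;

/** Hypothetical committer declaring dynamic partitioning support. */
public class ExamplePartitioningCommitter extends PathOutputCommitter
    implements StreamCapabilities {

  public static final String CAPABILITY_DYNAMIC_PARTITIONING =
      "mapreduce.job.committer.dynamic.partitioning";

  private final Path outputPath;

  protected ExamplePartitioningCommitter(Path outputPath,
      TaskAttemptContext context) throws IOException {
    super(outputPath, context);
    this.outputPath = outputPath;
  }

  /** The probe point: callers cast to StreamCapabilities and ask. */
  @Override
  public boolean hasCapability(String capability) {
    return CAPABILITY_DYNAMIC_PARTITIONING.equals(capability);
  }

  @Override
  public Path getOutputPath() {
    return outputPath;
  }

  // No-op stubs for the abstract OutputCommitter methods; a real
  // committer does its manifest/rename work in these.
  @Override public void setupJob(JobContext jobContext) { }
  @Override public void setupTask(TaskAttemptContext taskContext) { }
  @Override public boolean needsTaskCommit(TaskAttemptContext taskContext) {
    return false;
  }
  @Override public void commitTask(TaskAttemptContext taskContext) { }
  @Override public void abortTask(TaskAttemptContext taskContext) { }
}
```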
### What changes were proposed in this pull request?

Uses the StreamCapabilities probe in MAPREDUCE-7403 to identify when a PathOutputCommitter is compatible with dynamic partition overwrite.

This patch has unit tests but not integration tests; it really needs to test the SQL commands through the manifest committer into gcs/abfs, or at least the local fs. That would be possible once hadoop 3.3.5 is out...

### Why are the changes needed?

Hadoop 3.3.5 adds a new committer in mapreduce-core which works fast and correctly on azure and gcs. (It would also work on hdfs, but it's optimised for the cloud stores.) The stores and the committer do meet the requirements of Spark SQL Dynamic Partition Overwrite, so it is OK for spark to work through it. Spark does not know this; MAPREDUCE-7403 adds a way for any PathOutputCommitter to declare that it is compatible; the IntermediateManifestCommitter will do so. (apache/hadoop#4728)

### Does this PR introduce _any_ user-facing change?

No. There is documentation on the feature in the hadoop [manifest committer](https://github.com/apache/hadoop/blob/82372d0d22e696643ad97490bc902fb6d17a6382/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md) docs.

### How was this patch tested?

1. Unit tests in hadoop-cloud which work with hadoop versions with/without the matching change.
2. New integration tests in https://github.com/hortonworks-spark/cloud-integration which require spark to be built against hadoop with the manifest committer declaring compatibility.

Those new integration tests include:

* a spark sql test derived from spark's own [CloudRelationBasicSuite.scala#L212](https://github.com/hortonworks-spark/cloud-integration/blob/master/cloud-examples/src/test/scala/org/apache/spark/sql/sources/CloudRelationBasicSuite.scala#L212)
* Dataset tests extended to verify support for/rejection of dynamic partition overwrite: [AbstractCommitDataframeSuite.scala#L151](https://github.com/hortonworks-spark/cloud-integration/blob/master/cloud-examples/src/test/scala/com/cloudera/spark/cloud/committers/AbstractCommitDataframeSuite.scala#L151)

Tested against azure cardiff with the manifest committer; s3 london (s3a committers reject dynamic partition overwrites).

Closes #37468 from steveloughran/SPARK-40034-MAPREDUCE-7403-manifest-committer-partitioning.

Authored-by: Steve Loughran <stevel@cloudera.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Description of PR
Declares its compatibility with the stream capability "mapreduce.job.committer.dynamic.partitioning". Spark will need to cast to StreamCapabilities and then probe.
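A sketch of that probe, assuming only the public Hadoop APIs named above (the helper class and method names are illustrative):

```java
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;

/** Illustrative helper showing how a caller such as Spark can probe. */
public final class DynamicPartitioningProbe {

  public static final String CAPABILITY_DYNAMIC_PARTITIONING =
      "mapreduce.job.committer.dynamic.partitioning";

  /**
   * True iff the committer implements StreamCapabilities and declares
   * the capability; committers without the interface are treated as
   * incompatible with dynamic partition overwrite.
   */
  public static boolean supportsDynamicPartitioning(
      PathOutputCommitter committer) {
    return committer instanceof StreamCapabilities
        && ((StreamCapabilities) committer)
            .hasCapability(CAPABILITY_DYNAMIC_PARTITIONING);
  }

  private DynamicPartitioningProbe() {
  }
}
```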
How was this patch tested?
SPARK-40034 has a PR with matching changes in the spark code, plus unit tests to verify that it's not an error to ask for dynamic partition overwrite if the committer's hasCapability holds.
apache/spark#37468
Testing
All the abfs tests run against azure cardiff. One transient failure; one new JIRA (HADOOP-18405: abfs testReadAndWriteWithDifferentBufferSizesAndSeek failure).
Unit tests of the spark code to check for the capability and reject if missing: CommitterBindingSuite.scala#L162
new integration tests in https://github.com/hortonworks-spark/cloud-integration
Those new integration tests include:
* a spark sql test derived from spark's own CloudRelationBasicSuite.scala#L212
* Dataset tests extended to verify support for/rejection of dynamic partition overwrite (AbstractCommitDataframeSuite.scala#L151)
Tested through a spark build with the matching patch against s3 london and azure cardiff.
GCS test setup is failing with OAuth problems in a way it was not on Friday; assuming unrelated.
For code changes:
- If applicable, have you updated the LICENSE, LICENSE-binary, NOTICE-binary files?