TEZ-4604: tez-mapreduce does not delete files under staging directory by okumin · Pull Request #395 · apache/tez · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

TEZ-4604: tez-mapreduce does not delete files under staging directory #395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ public TezConfiguration(boolean loadDefaults) {
"staging.scratch-data.auto-delete";
public static final boolean TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT = true;

/**
 * Boolean value. Selects which directory Tez removes when it cleans up its staging data.
 * If true, Tez will try to delete the entire TEZ_AM_STAGING_DIR. If false (the default),
 * Tez will delete only the subdirectory it created itself, and the client is responsible
 * for cleaning up the parent directories. This is typically set to true by tez-mapreduce.
 */
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type = "boolean")
public static final String TEZ_AM_STAGING_BASE_DIR_CLEANUP = TEZ_AM_PREFIX + "staging.base.dir.cleanup";
public static final boolean TEZ_AM_STAGING_BASE_DIR_CLEANUP_DEFAULT = false;

/**
* String value. Specifies the name of the shuffle auxiliary service.
*/
Expand Down
17 changes: 10 additions & 7 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ public class DAGAppMaster extends AbstractService {
private boolean recoveryEnabled;
private Path recoveryDataDir;
private Path currentRecoveryDataDir;
private Path tezBaseStagingDir;
private Path tezSystemStagingDir;
private FileSystem recoveryFS;

Expand Down Expand Up @@ -490,6 +491,7 @@ protected void serviceInit(final Configuration conf) throws Exception {
} else {
dispatcher.enableExitOnDispatchException();
}
this.tezBaseStagingDir = TezCommonUtils.getTezBaseStagingPath(conf);
String strAppId = this.appAttemptID.getApplicationId().toString();
this.tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);

Expand Down Expand Up @@ -2201,19 +2203,20 @@ public void serviceStop() throws Exception {
if (deleteTezScratchData && this.taskSchedulerManager != null
&& this.taskSchedulerManager.hasUnregistered()) {
// Delete tez scratch data dir
if (this.tezSystemStagingDir != null) {
boolean cleanupBaseStagingDir = this.amConf.getBoolean(TezConfiguration.TEZ_AM_STAGING_BASE_DIR_CLEANUP,
TezConfiguration.TEZ_AM_STAGING_BASE_DIR_CLEANUP_DEFAULT);
Path directory = cleanupBaseStagingDir ? this.tezBaseStagingDir : this.tezSystemStagingDir;
if (directory != null) {
try {
this.appMasterUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
FileSystem fs = tezSystemStagingDir.getFileSystem(amConf);
boolean deletedStagingDir = fs.delete(tezSystemStagingDir, true);
FileSystem fs = directory.getFileSystem(amConf);
boolean deletedStagingDir = fs.delete(directory, true);
if (!deletedStagingDir) {
LOG.warn("Failed to delete tez scratch data dir, path="
+ tezSystemStagingDir);
LOG.warn("Failed to delete tez scratch data dir, path={}", directory);
} else {
LOG.info("Completed deletion of tez scratch data dir, path="
+ tezSystemStagingDir);
LOG.info("Completed deletion of tez scratch data dir, path={}", directory);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
try {
dagAMConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
jobSubmitDir);
dagAMConf.setBoolean(TezConfiguration.TEZ_AM_STAGING_BASE_DIR_CLEANUP, true);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YARNRunner is glue code between a MapReduce Job and Tez that implements ClientProtocol, so the client code calling YARNRunner belongs to Apache Hadoop.
ClientProtocol doesn't have an API to declare that a specific job has completed. If we resolved this issue on the client side, we would have to add new APIs to Apache Hadoop. That's why I added a new parameter and handled the issue on the Apache Tez side.
I'm not confident that this approach is the best one. I'd appreciate it if someone could suggest a better idea.


// Set Tez parameters based on MR parameters.
String queueName = jobConf.get(JobContext.QUEUE_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.dag.api.TezConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,6 +91,7 @@ public static void setup() throws IOException {
conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
conf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500);
//conf.setLong(TezConfiguration.TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS, 500);
mrrTezCluster.init(conf);
mrrTezCluster.start();
}
Expand All @@ -106,6 +110,32 @@ public static void tearDown() {
}
}

/**
 * Blocks until the mini cluster reports zero allocated virtual cores, then asserts that the
 * current user's {@code .staging} directory still exists but contains no files at any depth.
 *
 * @throws IOException if the user lookup or a file system call fails
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 */
private void assertStagingDir() throws IOException, InterruptedException {
  // Wait for the clean-up process to be invoked: poll the root queue metrics every 100 ms
  // until no virtual cores remain allocated.
  boolean clusterIdle = false;
  while (!clusterIdle) {
    int coresInUse = mrrTezCluster.getResourceManager()
        .getResourceScheduler()
        .getRootQueueMetrics()
        .getAllocatedVirtualCores();
    LOG.info("Number of cores in use: {}", coresInUse);
    clusterIdle = coresInUse == 0;
    if (!clusterIdle) {
      Thread.sleep(100L);
    }
  }

  String currentUser = UserGroupInformation.getCurrentUser().getUserName();
  String stagingLocation = String.format("%s/%s/.staging", mrrTezCluster.getStagingPath(), currentUser);
  Path userStagingDir = new Path(stagingLocation);

  Assert.assertTrue(remoteFs.exists(userStagingDir));

  // Walk the tree recursively, logging every leftover file so a failure is easy to diagnose.
  int leftoverFiles = 0;
  for (RemoteIterator<LocatedFileStatus> remaining = remoteFs.listFiles(userStagingDir, true);
      remaining.hasNext(); leftoverFiles++) {
    LOG.info("Path in the staging dir: {}", remaining.next().getPath());
  }
  Assert.assertEquals(0, leftoverFiles);
}

@Test (timeout = 60000)
public void testMRRSleepJob() throws IOException, InterruptedException,
ClassNotFoundException {
Expand Down Expand Up @@ -140,6 +170,7 @@ public void testMRRSleepJob() throws IOException, InterruptedException,
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.contains(jobId.substring(jobId.indexOf("_"))));
assertStagingDir();

// FIXME once counters and task progress can be obtained properly
// TODO use dag client to test counters and task progress?
Expand Down Expand Up @@ -190,7 +221,7 @@ public void testRandomWriter() throws IOException, InterruptedException,
}
}
Assert.assertEquals("Number of part files is wrong!", 3, count);

assertStagingDir();
}


Expand Down Expand Up @@ -223,6 +254,7 @@ public void testFailingJob() throws IOException, InterruptedException,
boolean succeeded = job.waitForCompletion(true);
Assert.assertFalse(succeeded);
Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
assertStagingDir();

// FIXME once counters and task progress can be obtained properly
// TODO verify failed task diagnostics
Expand Down Expand Up @@ -257,11 +289,44 @@ public void testFailingAttempt() throws IOException, InterruptedException,
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
assertStagingDir();

// FIXME once counters and task progress can be obtained properly
// TODO verify failed task diagnostics
}

/**
 * Submits a sleep job to a queue that does not exist and verifies that the submission is
 * rejected with the expected "unknown queue" message, and that no files are left in the
 * user's staging directory afterwards.
 *
 * <p>The test is skipped (returns early) when the MR application jar is not present.
 *
 * @throws IOException if job creation or the staging directory check fails
 * @throws InterruptedException if interrupted while waiting for the staging directory check
 * @throws ClassNotFoundException declared for parity with the other tests in this class
 */
@Test (timeout = 60000)
public void testFailingSubmission() throws IOException, InterruptedException,
    ClassNotFoundException {

  LOG.info("\n\n\nStarting testFailingSubmission().");

  if (!(new File(MiniTezCluster.APPJAR)).exists()) {
    LOG.info("MRAppJar " + MiniTezCluster.APPJAR
        + " not found. Not running test.");
    return;
  }

  Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());

  MRRSleepJob sleepJob = new MRRSleepJob();
  sleepJob.setConf(sleepConf);

  Job job = sleepJob.createJob(1, 1, 1, 1, 1,
      1, 1, 1, 1, 1);

  job.setJarByClass(MRRSleepJob.class);
  job.setMaxMapAttempts(1); // speed up failures
  job.getConfiguration().set(JobContext.QUEUE_NAME, "non-existent-queue");

  try {
    job.submit();
    // Without this, a submission that unexpectedly succeeds would let the test pass silently.
    // Assert.fail throws AssertionError, which the catch (Exception) below does not intercept.
    Assert.fail("Job submission to a non-existent queue was expected to fail");
  } catch (Exception e) {
    // Guard against a null message so a mismatch is reported as an assertion failure
    // carrying the actual exception, not as a NullPointerException.
    String message = e.getMessage();
    Assert.assertTrue("Unexpected submission failure: " + e,
        message != null && message.contains("to unknown queue: non-existent-queue"));
  }
  assertStagingDir();
}

@Test (timeout = 60000)
public void testMRRSleepJobWithCompression() throws IOException,
InterruptedException, ClassNotFoundException {
Expand Down Expand Up @@ -298,6 +363,7 @@ public void testMRRSleepJobWithCompression() throws IOException,
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.contains(jobId.substring(jobId.indexOf("_"))));
assertStagingDir();

// FIXME once counters and task progress can be obtained properly
// TODO use dag client to test counters and task progress?
Expand Down Expand Up @@ -354,6 +420,7 @@ public Void run() throws Exception {
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
assertStagingDir();
return null;
}
});
Expand Down
Loading
0