HADOOP-18805. S3A prefetch LRU test to work with small files by virajjasani · Pull Request #5843 · apache/hadoop · GitHub

HADOOP-18805. S3A prefetch LRU test to work with small files #5843

Closed · virajjasani wants to merge 2 commits
@@ -39,6 +39,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -63,9 +64,7 @@ public class ITestS3APrefetchingLruEviction extends AbstractS3ACostTest {
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][]{
         {"1"},
-        {"2"},
-        {"3"},
-        {"4"}
+        {"2"}
     });
   }

@@ -78,39 +77,44 @@ public ITestS3APrefetchingLruEviction(final String maxBlocks) {
       LoggerFactory.getLogger(ITestS3APrefetchingLruEviction.class);

   private static final int S_1K = 1024;
+  private static final int S_500 = 512;
+  private static final int SMALL_FILE_SIZE = S_1K * 56;

-  // Path for file which should have length > block size so S3ACachingInputStream is used
-  private Path largeFile;
-  private FileSystem largeFileFS;
+  private Path smallFile;
+  private FileSystem smallFileFS;
   private int blockSize;

-  private static final int TIMEOUT_MILLIS = 5000;
+  private static final int TIMEOUT_MILLIS = 3000;
   private static final int INTERVAL_MILLIS = 500;

   @Override
   public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
     S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_MAX_BLOCKS_COUNT);
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
     conf.setBoolean(PREFETCH_ENABLED_KEY, true);
     conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
+    conf.setInt(PREFETCH_BLOCK_SIZE_KEY, S_1K * 10);
     return conf;
   }

   @Override
   public void teardown() throws Exception {
     super.teardown();
-    cleanupWithLogger(LOG, largeFileFS);
-    largeFileFS = null;
+    cleanupWithLogger(LOG, smallFileFS);
+    smallFileFS = null;
   }

   private void openFS() throws Exception {
     Configuration conf = getConfiguration();
-    String largeFileUri = S3ATestUtils.getCSVTestFile(conf);

-    largeFile = new Path(largeFileUri);
+    byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'x', 26);
+    smallFile = path("iTestS3APrefetchingLruEviction");
+    ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);
     blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
-    largeFileFS = new S3AFileSystem();
-    largeFileFS.initialize(new URI(largeFileUri), getConfiguration());
+    smallFileFS = new S3AFileSystem();
+    smallFileFS.initialize(new URI(smallFile.toString()), getConfiguration());
   }

@Test
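An aside on the sizing (commentary, not part of the diff): with PREFETCH_BLOCK_SIZE_KEY pinned to 10 KiB above and SMALL_FILE_SIZE at 56 KiB, the test file spans six prefetch blocks, indices 0 through 5, which is exactly the range the seeks below cover. A minimal sketch of that arithmetic; the class and variable names here are hypothetical:

```java
// Illustrative arithmetic only; names are hypothetical, not from the PR.
public final class PrefetchBlockMath {
  public static void main(String[] args) {
    final int blockSize = 10 * 1024;   // PREFETCH_BLOCK_SIZE_KEY: 10 KiB
    final int fileSize = 56 * 1024;    // SMALL_FILE_SIZE: S_1K * 56
    // Ceiling division: the last block is only partially filled.
    final int blocks = (fileSize + blockSize - 1) / blockSize;
    System.out.println("blocks spanned: " + blocks);  // prints 6 (indices 0..5)
  }
}
```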
@@ -125,7 +129,7 @@ public void testSeeksWithLruEviction() throws Throwable {
         .build());
     CountDownLatch countDownLatch = new CountDownLatch(7);

-    try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+    try (FSDataInputStream in = smallFileFS.open(smallFile)) {
       ioStats = in.getIOStatistics();
       // tests to add multiple blocks in the prefetch cache
       // and let LRU eviction take place as more cache entries
@@ -135,43 +139,43 @@ public void testSeeksWithLruEviction() throws Throwable {
       executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
           in,
           0,
-          blockSize - S_1K * 10));
+          blockSize - S_500 * 10));

       // Seek to block 1 and don't read completely
       executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
           in,
           blockSize,
-          2 * S_1K));
+          2 * S_500));

       // Seek to block 2 and don't read completely
       executorService.submit(() -> readFullyWithSeek(countDownLatch,
           in,
           blockSize * 2L,
-          2 * S_1K));
+          2 * S_500));

       // Seek to block 3 and don't read completely
       executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
           in,
           blockSize * 3L,
-          2 * S_1K));
+          2 * S_500));

       // Seek to block 4 and don't read completely
       executorService.submit(() -> readFullyWithSeek(countDownLatch,
           in,
           blockSize * 4L,
-          2 * S_1K));
+          2 * S_500));

       // Seek to block 5 and don't read completely
       executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
           in,
           blockSize * 5L,
-          2 * S_1K));
+          2 * S_500));

       // backward seek, can't use block 0 as it is evicted
       executorService.submit(() -> readFullyWithSeek(countDownLatch,
           in,
-          S_1K * 5,
-          2 * S_1K));
+          S_500 * 5,
+          2 * S_500));

       countDownLatch.await();

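To make the assertions that follow easier to read (again commentary, not code from the PR): with the max block count parameterized to 1 or 2, caching blocks 1 through 5 in turn pushes the least-recently-used entries out, so the final backward seek cannot be served from block 0. The real prefetch cache lives in Hadoop's prefetch support code; the sketch below only demonstrates the same LRU eviction policy in miniature.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// NOT the S3A prefetch cache; just the LRU eviction policy the test
// exercises, built on LinkedHashMap's access-order mode.
final class LruBlockCache<K, V> extends LinkedHashMap<K, V> {
  private final int maxBlocks;

  LruBlockCache(int maxBlocks) {
    super(16, 0.75f, true);   // accessOrder=true gives LRU ordering
    this.maxBlocks = maxBlocks;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    return size() > maxBlocks; // evict least-recently-used beyond the cap
  }

  public static void main(String[] args) {
    LruBlockCache<Integer, String> cache = new LruBlockCache<>(2);
    for (int block = 0; block <= 5; block++) {
      cache.put(block, "data");
    }
    // Only the two most recently used blocks survive; block 0 is long gone.
    System.out.println(cache.keySet());        // [4, 5]
    System.out.println(cache.containsKey(0));  // false
  }
}
```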
@@ -206,7 +210,6 @@ public void testSeeksWithLruEviction() throws Throwable {
   private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDataInputStream in,
       long position, int len) {
     byte[] buffer = new byte[blockSize];
-    // Don't read block 0 completely
     try {
       in.readFully(position, buffer, 0, len);
       countDownLatch.countDown();
@@ -229,7 +232,6 @@ private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDataInputStream in,
   private boolean readFullyWithSeek(CountDownLatch countDownLatch, FSDataInputStream in,
       long position, int len) {
     byte[] buffer = new byte[blockSize];
-    // Don't read block 0 completely
     try {
       in.seek(position);
       in.readFully(buffer, 0, len);
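One last note on the two helper styles (commentary, not part of the change): FSDataInputStream implements PositionedReadable, so readFully(position, buffer, offset, length) reads at an absolute offset and leaves the stream's current position untouched, whereas seek(position) followed by readFully(buffer, offset, length) advances it. A self-contained sketch, assuming a pre-existing file at a hypothetical path with at least 3 KiB of data:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Contrast of the two read styles used by the test's helper methods.
public final class ReadStyles {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/tmp/example");   // hypothetical path
    byte[] buffer = new byte[1024];
    try (FSDataInputStream in = fs.open(path)) {
      // Positioned read: absolute offset, stream position unchanged.
      in.readFully(2048, buffer, 0, buffer.length);
      System.out.println(in.getPos());      // still 0
      // Seek + read: position moves to 2048, then past the bytes read.
      in.seek(2048);
      in.readFully(buffer, 0, buffer.length);
      System.out.println(in.getPos());      // 3072
    }
  }
}
```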