8000 HADOOP-18546. ABFS:disable purging list of in progress reads in abfs stream closed by saxenapranav · Pull Request #5176 · apache/hadoop · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content 10000

HADOOP-18546. ABFS:disable purging list of in progress reads in abfs stream closed #5176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2168,9 +2168,8 @@ The switch to turn S3A auditing on or off.

<property>
<name>fs.azure.enable.readahead</name>
<value>false</value>
<description>Disable readahead/prefetching in AbfsInputStream.
See HADOOP-18521</description>
<value>true</value>
<description>Enable readahead/prefetching in AbfsInputStream.</description>
</property>

<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;

public static final boolean DEFAULT_ENABLE_READAHEAD = false;
public static final boolean DEFAULT_ENABLE_READAHEAD = true;
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean tolerateOobAppends;

private boolean isReadAheadEnabled = false;
private boolean isReadAheadEnabled = true;

private boolean alwaysReadBufferSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,6 @@ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
purgeList(stream, completedReadList);
purgeList(stream, inProgressList);
}

/**
Expand Down Expand Up @@ -642,4 +641,9 @@ void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
freeList.clear();
completedReadList.add(buf);
}

/**
 * Test-only accessor for the {@code NUM_BUFFERS} constant.
 *
 * @return the value of {@code NUM_BUFFERS}.
 */
@VisibleForTesting
int getNumBuffers() {
return NUM_BUFFERS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
Expand Down Expand Up @@ -69,7 +68,6 @@ protected Configuration createConfiguration() {
protected AbstractFSContract createContract(final Configuration conf) {
// Builds the ABFS contract for this test on top of the supplied
// configuration, after overriding the read settings below.
// Both the read-ahead range and the read buffer size are set to
// MIN_BUFFER_SIZE.
conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
// Explicitly switches readahead on for this contract.
// NOTE(review): may be redundant if the readahead default is already
// true — confirm against FileSystemConfigurations.
conf.setBoolean(FS_AZURE_ENABLE_READAHEAD, true);
return new AbfsFileSystemContract(conf, isSecure);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ public class TestAbfsInputStream extends
REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;

/**
 * Tears down the test and then asks the {@link ReadBufferManager} returned
 * by {@code getBufferManager()} to reset itself through
 * {@code testResetReadBufferManager()}.
 *
 * The reset runs in a {@code finally} block so that it still happens when
 * the superclass teardown throws; otherwise buffer state left behind by a
 * failing test could leak into the tests that run afterwards.
 *
 * @throws Exception if the superclass teardown or the reset fails.
 */
@Override
public void teardown() throws Exception {
  try {
    super.teardown();
  } finally {
    ReadBufferManager.getBufferManager().testResetReadBufferManager();
  }
}

private AbfsRestOperation getMockRestOp() {
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
Expand All @@ -106,7 +112,6 @@ private AbfsClient getMockAbfsClient() {
private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
String fileName) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
inputStreamContext.isReadAheadEnabled(true);
// Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream(
mockAbfsClient,
Expand All @@ -132,7 +137,6 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
boolean alwaysReadBufferSize,
int readAheadBlockSize) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
inputStreamContext.isReadAheadEnabled(true);
// Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream(
abfsClient,
Expand Down Expand Up @@ -495,6 +499,69 @@ public void testSuccessfulReadAhead() throws Exception {
checkEvictedStatus(inputStream, 0, true);
}

/**
 * This test expects that the InProgressList is not purged when the
 * inputStream is closed.
 *
 * The mocked client sleeps before answering each read, so the queued
 * read-aheads are still being executed while the stream is closed. The
 * sizes of the in-progress, free and completed lists are checked once
 * just before the close and once right after it, and must be the same.
 *
 * @throws Exception if the test fails or the thread is interrupted.
 */
@Test
public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
  AbfsClient client = getMockAbfsClient();
  AbfsRestOperation successOp = getMockRestOp();
  // Primitives are used on purpose: none of these values is ever null, so
  // boxed types would only add boxing/unboxing at every use.
  final long serverCommunicationMockLatency = 3_000L;
  final long readBufferTransferToInProgressProbableTime = 1_000L;
  final int readBufferQueuedCount = 3;

  Mockito.doAnswer(invocationOnMock -> {
    // Sleeping thread to mock the network latency from client to backend.
    Thread.sleep(serverCommunicationMockLatency);
    return successOp;
  })
      .when(client)
      .read(any(String.class), any(Long.class), any(byte[].class),
          any(Integer.class), any(Integer.class), any(String.class),
          any(String.class), any(TracingContext.class));

  final ReadBufferManager readBufferManager
      = ReadBufferManager.getBufferManager();

  final int readBufferTotal = readBufferManager.getNumBuffers();
  final int expectedFreeListBufferCount = readBufferTotal
      - readBufferQueuedCount;

  try (AbfsInputStream inputStream = getAbfsInputStream(client,
      "testSuccessfulReadAhead.txt")) {
    // As this is a try-with-resources block, the close() method of the
    // created abfsInputStream object is called at the end of the block.
    queueReadAheads(inputStream);

    // Sleeping to give ReadBufferWorker time to pick up the readBuffers
    // for processing.
    Thread.sleep(readBufferTransferToInProgressProbableTime);

    // State while the stream is still open.
    assertReadBufferListSizes(readBufferManager, readBufferQueuedCount,
        expectedFreeListBufferCount);
  }

  // State after close(): the in-progress reads must not have been purged.
  assertReadBufferListSizes(readBufferManager, readBufferQueuedCount,
      expectedFreeListBufferCount);
}

/**
 * Asserts the sizes of the three buffer lists exposed by the manager: the
 * in-progress list, the free list, and the completed list (expected to be
 * empty).
 *
 * @param readBufferManager manager whose list copies are inspected.
 * @param expectedInProgressCount expected size of the in-progress list.
 * @param expectedFreeListCount expected size of the free list.
 */
private static void assertReadBufferListSizes(
    final ReadBufferManager readBufferManager,
    final int expectedInProgressCount,
    final int expectedFreeListCount) {
  Assertions.assertThat(readBufferManager.getInProgressCopiedList())
      .describedAs(String.format("InProgressList should have %d elements",
          expectedInProgressCount))
      .hasSize(expectedInProgressCount);
  Assertions.assertThat(readBufferManager.getFreeListCopy())
      .describedAs(String.format("FreeList should have %d elements",
          expectedFreeListCount))
      .hasSize(expectedFreeListCount);
  Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
      .describedAs("CompletedList should have 0 elements")
      .hasSize(0);
}

/**
* This test expects ReadAheadManager to throw exception if the read ahead
* thread had failed within the last thresholdAgeMilliseconds.
Expand Down
0