From 512109340b3d0b54be29730a094b74ff2dcaaa53 Mon Sep 17 00:00:00 2001
From: Ahmar Suhail
Date: Wed, 11 Jun 2025 16:03:13 +0100
Subject: [PATCH] integrates with AAL's readVectored()

---
 .../fs/s3a/impl/streams/AnalyticsStream.java  | 55 +++++++++++++++++++
 ...3AContractAnalyticsStreamVectoredRead.java | 35 +++++++++++-
 .../s3a/ITestS3AContractVectoredRead.java     |  2 +-
 3 files changed, 90 insertions(+), 2 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index 6b910c6538070..a91558f075d60 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -21,9 +21,19 @@
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.VectoredReadUtils;
 
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
 import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
 import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
 import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
@@ -38,6 +48,9 @@
 import org.apache.hadoop.fs.s3a.S3AInputPolicy;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
 
+import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
+
+
 /**
  * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
  * parquet specific optimisations such as parquet-aware prefetching. For more details, see
@@ -128,6 +141,48 @@ public int read(byte[] buf, int off, int len) throws IOException {
     return bytesRead;
   }
 
+  /**
+   * {@inheritDoc}
+   * Pass to {@link #readVectored(List, IntFunction, Consumer)}
+   * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   * @throws IOException IOE if any.
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+      IntFunction<ByteBuffer> allocate) throws IOException {
+    readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  /**
+   * {@inheritDoc}
+   * Pass the ranges to the analytics accelerator library's readVectored().
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   * @param release the function to release a ByteBuffer.
+   * @throws IOException IOE if any.
+   */
+  @Override
+  public void readVectored(final List<? extends FileRange> ranges,
+      final IntFunction<ByteBuffer> allocate,
+      final Consumer<ByteBuffer> release) throws IOException {
+    LOG.debug("AAL: Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
+    throwIfClosed();
+
+    List<ObjectRange> objectRanges = new ArrayList<>();
+
+    for (FileRange range : ranges) {
+      CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+      ObjectRange objectRange = new ObjectRange(result, range.getOffset(), range.getLength());
+      objectRanges.add(objectRange);
+      range.setData(result);
+    }
+
+    // AAL does not do any range coalescing, so input and combined ranges are the same.
+    this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
+    inputStream.readVectored(objectRanges, allocate, release);
+  }
 
   @Override
   public boolean seekToNewSource(long l) throws IOException {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
index 8cf182680c350..bb673a02121c2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
@@ -19,11 +19,21 @@
 package org.apache.hadoop.fs.contract.s3a;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import org.junit.Test;
+
+import java.util.List;
 
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 
 /**
  * S3A contract tests for vectored reads with the Analytics stream.
@@ -57,7 +67,6 @@ protected Configuration createConfiguration() {
     // This issue is tracked in:
     // https://github.com/awslabs/analytics-accelerator-s3/issues/218
     skipForAnyEncryptionExceptSSES3(conf);
-    conf.set("fs.contract.vector-io-early-eof-check", "false");
     return conf;
   }
 
@@ -65,4 +74,28 @@ protected Configuration createConfiguration() {
   protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
+
+  @Override
+  public void testNegativeOffsetRange() throws Exception {
+    verifyExceptionalVectoredRead(ContractTestUtils.range(-1, 50), IllegalArgumentException.class);
+  }
+
+  @Test
+  public void testReadVectoredWithAALStatsCollection() throws Exception {
+
+    List<FileRange> fileRanges = createSampleNonOverlappingRanges();
+    try (FSDataInputStream in = openVectorFile()) {
+      in.readVectored(fileRanges, getAllocate());
+
+      IOStatistics st = in.getIOStatistics();
+
+      // Statistics such as GET requests will be added after IoStats support.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, 1);
+
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+          1);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index bf489fc44a5ff..7bde42e45bfad 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -97,7 +97,7 @@ protected AbstractFSContract createContract(Configuration conf) {
   public void setup() throws Exception {
     super.setup();
     skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
-        "Analytics Accelerator does not support vectored reads");
+        "AAL with readVectored() is tested in ITestS3AContractAnalyticsStreamVectoredRead");
   }
 
   /**
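
Reviewer note: a minimal sketch of how the vectored read path wired up above can be reached
through the public Hadoop API. It is illustrative only, not part of the patch: the object path
is a placeholder, and the fs.s3a.input.stream.type=analytics setting is assumed to be the
switch that selects AnalyticsStream (that wiring is outside this change). FileRange,
FSDataInputStream.readVectored() and FutureIO.awaitFuture() are the standard Hadoop
vectored-read APIs that this patch delegates to AAL.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

public final class AnalyticsVectoredReadSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumed configuration for selecting the analytics stream; not introduced by this patch.
    conf.set("fs.s3a.input.stream.type", "analytics");

    // Placeholder object path.
    Path path = new Path("s3a://example-bucket/data/sample.parquet");
    FileSystem fs = path.getFileSystem(conf);

    // Two non-overlapping ranges; AAL receives them unchanged, since no coalescing is done.
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 1024),
        FileRange.createFileRange(8192, 4096));

    try (FSDataInputStream in = fs.open(path)) {
      // Dispatches to AnalyticsStream#readVectored(ranges, allocate) when the
      // analytics stream is active.
      in.readVectored(ranges, ByteBuffer::allocate);

      for (FileRange range : ranges) {
        // Each range's future is completed once AAL has filled its buffer.
        ByteBuffer data = FutureIO.awaitFuture(range.getData());
        System.out.println("read " + data.remaining()
            + " bytes at offset " + range.getOffset());
      }
    }
  }
}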