HADOOP-18291. S3A prefetch - Implement thread-safe LRU cache for SingleFilePerBlockCache #5754
Conversation
🎊 +1 overall
This message was automatically generated.
@mukund-thakur @mehakmeet could you please review this PR?
Did an initial review at first glance. Looks good. I need to properly understand the locking and whether there are any scenarios we need to think of; I'll review the tests next.
private Entry head;

/**
 * Tail of the lined list.
typo: "linked"
/**
 * Prefetch max blocks count config.
 */
public static final String FS_PREFETCH_MAX_BLOCKS_COUNT = "fs.prefetch.max.blocks.count";
Is there a Constants class we can move this to?
this.maxBlocksCount =
    conf.getInt(FS_PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_FS_PREFETCH_MAX_BLOCKS_COUNT);
Preconditions.checkArgument(this.maxBlocksCount > 0,
    "prefetch blocks total capacity should be more than 0");
Include the property name in the error message, so that the user knows which setting to change to a valid value.
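i.e. something along these lines, a sketch that reuses the constant from the diff above (the exact wording is illustrative):

Preconditions.checkArgument(this.maxBlocksCount > 0,
    "%s should be more than 0: %s", FS_PREFETCH_MAX_BLOCKS_COUNT, this.maxBlocksCount);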
if (tail != null) {
  while (tail.getNext() != null) {
    tail = tail.getNext();
  }
}
Can you explain this part a bit? I'm not able to see why it is needed.
sure, let's say:
head -> 1, tail -> 2
new block: 3
so, we need to make: 3 -> 1 -> 2
i.e. new head -> 3, tail -> 2
new block: 4
updated list: 4 -> 3 -> 1 -> 2
now let's say the input stream accesses block 2, so block 2 needs to be put at the head.
new list should be: 2 -> 4 -> 3 -> 1
we change the head to 2 and we also update the next pointer of block 1.
however, if we don't re-derive the tail (L322-L326), we will not be able to move the tail to block 1.
Okay, but when we are adding the node to the head, doesn't it make more sense to check whether the current node is the tail, get its previous node, and set that as the tail? The current approach works; I was just wondering if we can avoid traversing the list 🤔
yes, nice optimization for sure, let me double check this again. i ran through multiple test iterations and found that the current approach works, but let me check whether the optimization also works (i think it should, i am just wondering if i am missing some cases).
Thank you btw @mehakmeet !!
i applied this patch temporarily to debug further but somehow head and tail are getting screwed up:
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
index ef685b54d30..1aad82ff9c2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
@@ -306,6 +306,7 @@ private void addToHeadOfLinkedList(Entry entry) {
"Block num {} to be added to the head. Current head block num: {} and tail block num: {}",
entry.blockNumber, head.blockNumber, tail.blockNumber);
if (entry != head) {
+ boolean isEntryTail = entry == tail;
Entry prev = entry.getPrevious();
Entry nxt = entry.getNext();
if (prev != null) {
@@ -318,10 +319,8 @@ private void addToHeadOfLinkedList(Entry entry) {
entry.setNext(head);
head.setPrevious(entry);
head = entry;
- }
- if (tail != null) {
- while (tail.getNext() != null) {
- tail = tail.getNext();
+ if (isEntryTail) {
+ tail = prev;
}
}
} finally {
however, somehow after eviction, the head and tail are getting screwed up; still trying to understand what is going wrong and why this patch would not work.
but i hope you were suggesting a change like this one, correct?
it works with a slight modification, let me push the change
Yes, happy with the new change I think. Would be good to explicitly test certain tail changing scenarios in the IT like you mentioned above if we are not already doing it.
thanks, i was actually thinking about it, but there is just no clean way of doing so. what i have been able to do so far is "logging" the head and tail nodes (as you also mentioned earlier) along with all other nodes, so that i could track the exact node sequence. that's the best way i could find so far, but extracting that info in an IT is really difficult (if we were to do it in a clean way).
True, I was thinking it might be possible via a simple UT as well, where we pass in the entries we want and access them in a chosen order to test the functionality; it might be easier to test the LRU logic directly than via the stream.
nice idea, it might be more beneficial to test this in a UT.
i am also planning to refactor the Entry class into its own class, rather than keeping it as an inner class of SingleFilePerBlockCache, as part of the next follow-up sub-task. once we do that, it might be even easier to write some UTs that directly access the head and tail pointers.
sorry, i was thinking of this as a sub-task, so maybe adding the UT can also be done with that sub-task. does that sound good?
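To make that follow-up concrete, here is a minimal, self-contained model of the head/tail bookkeeping discussed above that such a UT could target. The class and method names are hypothetical, not the actual SingleFilePerBlockCache internals:

// Hypothetical, simplified model of the LRU list for a direct unit test.
final class LruBlockList {
  static final class Entry {
    final int blockNumber;
    Entry prev;
    Entry next;
    Entry(int blockNumber) { this.blockNumber = blockNumber; }
  }

  private Entry head;
  private Entry tail;

  /** Move an existing entry (or add a new one) to the head; O(1). */
  void touch(Entry entry) {
    if (entry == head) {
      return;
    }
    // unlink from the current position
    if (entry.prev != null) {
      entry.prev.next = entry.next;
    }
    if (entry.next != null) {
      entry.next.prev = entry.prev;
    }
    if (entry == tail) {
      // the point debated above: keep the tail correct without traversal
      tail = entry.prev;
    }
    // link at the head
    entry.prev = null;
    entry.next = head;
    if (head != null) {
      head.prev = entry;
    }
    head = entry;
    if (tail == null) {
      tail = entry;
    }
  }

  Entry head() { return head; }
  Entry tail() { return tail; }
}

A test could then add a few entries, touch one in the middle and the current tail, and assert head() and tail() directly.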
elementToPurge.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
    PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
if (!lockAcquired) {
  LOG.error("Cache file {} deletion would not be attempted as write lock could not"
So, there can be a scenario where the current cache exceeds its normal capacity? Is 5 seconds enough time? or are we okay with this?
since we are already using 5s elsewhere (PREFETCH_WRITE_LOCK_TIMEOUT), i used it here as well, but happy to change it in the future if we encounter some problem with this. does that sound good?
It seems like we are okay with things not blowing up if eviction is not successful, are we okay with it? Can this hurt in the long run?
it should be okay; in fact we have the same logic for input stream close as well: if eviction or removal of a disk block is unsuccessful, we leave it behind with a fat warning.
if eviction misses it, stream close would be able to clean it up.
if stream close misses it, then it stays on disk; we might eventually come up with some "file last accessed" based check and maybe a cron removing such files. not a bad idea IMO.
Okay, sounds good
💔 -1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
Looks good. Some changes needed in the test. What would really be good is to cover this in a multi-threaded test too; nothing major, but creating an ExecutorService with 5 threads, then trying to read and evict 3 blocks and asserting the correct IOStats and read values should be enough. This would test the locking as well.
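A rough sketch of the kind of test being suggested, assuming the in stream, blockSize and IOStatistics assertions already used in this test class (thread count, offsets and timeouts are illustrative; uses java.util.concurrent ExecutorService, Executors, CountDownLatch, Future and TimeUnit):

ExecutorService executorService = Executors.newFixedThreadPool(5);
try {
  CountDownLatch latch = new CountDownLatch(3);
  List<Future<Boolean>> futures = new ArrayList<>();
  for (int i = 0; i < 3; i++) {
    final long offset = i * (long) blockSize;
    futures.add(executorService.submit(() -> {
      byte[] buffer = new byte[blockSize];
      // PositionedReadable read: exercises prefetch, caching and eviction under concurrency
      in.readFully(offset, buffer, 0, buffer.length);
      latch.countDown();
      return true;
    }));
  }
  latch.await(30, TimeUnit.SECONDS);
  for (Future<Boolean> future : futures) {
    future.get();   // surfaces any read failure as a test failure
  }
  // then assert the expected IOStatistics: blocks prefetched, blocks evicted, bytes read
} finally {
  executorService.shutdownNow();
}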
@Override
public Configuration createConfiguration() {
  Configuration conf = super.createConfiguration();
  S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
  conf.setBoolean(PREFETCH_ENABLED_KEY, true);
  conf.setInt(FS_PREFETCH_MAX_BLOCKS_COUNT, PREFETCH_MAX_NUM_BLOCKS);
nit: Remove the base and bucket overrides for this property in L86, just so that the test is consistent in different environments.
// backward seek, can't use block 0 as it is evicted
in.seek(S_1K * 5);
in.read();
Assert the number of evictions being done or blocks being removed from the list. At certain points, check the size of the list to make sure it stays consistent with the capacity.
for the assertions on the number of evictions, i was thinking of adding a metric as the next sub-task so that this patch doesn't become too complicated to review. is that fine with you?
Yes, that should be fine as well. I saw this method, so I thought we were already recording that metric, blockRemovedFromFileCache().
got it, shall we introduce a new metric that would help differentiate blockRemovedFromFileCache() vs blockEvictedFromFileCache()?
Yea, why not, it'll be good for debugging purposes: if there's any difference between them, we would know that there's some issue with the proper deletion of files from the cache. Perhaps overkill, but it never hurts 😄
sounds great, let me create a follow-up sub-task for introducing the metric, and update the test with that sub-task.
this will likely keep the commit history clean and easy to manage :)
thank you once again!
ioStats = in.getIOStatistics();

byte[] buffer = new byte[blockSize];
add a comment here explaining what the test is doing on a high level, so that it's easier to understand the flow of how LRU is happening.
// Seek to block 3 and don't read completely
in.seek(blockSize * 3L);
in.read(buffer, 0, 2 * S_1K);
Add a seek to the intersection point of two blocks (e.g. 4*blockSize - 10KB) and read some bytes (>10KB) so that both blocks are read, then assert that the head of the list is the correct block.
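A sketch of that suggestion, reusing in, blockSize and S_1K from the surrounding test (crossBlockBuffer is a name introduced here for illustration):

// seek into the last 10KB of block 3, then read 20KB so the read spans blocks 3 and 4
byte[] crossBlockBuffer = new byte[S_1K * 20];
in.seek(blockSize * 4L - S_1K * 10);
in.readFully(crossBlockBuffer, 0, crossBlockBuffer.length);
// block 4 should now be the most recently used entry, i.e. the head of the LRU list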
thanks for the review @mehakmeet so we have two follow-up sub-tasks to be created later:
does this sound good? i will re-run the whole test-suite against
💔 -1 overall
This message was automatically generated.
💔 -1 overall
This message was automatically generated.
key change: pass the size down directly; this allows for an fs.s3a option which can then be set on a per-bucket basis.
tests: I want to see this working with a cache size of 1 block; head==tail, and adding a new block replaces the entire list.
this is probably the size I'd recommend once we add vector IO support: one block to store the ORC/Parquet footer, with all other reads done in the vector API, which (probably) don't need caching.
/**
 * Constants used by prefetch implementations.
 */
public final class Constants {
can we have a different name please; too confusing
}

/**
 * Prefetch max blocks count config.
add {@value} here and below so IDE popups show the value
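For illustration, the javadoc change being asked for, applied to the constant from the diff:

/**
 * Prefetch max blocks count config.
 * Value: {@value}.
 */
public static final String FS_PREFETCH_MAX_BLOCKS_COUNT = "fs.prefetch.max.blocks.count";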
/**
 * Prefetch max blocks count config.
 */
public static final String FS_PREFETCH_MAX_BLOCKS_COUNT = "fs.prefetch.max.blocks.count";
I think we should use fs.s3a here for per-bucket settings.
how about just passing in the count as a parameter, rather than a full Configuration object?
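A sketch of that second suggestion; the constructor shape here is an assumption, not the final signature:

// instead of reading the Configuration inside the cache, pass the resolved limit down:
public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics, int maxBlocksCount) {
  this.prefetchingStatistics = prefetchingStatistics;
  this.maxBlocksCount = maxBlocksCount;
  Preconditions.checkArgument(maxBlocksCount > 0,
      "maxBlocksCount should be more than 0");
}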
try {
  in.read(buffer, 0, blockSize - S_1K * 10);
} catch (IOException e) {
  throw new RuntimeException(e);
it should be an UncheckedIOException, but why not just return true from each closure so they become Callable rather than Runnable, and so can throw exceptions?
Can we not use some already inbuilt cache rather than implementing it from scratch? (this part is interesting for sure :))
Interesting, from the javadoc:
for a not-so-heavily loaded cache, perhaps our own implementation might be better? given that in our case even reads are immediately reflected in the doubly linked list data structure.
this will need a new test class, because if we change the block size config to 1 for ITestS3APrefetchingInputStream, the rest of the tests fail, since we do a lot of IOStats assertions on the number of blocks prefetched etc.
@mukund-thakur i tried using the guava LoadingCache; it's not consistently able to evict cache entries. it does the eviction asynchronously (and it takes a very long time to evict entries) with weak refs, hence leading to an inconsistent number of entries. for instance, even when i set the max size to 1, i can see 8 entries in the map for more than 15s. hence, maintaining consistency with concurrency seems really problematic with this implementation.
the patch i tried:
Even with concurrencyLevel(1), we don't get strong consistency. The evictions are not taking place even after a 40s+ wait :( Edit: https://github.com/google/guava/blob/master/guava/src/com/google/common/cache/LocalCache.java. i tried LoadingCache because it is for public usage, whereas LocalCache is for guava's internal impl.
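For readers unfamiliar with the Guava API being discussed, a generic example of this kind of configuration (this is not the patch referenced above, which is not reproduced here; createEntry is a hypothetical loader):

// uses com.google.common.cache.CacheBuilder, CacheLoader, LoadingCache
LoadingCache<Integer, Entry> cache = CacheBuilder.newBuilder()
    .maximumSize(maxBlocksCount)   // the LRU bound under discussion
    .concurrencyLevel(1)
    .build(new CacheLoader<Integer, Entry>() {
      @Override
      public Entry load(Integer blockNumber) {
        return createEntry(blockNumber);
      }
    });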
🎊 +1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
@mehakmeet @mukund-thakur @steveloughran
readFile() needs to always close the file channel, even if a read fails (not caused by this patch, but extant).
same for writeFile()
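A sketch of the shape of that fix, assuming a FileChannel-based readFile(); the signature is illustrative, not the actual method:

protected int readFile(Path path, ByteBuffer buffer) throws IOException {
  int numBytesRead = 0;
  // try-with-resources guarantees the channel is closed even if a read fails
  try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
    int numBytes;
    while ((numBytes = channel.read(buffer)) > 0) {
      numBytesRead += numBytes;
    }
  }
  buffer.limit(buffer.position());
  return numBytesRead;
}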
sounds good for both read and write, will file a jira unless one is already filed.
happy with the production code (though worried about that default); some test changes still needed, though.
 * Default value for max blocks count config.
 * Value = {@value DEFAULT_PREFETCH_MAX_BLOCKS_COUNT}
 */
public static final int DEFAULT_PREFETCH_MAX_BLOCKS_COUNT = 10;
this is per stream? it's big. we may want to see what happens in apps with many threads - the risk of running out of disk is high.
perhaps 4 is enough?
@Override
public Configuration createConfiguration() {
  Configuration conf = super.createConfiguration();
  S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
this needs to be restored, and add PREFETCH_MAX_BLOCKS_COUNT as another one to cut.
in all ITests, assume that any option set in createConfiguration() MAY have a per-bucket override set by someone, so you MUST explicitly remove it.
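i.e. something like this in createConfiguration(), a sketch using the constant names from the comments above:

Configuration conf = super.createConfiguration();
S3ATestUtils.removeBaseAndBucketOverrides(conf,
    PREFETCH_ENABLED_KEY,
    PREFETCH_MAX_BLOCKS_COUNT);
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, PREFETCH_MAX_NUM_BLOCKS);
return conf;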
byte[] buffer = new byte[blockSize];
// Don't read block 0 completely
try {
  in.read(buffer, 0, blockSize - S_1K * 10);
use readFully(); always.
byte[] buffer = new byte[blockSize];
// Seek to block 1 and don't read completely
try {
  in.seek(blockSize);
use PositionedReadable to include it in the test coverage
readFully(blockSize * 4L, buffer, 0, 2 * S_1K);
// and let LRU eviction take place as more cache entries
// are added with multiple block reads.

executorService.submit(() -> {
these submitted closures are almost identical. why not define one for readFully(), one for PositionedReadable.readFully(), and submit them; maybe as some function
Callable<Boolean> readFully(CountDownLatch countDownLatch, boolean positionedReadable, long offset, long size) {
// return one of these with seek(offset) and read to a buffer of [size]
}
then submit these
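One possible shape for that helper, as a sketch; the in stream, buffer handling and usage line are assumptions based on the surrounding test:

private Callable<Boolean> readFully(CountDownLatch countDownLatch,
    boolean positionedReadable, long offset, int size) {
  return () -> {
    byte[] buffer = new byte[size];
    if (positionedReadable) {
      // PositionedReadable path: no seek, explicit position
      in.readFully(offset, buffer, 0, size);
    } else {
      in.seek(offset);
      in.readFully(buffer, 0, size);
    }
    countDownLatch.countDown();
    return true;
  };
}

// submitted as e.g.:
// executorService.submit(readFully(countDownLatch, true, blockSize * 4L, 2 * S_1K));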
💔 -1 overall
This message was automatically generated.
💔 -1 overall
This message was automatically generated.
💔 -1 overall
This message was automatically generated.
changes are good, though the test failures are due to DEFAULT_PREFETCH_MAX_BLOCKS_COUNT being < the # of blocks fetched; the tests will need to assert that the number of cached blocks matches min(default, requested blocks)
java.lang.IllegalStateException: waitForCaching: expected: 8, actual: 4, read errors: 0, caching errors: 0
at org.apache.hadoop.fs.s3a.prefetch.TestS3ACachingBlockManager.waitForCaching(TestS3ACachingBlockManager.java:360)
at org.apache.hadoop.fs.s3a.prefetch.TestS3ACachingBlockManager.testCachingOfPrefetched(TestS3ACachingBlockManager.java:288)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:750)
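A sketch of the kind of assertion being suggested; blockManager and numCached() are hypothetical accessors, not necessarily the existing test API:

// the cache can never hold more than the configured maximum
int expectedCachedBlocks = Math.min(DEFAULT_PREFETCH_MAX_BLOCKS_COUNT, blockCount);
assertEquals("cached blocks", expectedCachedBlocks, blockManager.numCached());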
re-running the whole test suite with -prefetch and -scale combinations
🎊 +1 overall
This message was automatically generated.
test results look good
+1
merged, though now that i'm using it, that new test is way too slow. in my rebased unbuffered PR I have moved it to -Dscale, but really we can just set the block size down to something minimal and then work with a small file
…leFilePerBlockCache (#5754) Contributed by Viraj Jasani
created addendum PR for dealing with small file #5843
…leFilePerBlockCache (apache#5754) Contributed by Viraj Jasani
Jira: HADOOP-18291