Improvements to Out-of-Core Hash Join #4970
Conversation
…in external hash join
I think I got the final thread sanitizer issues out of the way, but the R + arrow test seems to be failing. Is this the test that occasionally fails, or is this something I caused? 10 tests fail with an std::exception
Hm, these do not look like "the usual" spurious R test failures. At least I haven't seen them before. I will rerun the test.
Thanks for re-running the tests. This time only 4 failures, seems like the same problem though. Will investigate.
Thanks for all the fixes! Looks great. The meta pipeline refactor looks very good. Some remaining comments from my side.
@@ -33,6 +33,10 @@ class PhysicalOrder : public PhysicalOperator {
	void GetData(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate,
	             LocalSourceState &lstate) const override;

	bool IsOrderPreserving() const override {
		return false;
	}
Is this correct? Won't this lead to incorrect parallel result set materialization?
This is correct. It sounds confusing, but the `PhysicalOrder` does not preserve the insertion order - the data is completely reordered.
Thanks for the feedback! I've implemented it. Together with Hannes, I've figured out what the R + arrow issue was: R does not like it if threads other than the main thread call back into R, which is what happened for arrow scans when I refactored the pipeline construction. I've now tried to make sure that table function global source state initialization always happens in the main thread. Fingers crossed for CI 🤞
Looks like my changes fixed the last failing R + arrow tests; I think the remaining failures are unrelated. Let's wait for the full CI run though, which is taking forever because everyone is so productive!
Continued where I left off. We now partition the probe side as well (instead of scanning the whole probe side multiple times), which means we can support out-of-core right/outer/mark/anti joins.
Probe-side partitioning happens during the streaming probe (`PhysicalHashJoin::Execute`): tuples that cannot be probed against the partitions of the hash table during this phase, because they belong to different partitions, are sunk into a `PartitionedColumnData`. However, if we can finish probing in just two rounds, the probe side does not need to be partitioned at all, and we sink into a `ColumnDataCollection` instead.
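To make the routing concrete, here is a minimal standalone sketch of the idea, assuming hypothetical names and plain standard C++ rather than DuckDB's actual classes: probe-side tuples whose hash table partition is currently loaded are probed immediately, all others are spilled into per-partition buffers for later probe rounds.

```cpp
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical sketch (not DuckDB code): route probe-side keys either to an
// immediate probe or to a per-partition spill buffer for a later round.
struct ProbeRouter {
	int radix_bits;                            // log2(number of partitions)
	std::vector<bool> resident;                // which partitions are loaded in the hash table
	std::vector<std::vector<int64_t>> spilled; // spill buffer per partition

	ProbeRouter(int radix_bits_p, std::vector<bool> resident_p)
	    : radix_bits(radix_bits_p), resident(std::move(resident_p)),
	      spilled(size_t(1) << radix_bits_p) {
	}

	// Build and probe side derive the partition from the same hash bits,
	// so matching keys always end up in the same partition.
	size_t PartitionOf(int64_t key) const {
		return std::hash<int64_t>{}(key) & ((size_t(1) << radix_bits) - 1);
	}

	// Probe now if the partition is resident, otherwise spill for later.
	void Route(int64_t key, const std::function<void(int64_t)> &probe_now) {
		auto partition = PartitionOf(key);
		if (resident[partition]) {
			probe_now(key);
		} else {
			spilled[partition].push_back(key);
		}
	}
};
```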
`PartitionedColumnData` is a new generic partitioning interface, and I've implemented `RadixPartitionedColumnData`. An issue with multi-threaded partitioning in general is that each thread allocates data for all partitions, which results in high memory usage. We prevent this by sharing a single allocator per partition across all threads. The `PartitionedColumnData` class can be extended to do, e.g., Hive partitioning in a streaming manner.
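The following is a much-simplified standalone sketch of the shared-per-partition-state idea, assuming hypothetical names; here the threads share the per-partition buffer itself under a lock, whereas the text above describes sharing only the per-partition allocator.

```cpp
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical sketch (not DuckDB code): radix partitioning where per-partition
// state is shared across threads instead of being duplicated per thread, so
// memory usage does not grow with the number of threads.
class RadixPartitioner {
public:
	explicit RadixPartitioner(int radix_bits)
	    : mask((uint64_t(1) << radix_bits) - 1), partitions(size_t(mask) + 1) {
	}

	// Partition selection based on the low bits of the hash.
	size_t PartitionOf(uint64_t hash) const {
		return size_t(hash & mask);
	}

	// Called concurrently by many threads; one lock per partition keeps
	// contention low without needing a buffer per (thread, partition) pair.
	void Append(uint64_t hash, int64_t value) {
		auto &partition = partitions[PartitionOf(hash)];
		std::lock_guard<std::mutex> guard(partition.lock);
		partition.rows.push_back(value);
	}

private:
	struct Partition {
		std::mutex lock;
		std::vector<int64_t> rows;
	};
	uint64_t mask;
	std::vector<Partition> partitions;
};
```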
I've also implemented the `ColumnDataConsumer` class, which can read and consume a `ColumnDataCollection`. This is useful for the out-of-core hash join, as we wish to read the probe-side `ColumnDataCollection` just once. Previously, the read data was written back to disk before being thrown away, which was wasteful.
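As an illustration of the read-and-consume idea, here is a tiny standalone sketch with hypothetical names, not DuckDB's actual `ColumnDataConsumer`: each chunk is handed out exactly once and its memory is released right after, so consumed data never has to be written back just to be discarded.

```cpp
#include <cstdint>
#include <deque>
#include <optional>
#include <vector>

// Hypothetical sketch (not DuckDB code): a consume-once reader that frees each
// chunk's memory as soon as it has been handed to the caller.
class ConsumeOnceReader {
public:
	explicit ConsumeOnceReader(std::deque<std::vector<int64_t>> chunks_p)
	    : chunks(std::move(chunks_p)) {
	}

	// Returns the next chunk, or std::nullopt when everything has been consumed.
	std::optional<std::vector<int64_t>> NextChunk() {
		if (chunks.empty()) {
			return std::nullopt;
		}
		auto chunk = std::move(chunks.front());
		chunks.pop_front(); // the consumed chunk no longer occupies memory here
		return chunk;
	}

private:
	std::deque<std::vector<int64_t>> chunks;
};
```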
I've run the same benchmark as in my previous PR on my laptop, which joins two tables with 100M integers each that have only 1k matches. Here are the numbers:

As we can see, the performance is mostly the same as in my previous PR, until the hash table is many times larger than the amount of available memory; that is where this PR improves performance a lot. In the previous PR, when we had to do many partitioned probe phases, we created a lot of I/O pressure by reading and writing the entire probe side every time. In this PR, the I/O pressure is much lower, as we read and write the probe-side data only once.
I will continue tweaking the performance in future PRs. Happy to receive feedback!
Edit: this PR uncovered a bug with how execution pipelines were scheduled. The problem was rather complicated, so I will try to explain it here.
We have parallelism in regular pipelines, e.g., `SCAN -> FILTER -> PROJECTION -> AGGREGATION`. The data is pushed from the source (`SCAN`) through the streaming `FILTER` and `PROJECTION` operators into the sink (`AGGREGATION`).
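A tiny standalone sketch of this push-based flow, with hypothetical names and plain standard C++ rather than DuckDB's operator classes:

```cpp
#include <functional>
#include <vector>

// Hypothetical sketch (not DuckDB code): the source produces chunks, every
// streaming operator transforms them, and the sink consumes the result.
using Chunk = std::vector<int>;
using StreamingOperator = std::function<Chunk(Chunk)>;

void RunPipeline(const std::vector<Chunk> &source,
                 const std::vector<StreamingOperator> &streaming_operators,
                 const std::function<void(const Chunk &)> &sink) {
	for (auto chunk : source) {                // SCAN: produce one chunk at a time
		for (auto &op : streaming_operators) { // FILTER, PROJECTION, ...
			chunk = op(std::move(chunk));
		}
		sink(chunk);                           // AGGREGATION: consume the chunk
	}
}
```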
Pipelines can also have dependencies on other pipelines, for example `SCAN -> JOIN` (build side of the join) and `SCAN -> JOIN -> AGGREGATION` (probe side of the join). The second pipeline depends on the first pipeline being done.

Besides these, we also have two "special" cases of pipelines: "union" pipelines, for `UNION` queries, and "child" pipelines, for streaming operators that become a source operator after streaming is done (e.g., scanning the hash table of a join for unmatched tuples for a `FULL OUTER` join).
These cases are special because they share their sink with other pipelines.

We have run into problems with these special cases before, because setting up their dependencies correctly is different from other pipelines. With the way the code was written before, it was very difficult to get things right. I've refactored pipeline construction with a new class called `MetaPipeline`. This class holds multiple `Pipeline`s that share the same sink, which makes it much easier to set up dependencies correctly.

As a result (bonus!) of this refactor, we can now execute union pipelines in parallel, e.g., for a query that unions scans of tables t1, t2, and t3:
Before, we would scan t1, then t2, then t3. If t1, t2, and t3 are small enough that they do not keep all threads busy, we would not utilize all threads for the entire query. Now, we can scan all three tables concurrently.
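As a rough standalone sketch of the grouping idea, assuming hypothetical types rather than DuckDB's actual `MetaPipeline`/`Pipeline` classes: a meta pipeline owns the shared sink and the pipelines that feed it, so dependencies are set up per group and independent union branches are free to run concurrently.

```cpp
#include <memory>
#include <string>
#include <vector>

// Hypothetical sketch (not DuckDB code): pipelines that push into the same sink
// are grouped in one MetaPipeline, so dependencies are handled per group and
// independent pipelines (e.g., the branches of a UNION) can run in parallel.
struct Sink {
	std::string name;
};

struct Pipeline {
	std::string source;                   // e.g., "SCAN(t1)"
	std::shared_ptr<Sink> sink;           // shared with sibling pipelines
	std::vector<Pipeline *> dependencies; // pipelines that must finish first
};

struct MetaPipeline {
	std::shared_ptr<Sink> shared_sink;
	std::vector<std::unique_ptr<Pipeline>> pipelines;

	Pipeline &CreatePipeline(std::string source) {
		auto pipeline = std::make_unique<Pipeline>();
		pipeline->source = std::move(source);
		pipeline->sink = shared_sink; // every pipeline in the group shares the sink
		pipelines.push_back(std::move(pipeline));
		return *pipelines.back();
	}
};

int main() {
	// Three union branches feeding one ORDER BY sink: there are no dependencies
	// between the branches, so a scheduler may run all three scans concurrently.
	MetaPipeline meta{std::make_shared<Sink>(Sink{"ORDER_BY"})};
	meta.CreatePipeline("SCAN(t1)");
	meta.CreatePipeline("SCAN(t2)");
	meta.CreatePipeline("SCAN(t3)");
	return 0;
}
```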
Quick test with sorting a union:
This PR: 0.014s
Master: 0.023s