Improvements to Out-of-Core Hash Join by lnkuiper · Pull Request #4970 · duckdb/duckdb · GitHub

Improvements to Out-of-Core Hash Join #4970

Merged: 71 commits, Nov 8, 2022

Conversation

@lnkuiper (Contributor) commented Oct 12, 2022

Continued where I left off. We now partition the probe side as well (instead of scanning the whole probe side multiple times), which means we can support out-of-core right/outer/mark/anti joins.
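
As a hedged sketch of what this enables (illustrative table names and sizes, not the PR's test setup), a query like the following can now run as an out-of-core FULL OUTER join under a low memory limit:

-- Hedged sketch: illustrative tables, not the PR's benchmark.
SET memory_limit = '1GB';  -- small enough that the build side must spill to disk

CREATE TABLE build_side AS SELECT range AS k FROM range(100000000);
CREATE TABLE probe_side AS SELECT range AS k FROM range(100000000);

-- With probe-side partitioning, this FULL OUTER join can run out-of-core.
SELECT count(*) FROM probe_side FULL OUTER JOIN build_side USING (k);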

Probe-side partitioning happens during the streaming probe (PhysicalHashJoin::Execute): tuples that cannot be probed against the partitions of the hash table during this phase, because they belong to different partitions, are sunk into a PartitionedColumnData. However, if we can finish probing in just two rounds, the probe side does not need to be partitioned at all, and we sink into a ColumnDataCollection instead.

PartitionedColumnData is a new generic partitioning interface. I've implemented RadixPartitionedColumnData. An issue with multi-threaded partitioning in general is that each thread allocates data for all partitions, which results in high memory usage. We prevent this by sharing a single allocator per partition across all threads. The PartitionedColumnData class can be extended to do, e.g., Hive partitioning in a streaming manner.
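
To illustrate just the radix idea (this is not the internal PartitionedColumnData code path), a tuple's partition is derived from its hash; in SQL terms, roughly:

-- Conceptual illustration only: assigning rows to one of 16 radix partitions by hash.
SELECT i, hash(i) % 16 AS partition_id
FROM range(10) AS t(i);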

I've also implemented the ColumnDataConsumer class, which can read and consume a ColumnDataCollection. This is useful for the out-of-core hash join, as we want to read the probe-side ColumnDataCollection just once. Previously, data that had just been read was written back to disk before being thrown away, which was wasteful.

I've run the same benchmark as in my previous PR on my laptop: joining two tables of 100M integers each that have only 1k matching values. Here are the numbers:

| Memory limit (GB) | Old time (s) | New time (s) |
|---|---|---|
| 10 | 1.97 | 1.96 |
| 9 | 1.97 | 1.97 |
| 8 | 2.23 | 2.22 |
| 7 | 2.23 | 2.44 |
| 6 | 2.27 | 2.39 |
| 5 | 2.27 | 2.32 |
| 4 | 2.81 | 2.45 |
| 3 | 5.60 | 3.20 |
| 2 | 7.69 | 3.28 |
| 1 | 17.73 | 4.35 |
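
The exact benchmark script is not part of this PR; a setup of roughly this shape (illustrative names, two 100M-integer tables whose key ranges overlap in only 1000 values) reproduces the workload:

-- Sketch of the benchmark shape, not the original script.
CREATE TABLE lhs AS SELECT range AS i FROM range(100000000);
CREATE TABLE rhs AS SELECT range + 99999000 AS i FROM range(100000000);

SET memory_limit = '4GB';  -- vary this to reproduce the rows of the table above

SELECT count(*) FROM lhs JOIN rhs USING (i);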

As we can see, performance is mostly the same as with my previous PR until the hash table is many times larger than the available memory. That is where this PR improves performance a lot. In the previous PR, when we had to do many partitioned probe rounds, we created a lot of I/O pressure by reading and writing the entire probe side every time. In this PR, the I/O pressure is much lower, as we read and write the probe-side data only once.

I will continue tweaking the performance in future PRs. Happy to receive feedback!

Edit: this PR uncovered a bug in how execution pipelines were scheduled. The problem was rather complicated, so I will try to explain it here.

We have parallelism in regular pipelines, e.g., SCAN -> FILTER -> PROJECTION -> AGGREGATION. The data is pushed from the source (SCAN) through the streaming FILTER and PROJECTION operators, into the sink (AGGREGATION).

Pipelines can also have dependencies on other pipelines, for example SCAN -> JOIN (the build side of the join) and SCAN -> JOIN -> AGGREGATION (the probe side of the join). The second pipeline depends on the first one being done.
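
For example, a query of this shape (t1 and t2 are placeholder tables) produces exactly those two pipelines; EXPLAIN shows the hash join with its two inputs:

-- Placeholder tables t1 and t2: the plan has a build pipeline (scan t2 into the
-- join's hash table) and a probe pipeline (scan t1, probe, aggregate) that
-- depends on the build pipeline being finished.
EXPLAIN SELECT t1.key, count(*)
FROM t1
JOIN t2 USING (key)
GROUP BY t1.key;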

Besides these, we also have two "special" kinds of pipelines: "union" pipelines, for UNION queries, and "child" pipelines, for streaming operators that become a source operator after the streaming phase is done (e.g., scanning the hash table of a join for unmatched tuples in a FULL OUTER join). These cases are special because they share their sink with other pipelines.
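
As a hedged example of such a child pipeline (placeholder tables again), a FULL OUTER join first streams the probe side through the join, and afterwards the join becomes a source that scans its hash table for unmatched build-side tuples:

-- The streaming probe handles matches; the unmatched build-side tuples are then
-- emitted by a "child" pipeline that shares its sink with the probe pipeline.
SELECT count(*)
FROM t1
FULL OUTER JOIN t2 USING (key);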

We have run into problems with these special cases before, because setting up their dependencies correctly works differently than for regular pipelines. With the way the code was written before, it was very difficult to get this right. I've refactored pipeline construction around a new class called MetaPipeline, which holds multiple Pipelines that share the same sink. This makes it much easier to set up the dependencies correctly.

As a result (bonus!) of this refactor, we can now execute union pipelines in parallel, e.g.:

SELECT * FROM t1
UNION ALL
SELECT * FROM t2
UNION ALL
SELECT * FROM t3

Before, we would scan t1, then t2, then t3. If t1, t2, and t3 are small enough that none of them keeps all threads busy on its own, we would not utilize all threads for the entire query. Now, we can scan all three tables concurrently.

Quick test with sorting a union:

create table test as select cast(random() * 100000000 as int) i from range(100000);
with union_cte as (
select i from test
union all
select i from test
union all
select i from test
union all
select i from test)
select count(i) from (select i from union_cte order by i offset 1);

This PR: 0.014s
Master: 0.023s

@lnkuiper (Contributor, Author) commented Nov 2, 2022

I think I got the final thread sanitizer issues out of the way, but the R + arrow test seems to be failing. Is this the test that occasionally fails, or is this something I caused? 10 tests fail with a std::exception.

@Mytherin (Collaborator) commented Nov 2, 2022

Hm, these do not look like "the usual" spurious R test failures. At least I haven't seen them before. I will rerun the test.

@lnkuiper (Contributor, Author) commented Nov 3, 2022

Thanks for re-running the tests. This time there are only 4 failures, but it seems to be the same problem. Will investigate.

@Mytherin (Collaborator) left a comment

Thanks for all the fixes! Looks great. The meta pipeline refactor looks very good. Some remaining comments from my side.

@@ -33,6 +33,10 @@ class PhysicalOrder : public PhysicalOperator {
void GetData(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate,
LocalSourceState &lstate) const override;

bool IsOrderPreserving() const override {
	return false;
}
Mytherin (Collaborator):

Is this correct? Won't this lead to incorrect parallel result set materialization?

lnkuiper (Contributor, Author):

This is correct. It sounds confusing, but the PhysicalOrder does not preserve the insertion order - the data is completely reordered.

@lnkuiper (Contributor, Author) commented Nov 4, 2022

Thanks for the feedback! I've implemented the suggested changes. Together with Hannes, I figured out what the R + arrow issue was: R does not like it when threads other than the main thread call back into R, which is what happened for arrow scans after I refactored the pipeline construction.

I've now tried to make sure that table function global source state initialization always happens in the main thread. Fingers crossed for CI 🤞

@lnkuiper (Contributor, Author) commented Nov 7, 2022

Looks like my changes fixed the previously failing R + arrow tests. I think the remaining failures are unrelated. Let's wait for the full CI run though, which is taking forever because everyone is so productive!
