Improvements to Out-of-Core Hash Join by lnkuiper · Pull Request #4970 · duckdb/duckdb · GitHub

Improvements to Out-of-Core Hash Join #4970

Merged · 71 commits · Nov 8, 2022
Commits (71)
0f47295
init streaming partitioner
lnkuiper Sep 29, 2022
94dcde8
initial implementation of PartitionedColumnData
lnkuiper Sep 29, 2022
fff7c52
lock for shared column data allocator
lnkuiper Sep 30, 2022
0e68b4f
probe-side partitioning seems to (kind of) work!
lnkuiper Sep 30, 2022
b041d83
probe-side partitioning unit tests passing!
lnkuiper Oct 3, 2022
7354665
chunk partitioning performance improvements
lnkuiper Oct 3, 2022
014c121
offset pointer to sel vector instead of passing offset
lnkuiper Oct 3, 2022
8f8dbbe
sort segments, ensure block size is BLOCK_SIZE
lnkuiper Oct 4, 2022
7e9048b
Merge branch 'master' into oochj
lnkuiper Oct 4, 2022
fbe4685
less locking and add ColumnDataAppendState to PartitionedColumnDataAp…
lnkuiper Oct 4, 2022
cdfd9fb
BLOCK_SIZE instead of BLOCK_ALLOC_SIZE, less tasks for ExternalBuild …
lnkuiper Oct 4, 2022
de6b7d5
add a bunch of TODO's so I don't forget
lnkuiper Oct 5, 2022
2154c1b
Merge branch 'master' into oochj
lnkuiper Oct 5, 2022
97c895d
rework PartitionedColumnData API a bit
lnkuiper Oct 5, 2022
e2cfe1c
some code cleanup
lnkuiper Oct 5, 2022
5a59819
Merge branch 'master' into oochj
lnkuiper Oct 5, 2022
0d4dbbb
progress on scanning/consuming a ColumnDataCollection
lnkuiper Oct 6, 2022
6709a48
start work on ColumnDataConsumer
lnkuiper Oct 7, 2022
146282f
properly implement ColumnDataConsumer for external hash join
lnkuiper Oct 7, 2022
19021d6
Merge branch 'master' into oochj
lnkuiper Oct 7, 2022
bd167f6
always init probespill and re-enable anti/outer external joins
lnkuiper Oct 7, 2022
3bcde35
delete block handles/pins more cleanly
lnkuiper Oct 10, 2022
746cde5
Merge branch 'master' into oochj
lnkuiper Oct 10, 2022
39ef799
enable out-of-core for anti/semi/mark joins
lnkuiper Oct 10, 2022
7eba41d
add ProbeSpillLocalState to prevent some segfaultage
lnkuiper Oct 10, 2022
8cfb189
simplify external GetData init
lnkuiper Oct 10, 2022
688de0d
Merge branch 'master' into oochj
lnkuiper Oct 10, 2022
8d01507
some code cleanup and trying to make CI happy
lnkuiper Oct 11, 2022
2d711c1
Merge branch 'master' into oochj
lnkuiper Oct 11, 2022
96ef927
fix bug with CountValid (took me FOREVER to find out this was the pro…
lnkuiper Oct 11, 2022
d22121b
always init probespill
lnkuiper Oct 11, 2022
652ae1a
trying to please windows CI
lnkuiper Oct 11, 2022
fb6bfc0
fix race condition and nullptr
lnkuiper Oct 11, 2022
b2d5e43
Merge branch 'master' into oochj
lnkuiper Oct 11, 2022
917706f
revert some change to prep for PR
lnkuiper Oct 11, 2022
b24f48d
make BufferSize never exceed STANDARD_VECTOR_SIZE
lnkuiper Oct 12, 2022
8dcf968
Merge branch 'master' into oochj
lnkuiper Oct 12, 2022
e6b682b
Merge branch 'master' into oochj
lnkuiper Oct 17, 2022
cf652a7
Merge branch 'master' into oochj
lnkuiper Oct 17, 2022
9f8aba0
trying to debug union pipeline scheduling
lnkuiper Oct 18, 2022
009c61f
add verification code for executor
lnkuiper Oct 20, 2022
49ff92a
Merge branch 'master' into oochj
lnkuiper Oct 20, 2022
d078a2a
clean up verification code a bit (still struggling to find pipeline s…
lnkuiper Oct 20, 2022
3a848d2
initial implementation of MetaPipeline - not working yet
lnkuiper Oct 25, 2022
99dfdd8
add PipelineInitializeEvent and refactor so union pipelines can run i…
lnkuiper Oct 26, 2022
2676db1
refactor pipeline construction: got queries working again (somewhat)
lnkuiper Oct 27, 2022
9bd9e6e
combinations of union/child pipelines seem to work - still some bugs …
lnkuiper Oct 27, 2022
faf8f82
fix recursive CTE after pipeline build refactor
lnkuiper Oct 27, 2022
e5ad8c5
fix includes for faster compilation time, and fix union pipeline depe…
lnkuiper Oct 28, 2022
c075f78
Merge branch 'master' into oochj
lnkuiper Oct 28, 2022
d35afb0
pipeline build rework: just one test failing (recursive cte's grr)
lnkuiper Oct 28, 2022
e0db7ff
Merge branch 'master' into oochj
lnkuiper Oct 28, 2022
d363d99
only schedule recursive pipelines within PhysicalRecursiveCTE
lnkuiper Oct 31, 2022
8f789a9
fix last issues with iejoin
lnkuiper Oct 31, 2022
85bd79f
Merge branch 'master' into oochj
lnkuiper Nov 1, 2022
fd943f9
fix bug with union all order preservation
lnkuiper Nov 1, 2022
e366a00
refactor IEJoin pipelines to have a single MetaPipeline
lnkuiper Nov 1, 2022
2433ade
properly set recursive CTE in MetaPipeline and add some missing includes
lnkuiper Nov 1, 2022
34a0ef5
trying to please the CI
lnkuiper Nov 1, 2022
88cade3
trying to please CI and dodge threadsan data races
lnkuiper Nov 1, 2022
148f699
still trying to dodge threadsan data races
lnkuiper Nov 2, 2022
691c0cf
Add lock to RandomEngine to make RandomInitLocalState thread-safe
lnkuiper Nov 2, 2022
fe9887e
Merge branch 'master' into oochj
lnkuiper Nov 2, 2022
e87e469
add missing include
lnkuiper Nov 2, 2022
0829e35
reset source in main thread for R
lnkuiper Nov 4, 2022
e5b63e1
Merge branch 'master' into oochj
lnkuiper Nov 4, 2022
1ddc86d
implement PR feedback and fix skipped tests
lnkuiper Nov 4, 2022
e86917e
Merge branch 'master' into oochj
lnkuiper Nov 7, 2022
cf5744d
init global source state before calling continue in loop ... oops
lnkuiper Nov 7, 2022
79bc672
merge with master
lnkuiper Nov 7, 2022
9090797
add missing return statement
lnkuiper Nov 7, 2022
107 changes: 86 additions & 21 deletions src/common/radix_partitioning.cpp
@@ -1,10 +1,12 @@
#include "duckdb/common/radix_partitioning.hpp"

#include "duckdb/common/row_operations/row_operations.hpp"
#include "duckdb/common/types/partitioned_column_data.hpp"
#include "duckdb/common/types/row_data_collection.hpp"
#include "duckdb/common/types/row_layout.hpp"
#include "duckdb/common/types/vector.hpp"
#include "duckdb/common/vector_operations/binary_executor.hpp"
#include "duckdb/common/vector_operations/unary_executor.hpp"

namespace duckdb {

@@ -95,6 +97,32 @@ RETURN_TYPE DoubleRadixBitsSwitch1(idx_t radix_bits_1, idx_t radix_bits_2, ARGS
}
}

template <idx_t radix_bits>
struct RadixLessThan {
static inline bool Operation(hash_t hash, hash_t cutoff) {
using CONSTANTS = RadixPartitioningConstants<radix_bits>;
return CONSTANTS::ApplyMask(hash) < cutoff;
}
};

struct SelectFunctor {
template <idx_t radix_bits>
static idx_t Operation(Vector &hashes, const SelectionVector *sel, idx_t count, idx_t cutoff,
SelectionVector *true_sel, SelectionVector *false_sel) {
Vector cutoff_vector(Value::HASH(cutoff));
return BinaryExecutor::Select<hash_t, hash_t, RadixLessThan<radix_bits>>(hashes, cutoff_vector, sel, count,
true_sel, false_sel);
}
};

idx_t RadixPartitioning::Select(Vector &hashes, const SelectionVector *sel, idx_t count, idx_t radix_bits, idx_t cutoff,
SelectionVector *true_sel, SelectionVector *false_sel) {
return RadixBitsSwitch<SelectFunctor, idx_t>(radix_bits, hashes, sel, count, cutoff, true_sel, false_sel);
}

//===--------------------------------------------------------------------===//
// Row Data Partitioning
//===--------------------------------------------------------------------===//
template <idx_t radix_bits>
static void InitPartitions(BufferManager &buffer_manager, vector<unique_ptr<RowDataCollection>> &partition_collections,
RowDataBlock *partition_blocks[], vector<BufferHandle> &partition_handles,
@@ -157,7 +185,10 @@ struct PartitionFunctor {

auto &data_blocks = block_collection.blocks;
auto &heap_blocks = string_heap.blocks;
for (idx_t block_idx = 0; block_idx < data_blocks.size(); block_idx++) {
for (idx_t block_idx_plus_one = data_blocks.size(); block_idx_plus_one > 0; block_idx_plus_one--) {
// We loop through blocks in reverse to save some of that PRECIOUS I/O
idx_t block_idx = block_idx_plus_one - 1;

RowDataBlock *data_block;
BufferHandle data_handle;
data_ptr_t data_ptr;
@@ -275,7 +306,7 @@ struct PartitionFunctor {
static inline void FlushTempBuf(data_ptr_t &data_ptr, const idx_t &row_width, uint32_t &block_count,
const data_ptr_t &tmp_buf, uint32_t &pos, const idx_t count) {
pos -= count;
memcpy(data_ptr, tmp_buf + pos * row_width, count * row_width);
FastMemcpy(data_ptr, tmp_buf + pos * row_width, count * row_width);
data_ptr += count * row_width;
block_count += count;
}
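`FlushTempBuf` drains a small per-partition staging buffer into the destination block in one bulk copy (the change swaps `memcpy` for `FastMemcpy`, DuckDB's internal fast path). The point of the staging buffer is to replace many per-row copies with one large copy per flush. A rough standalone equivalent using `std::memcpy`-style bulk insertion — `StagingBuffer` and its members are hypothetical names:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Stage rows in a small fixed-capacity buffer; once full, flush them to the
// destination block with a single bulk copy instead of one copy per row.
struct StagingBuffer {
	static constexpr size_t CAPACITY = 8; // rows per flush
	size_t row_width;
	uint32_t pos = 0;
	std::vector<uint8_t> tmp;

	explicit StagingBuffer(size_t row_width_p)
	    : row_width(row_width_p), tmp(CAPACITY * row_width_p) {
	}

	void Append(const uint8_t *row, std::vector<uint8_t> &block, uint32_t &block_count) {
		std::memcpy(tmp.data() + pos * row_width, row, row_width);
		if (++pos == CAPACITY) {
			Flush(block, block_count, CAPACITY);
		}
	}

	// Mirrors FlushTempBuf: rewind pos by count, then copy those rows out in one go.
	void Flush(std::vector<uint8_t> &block, uint32_t &block_count, uint32_t count) {
		pos -= count;
		block.insert(block.end(), tmp.data() + pos * row_width,
		             tmp.data() + (pos + count) * row_width);
		block_count += count;
	}
};
```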
@@ -369,35 +400,69 @@ struct PartitionFunctor {
}
};

void RadixPartitioning::Partition(BufferManager &buffer_manager, const RowLayout &layout, const idx_t hash_offset,
RowDataCollection &block_collection, RowDataCollection &string_heap,
vector<unique_ptr<RowDataCollection>> &partition_block_collections,
vector<unique_ptr<RowDataCollection>> &partition_string_heaps, idx_t radix_bits) {
void RadixPartitioning::PartitionRowData(BufferManager &buffer_manager, const RowLayout &layout,
const idx_t hash_offset, RowDataCollection &block_collection,
RowDataCollection &string_heap,
vector<unique_ptr<RowDataCollection>> &partition_block_collections,
vector<unique_ptr<RowDataCollection>> &partition_string_heaps,
idx_t radix_bits) {
return RadixBitsSwitch<PartitionFunctor, void>(radix_bits, buffer_manager, layout, hash_offset, block_collection,
string_heap, partition_block_collections, partition_string_heaps);
}

template <idx_t radix_bits>
struct RadixLessThan {
static inline bool Operation(hash_t hash, hash_t cutoff) {
using CONSTANTS = RadixPartitioningConstants<radix_bits>;
return CONSTANTS::ApplyMask(hash) < cutoff;
//===--------------------------------------------------------------------===//
// Column Data Partitioning
//===--------------------------------------------------------------------===//
RadixPartitionedColumnData::RadixPartitionedColumnData(ClientContext &context_p, vector<LogicalType> types_p,
idx_t radix_bits_p, idx_t hash_col_idx_p)
: PartitionedColumnData(PartitionedColumnDataType::RADIX, context_p, move(types_p)), radix_bits(radix_bits_p),
hash_col_idx(hash_col_idx_p) {
D_ASSERT(hash_col_idx < types.size());
const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
allocators->allocators.reserve(num_partitions);
for (idx_t i = 0; i < num_partitions; i++) {
CreateAllocator();
}
};
D_ASSERT(allocators->allocators.size() == num_partitions);
}
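The constructor pre-creates one allocator per partition, where the partition count follows from the radix bit width — `NumberOfPartitions(radix_bits)` is presumably `2^radix_bits`. A one-line sketch of that relationship (hypothetical helper, not DuckDB's header):

```cpp
#include <cassert>
#include <cstdint>

// radix_bits hash bits address 2^radix_bits partitions
constexpr uint64_t NumberOfPartitions(uint64_t radix_bits) {
	return uint64_t(1) << radix_bits;
}
```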

struct SelectFunctor {
RadixPartitionedColumnData::RadixPartitionedColumnData(const RadixPartitionedColumnData &other)
: PartitionedColumnData(other), radix_bits(other.radix_bits), hash_col_idx(other.hash_col_idx) {
for (idx_t i = 0; i < RadixPartitioning::NumberOfPartitions(radix_bits); i++) {
partitions.emplace_back(CreatePartitionCollection(i));
}
}

RadixPartitionedColumnData::~RadixPartitionedColumnData() {
}

void RadixPartitionedColumnData::InitializeAppendStateInternal(PartitionedColumnDataAppendState &state) const {
const auto num_partitions = RadixPartitioning::NumberOfPartitions(radix_bits);
state.partition_buffers.reserve(num_partitions);
state.partition_append_states.reserve(num_partitions);
for (idx_t i = 0; i < num_partitions; i++) {
// TODO only initialize the append if partition idx > ...
state.partition_append_states.emplace_back(make_unique<ColumnDataAppendState>());
partitions[i]->InitializeAppend(*state.partition_append_states[i]);
state.partition_buffers.emplace_back(CreatePartitionBuffer());
}
}

struct ComputePartitionIndicesFunctor {
template <idx_t radix_bits>
static idx_t Operation(Vector &hashes, const SelectionVector *sel, idx_t count, idx_t cutoff,
SelectionVector *true_sel, SelectionVector *false_sel) {
Vector cutoff_vector(Value::HASH(cutoff));
return BinaryExecutor::Select<hash_t, hash_t, RadixLessThan<radix_bits>>(hashes, cutoff_vector, sel, count,
true_sel, false_sel);
static void Operation(Vector &hashes, Vector &partition_indices, idx_t count) {
UnaryExecutor::Execute<hash_t, hash_t>(hashes, partition_indices, count, [&](hash_t hash) {
using CONSTANTS = RadixPartitioningConstants<radix_bits>;
return CONSTANTS::ApplyMask(hash);
});
}
};

idx_t RadixPartitioning::Select(Vector &hashes, const SelectionVector *sel, idx_t count, idx_t radix_bits, idx_t cutoff,
SelectionVector *true_sel, SelectionVector *false_sel) {
return RadixBitsSwitch<SelectFunctor, idx_t>(radix_bits, hashes, sel, count, cutoff, true_sel, false_sel);
void RadixPartitionedColumnData::ComputePartitionIndices(PartitionedColumnDataAppendState &state, DataChunk &input) {
D_ASSERT(partitions.size() == RadixPartitioning::NumberOfPartitions(radix_bits));
D_ASSERT(state.partition_buffers.size() == RadixPartitioning::NumberOfPartitions(radix_bits));
RadixBitsSwitch<ComputePartitionIndicesFunctor, void>(radix_bits, input.data[hash_col_idx], state.partition_indices,
input.size());
}
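`ComputePartitionIndices` maps every hash in the chunk to its partition index so the append path can scatter rows into per-partition buffers. Reduced to a standalone loop — the real code vectorizes this with `UnaryExecutor::Execute` and dispatches on `radix_bits` via `RadixBitsSwitch`; the low-bits mask here is a hypothetical stand-in:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

using hash_t = uint64_t;

// For each hash, compute the radix partition index it scatters to.
std::vector<hash_t> ComputePartitionIndices(const std::vector<hash_t> &hashes, uint64_t radix_bits) {
	const hash_t mask = (hash_t(1) << radix_bits) - 1;
	std::vector<hash_t> indices;
	indices.reserve(hashes.size());
	for (auto hash : hashes) {
		indices.push_back(hash & mask);
	}
	return indices;
}
```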

} // namespace duckdb
14 changes: 14 additions & 0 deletions src/common/symbols.cpp
@@ -18,6 +18,7 @@
#include "duckdb/optimizer/join_order_optimizer.hpp"
#include "duckdb/optimizer/rule.hpp"
#include "duckdb/parallel/pipeline.hpp"
#include "duckdb/parallel/meta_pipeline.hpp"
#include "duckdb/parser/constraint.hpp"
#include "duckdb/parser/constraints/list.hpp"
#include "duckdb/parser/expression/list.hpp"
@@ -35,6 +36,8 @@
#include "duckdb/storage/data_table.hpp"
#include "duckdb/storage/write_ahead_log.hpp"
#include "duckdb/transaction/transaction.hpp"
#include "duckdb/common/types/column_data_collection.hpp"
#include "duckdb/common/types/column_data_allocator.hpp"

using namespace duckdb;

@@ -79,11 +82,16 @@ template class std::unique_ptr<CrossProductRef>;
template class std::unique_ptr<JoinRef>;
template class std::unique_ptr<SubqueryRef>;
template class std::unique_ptr<TableFunctionRef>;
template class std::shared_ptr<Event>;
template class std::unique_ptr<Pipeline>;
template class std::shared_ptr<Pipeline>;
template class std::weak_ptr<Pipeline>;
template class std::shared_ptr<MetaPipeline>;
template class std::unique_ptr<RowGroup>;
template class std::shared_ptr<RowGroupCollection>;
template class std::unique_ptr<ColumnDataCollection>;
template class std::shared_ptr<ColumnDataAllocator>;
template class std::unique_ptr<PartitionedColumnData>;
template class std::shared_ptr<PreparedStatementData>;
template class std::unique_ptr<VacuumInfo>;

@@ -127,6 +135,7 @@ template class std::unique_ptr<Vector[]>;
template class std::unique_ptr<DataChunk>;
template class std::unique_ptr<JoinHashTable>;
template class std::unique_ptr<JoinHashTable::ScanStructure>;
template class std::unique_ptr<JoinHashTable::ProbeSpill>;
template class std::unique_ptr<data_ptr_t[]>;
template class std::unique_ptr<Rule>;
template class std::unique_ptr<LogicalFilter>;
@@ -167,11 +176,16 @@ template class std::vector<PhysicalType>;
template class std::vector<Value>;
template class std::vector<int>;
INSTANTIATE_VECTOR(std::vector<std::unique_ptr<Rule>>)
INSTANTIATE_VECTOR(std::vector<std::shared_ptr<Event>>)
INSTANTIATE_VECTOR(std::vector<std::unique_ptr<Pipeline>>)
INSTANTIATE_VECTOR(std::vector<std::shared_ptr<Pipeline>>)
INSTANTIATE_VECTOR(std::vector<std::weak_ptr<Pipeline>>)
INSTANTIATE_VECTOR(std::vector<std::shared_ptr<MetaPipeline>>)
template class std::vector<std::vector<Expression *>>;
template class std::vector<LogicalType>;
INSTANTIATE_VECTOR(std::vector<std::unique_ptr<JoinHashTable>>)
INSTANTIATE_VECTOR(std::vector<std::unique_ptr<ColumnDataCollection>>)
INSTANTIATE_VECTOR(std::vector<std::shared_ptr<ColumnDataAllocator>>)

#if !defined(__clang__)
template struct std::atomic<uint64_t>;
2 changes: 2 additions & 0 deletions src/common/types/CMakeLists.txt
@@ -13,6 +13,7 @@ add_library_unity(
column_data_allocator.cpp
column_data_collection.cpp
column_data_collection_segment.cpp
column_data_consumer.cpp
data_chunk.cpp
date.cpp
decimal.cpp
@@ -21,6 +22,7 @@
uuid.cpp
hyperloglog.cpp
interval.cpp
partitioned_column_data.cpp
row_data_collection.cpp
row_data_collection_scanner.cpp
row_layout.cpp
25 changes: 22 additions & 3 deletions src/common/types/column_data_allocator.cpp
@@ -1,6 +1,7 @@
#include "duckdb/common/types/column_data_allocator.hpp"
#include "duckdb/storage/buffer_manager.hpp"

#include "duckdb/common/types/column_data_collection_segment.hpp"
#include "duckdb/storage/buffer_manager.hpp"

namespace duckdb {

@@ -29,7 +30,15 @@ ColumnDataAllocator::ColumnDataAllocator(ClientContext &context, ColumnDataAlloc

BufferHandle ColumnDataAllocator::Pin(uint32_t block_id) {
D_ASSERT(type == ColumnDataAllocatorType::BUFFER_MANAGER_ALLOCATOR);
return alloc.buffer_manager->Pin(blocks[block_id].handle);
shared_ptr<BlockHandle> *block_handle;
if (shared) {
// need to grab handle from the vector within a lock else threadsan will complain
lock_guard<mutex> guard(lock);
block_handle = &blocks[block_id].handle;
} else {
block_handle = &blocks[block_id].handle;
}
return alloc.buffer_manager->Pin(*block_handle);
}
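The change to `Pin` reads the `blocks` entry under the allocator's lock whenever the allocator is shared: another thread may be appending blocks concurrently, and an unsynchronized read of the vector element is a data race that ThreadSanitizer flags. The pattern, reduced to a standalone sketch with hypothetical types — copying the `shared_ptr` handle out under the lock so the (potentially slow) pin itself can happen lock-free:

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <vector>

// Blocks are owned via shared_ptr. A shared allocator can see concurrent
// appends to 'blocks', so the element must be read under the lock.
struct BlockList {
	bool shared = false;
	std::mutex lock;
	std::vector<std::shared_ptr<int>> blocks;

	// Copy the handle out under the lock, then use the copy without holding it.
	std::shared_ptr<int> Get(size_t block_id) {
		if (shared) {
			std::lock_guard<std::mutex> guard(lock);
			return blocks[block_id];
		}
		// Single-threaded use: no synchronization needed.
		return blocks[block_id];
	}
};
```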

void ColumnDataAllocator::AllocateBlock() {
@@ -106,9 +115,15 @@ void ColumnDataAllocator::AllocateData(idx_t size, uint32_t &block_id, uint32_t
ChunkManagementState *chunk_state) {
switch (type) {
case ColumnDataAllocatorType::BUFFER_MANAGER_ALLOCATOR:
AllocateBuffer(size, block_id, offset, chunk_state);
if (shared) {
lock_guard<mutex> guard(lock);
AllocateBuffer(size, block_id, offset, chunk_state);
} else {
AllocateBuffer(size, block_id, offset, chunk_state);
}
break;
case ColumnDataAllocatorType::IN_MEMORY_ALLOCATOR:
D_ASSERT(!shared);
AllocateMemory(size, block_id, offset, chunk_state);
break;
default:
@@ -138,6 +153,10 @@ data_ptr_t ColumnDataAllocator::GetDataPointer(ChunkManagementState &state, uint
return state.handles[block_id].Ptr() + offset;
}

void ColumnDataAllocator::DeleteBlock(uint32_t block_id) {
blocks[block_id].handle->SetCanDestroy(true);
}

Allocator &ColumnDataAllocator::GetAllocator() {
return type == ColumnDataAllocatorType::IN_MEMORY_ALLOCATOR ? *alloc.allocator
: alloc.buffer_manager->GetBufferAllocator();
4 changes: 4 additions & 0 deletions src/common/types/column_data_collection.cpp
@@ -812,4 +812,8 @@ bool ColumnDataCollection::ResultEquals(const ColumnDataCollection &left, const
return true;
}

const vector<unique_ptr<ColumnDataCollectionSegment>> &ColumnDataCollection::GetSegments() const {
return segments;
}

} // namespace duckdb
1 change: 0 additions & 1 deletion src/common/types/column_data_collection_segment.cpp
@@ -214,7 +214,6 @@ void ColumnDataCollectionSegment::FetchChunk(idx_t chunk_idx, DataChunk &result)
void ColumnDataCollectionSegment::FetchChunk(idx_t chunk_idx, DataChunk &result, const vector<column_t> &column_ids) {
D_ASSERT(chunk_idx < chunk_data.size());
ChunkManagementState state;
InitializeChunkState(chunk_idx, state);
state.properties = ColumnDataScanProperties::DISALLOW_ZERO_COPY;
ReadChunk(chunk_idx, state, result, column_ids);
}