Add parallel memset when building hash join table by hehezhou · Pull Request #16172 · duckdb/duckdb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Add parallel memset when building hash join table #16172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/execution/join_hashtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ void JoinHashTable::InsertHashes(Vector &hashes_v, const idx_t count, TupleDataC
}
}

void JoinHashTable::InitializePointerTable() {
void JoinHashTable::AllocatePointerTable() {
capacity = PointerTableCapacity(Count());
D_ASSERT(IsPowerOfTwo(capacity));

Expand All @@ -699,12 +699,14 @@ void JoinHashTable::InitializePointerTable() {
}
D_ASSERT(hash_map.GetSize() == capacity * sizeof(ht_entry_t));

// initialize HT with all-zero entries
std::fill_n(entries, capacity, ht_entry_t());

bitmask = capacity - 1;
}

void JoinHashTable::InitializePointerTable(idx_t entry_idx_from, idx_t entry_idx_to) {
	// zero out the half-open entry range [entry_idx_from, entry_idx_to),
	// so that this initialization can be split across multiple tasks
	std::fill(entries + entry_idx_from, entries + entry_idx_to, ht_entry_t());
}

void JoinHashTable::Finalize(idx_t chunk_idx_from, idx_t chunk_idx_to, bool parallel) {
// Pointer table should be allocated
D_ASSERT(hash_map.get());
Expand Down
69 changes: 64 additions & 5 deletions src/execution/operator/join/physical_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,60 @@ void PhysicalHashJoin::PrepareFinalize(ClientContext &context, GlobalSinkState &
gstate.temporary_memory_state->SetRemainingSize(gstate.total_size);
}

//! Task that zero-initializes a slice of the hash join pointer table,
//! allowing the (potentially very large) memset to run in parallel.
class HashJoinTableInitTask : public ExecutorTask {
public:
	//! @param event_p         event to notify when this task finishes
	//! @param context         client context used to run the task
	//! @param sink_p          global sink state holding the hash table
	//! @param entry_idx_from_p first entry index (inclusive) to initialize
	//! @param entry_idx_to_p   last entry index (exclusive) to initialize
	//! @param op_p             operator this task is attributed to
	HashJoinTableInitTask(shared_ptr<Event> event_p, ClientContext &context, HashJoinGlobalSinkState &sink_p,
	                      idx_t entry_idx_from_p, idx_t entry_idx_to_p, const PhysicalOperator &op_p)
	    : ExecutorTask(context, std::move(event_p), op_p), sink(sink_p), entry_idx_from(entry_idx_from_p),
	      entry_idx_to(entry_idx_to_p) {
	}

	TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
		// fixed garbled identifier: initialize the assigned entry range of the pointer table
		sink.hash_table->InitializePointerTable(entry_idx_from, entry_idx_to);
		event->FinishTask();
		return TaskExecutionResult::TASK_FINISHED;
	}

private:
	HashJoinGlobalSinkState &sink;
	//! Entry range [entry_idx_from, entry_idx_to) this task is responsible for
	idx_t entry_idx_from;
	idx_t entry_idx_to;
};

class HashJoinTableInitEvent : public BasePipelineEvent {
public:
HashJoinTableInitEvent(Pipeline &pipeline_p, HashJoinGlobalSinkState &sink)
: BasePipelineEvent(pipeline_p), sink(sink) {
}

HashJoinGlobalSinkState &sink;

public:
void Schedule() override {
auto &context = pipeline->GetClientContext();
vector<shared_ptr<Task>> finalize_tasks;
auto &ht = *sink.hash_table;
const auto entry_count = ht.capacity;
auto num_threads = NumericCast<idx_t>(sink.num_threads);
if (num_threads == 1 || (entry_count < PARALLEL_CONSTRUCT_THRESHOLD && !context.config.verify_parallelism)) {
// Single-threaded memset
finalize_tasks.push_back(
make_uniq<HashJoinTableInitTask>(shared_from_this(), context, sink, 0U, entry_count, sink.op));
} else {
// Parallel memset
for (idx_t entry_idx = 0; entry_idx < entry_count; entry_idx += ENTRIES_PER_TASK) {
auto entry_idx_to = MinValue<idx_t>(entry_idx + ENTRIES_PER_TASK, entry_count);
finalize_tasks.push_back(make_uniq<HashJoinTableInitTask>(shared_from_this(), context, sink, entry_idx,
entry_idx_to, sink.op));
}
}
SetTasks(std::move(finalize_tasks));
}

static constexpr const idx_t PARALLEL_CONSTRUCT_THRESHOLD = 1048576;
static constexpr const idx_t ENTRIES_PER_TASK = 131072;
};

class HashJoinFinalizeTask : public ExecutorTask {
public:
HashJoinFinalizeTask(shared_ptr<Event> event_p, ClientContext &context, HashJoinGlobalSinkState &sink_p,
Expand Down Expand Up @@ -463,7 +517,7 @@ class HashJoinFinalizeEvent : public BasePipelineEvent {
sink.max_partition_size + JoinHashTable::PointerTableSize(sink.max_partition_count);
const auto skew = static_cast<double>(max_partition_ht_size) / static_cast<double>(sink.total_size);

if (num_threads == 1 || (ht.Count() < PARALLEL_CONSTRUCT_THRESHOLD && skew > SKEW_SINGLE_THREADED_THRESHOLD &&
if (num_threads == 1 || ((ht.Count() < PARALLEL_CONSTRUCT_THRESHOLD || skew > SKEW_SINGLE_THREADED_THRESHOLD) &&
!context.config.verify_parallelism)) {
// Single-threaded finalize
finalize_tasks.push_back(
Expand Down Expand Up @@ -495,9 +549,13 @@ void HashJoinGlobalSinkState::ScheduleFinalize(Pipeline &pipeline, Event &event)
hash_table->finalized = true;
return;
}
hash_table->InitializePointerTable();
auto new_event = make_shared_ptr<HashJoinFinalizeEvent>(pipeline, *this);
event.InsertEvent(std::move(new_event));
hash_table->AllocatePointerTable();

auto new_init_event = make_shared_ptr<HashJoinTableInitEvent>(pipeline, *this);
event.InsertEvent(new_init_event);

auto new_finalize_event = make_shared_ptr<HashJoinFinalizeEvent>(pipeline, *this);
new_init_event->InsertEvent(std::move(new_finalize_event));
}

void HashJoinGlobalSinkState::InitializeProbeSpill() {
Expand Down Expand Up @@ -1101,7 +1159,8 @@ void HashJoinGlobalSourceState::PrepareBuild(HashJoinGlobalSinkState &sink) {
}
}

ht.InitializePointerTable();
ht.AllocatePointerTable();
ht.InitializePointerTable(0, ht.capacity);

global_stage = HashJoinSourceStage::BUILD;
}
Expand Down
4 changes: 3 additions & 1 deletion src/include/duckdb/execution/join_hashtable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,10 @@ class JoinHashTable {
void Merge(JoinHashTable &other);
//! Combines the partitions in sink_collection into data_collection, as if it were not partitioned
void Unpartition();
//! Allocate the pointer table for the probe
void AllocatePointerTable();
//! Initialize the pointer table for the probe
void InitializePointerTable();
void InitializePointerTable(idx_t entry_idx_from, idx_t entry_idx_to);
//! Finalize the build of the HT, constructing the actual hash table and making the HT ready for probing.
//! Finalize must be called before any call to Probe, and after Finalize is called Build should no longer be
//! ever called.
Expand Down
Loading
0