Tuple Data Collection by lnkuiper · Pull Request #6998 · duckdb/duckdb

Merged: 124 commits merged into duckdb:master on Apr 17, 2023

Conversation

@lnkuiper (Contributor) commented Apr 7, 2023

This PR implements TupleDataCollection, an (even) more unified way of materializing data in row format.

In various places in DuckDB's execution engine, we materialize data in row format using RowDataCollection and RowLayout. While this is already a somewhat unified way of materializing data in row format, it has a few issues when we want to spill the row data to disk. The pointers within the row format have to be manually (un)swizzled when (un)pinned. This is cumbersome and requires some serious bookkeeping within physical operators that use row format data. This (un)swizzling may also be redundant if the blocks that hold the data are never written to disk, which may happen in cases where the data size is only slightly over the memory limit.

This PR implements the TupleDataCollection class, to which, similar to the ColumnDataCollection class (#4315), DataChunks can simply be appended, and from which they can be scanned, without the need to explicitly hold buffer pins in various places in the code. This is all managed by the TupleDataAllocator class. The two problems described above (regarding pointer swizzling) are addressed in a similar way to how pointers in a ColumnDataCollection are never explicitly swizzled (#5543). In short, we simply compare pointers to see whether the block was spilled to disk: if the pointers differ, we recompute them; if they are the same, they are still valid and nothing needs to be done.
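The pointer-comparison trick can be illustrated with a small standalone sketch (the struct and function names here are hypothetical, not DuckDB's actual classes): row pointers are only valid relative to the block base address recorded when they were last computed, so after re-pinning a block we compare base addresses and shift the row pointers only if the block moved.

```cpp
#include <cstdint>
#include <vector>

using idx_t = uint64_t; // mirrors DuckDB's idx_t

// Hypothetical per-block bookkeeping for this sketch.
struct BlockPointers {
	uint8_t *base_ptr;           // block base address when the row pointers were last computed
	std::vector<uint8_t *> rows; // pointers to individual rows within the block
};

// Called after (re-)pinning the block. If the block was never evicted, the
// buffer manager returns the same address and there is nothing to do.
void EnsureRowPointersValid(BlockPointers &block, uint8_t *current_base) {
	if (block.base_ptr == current_base) {
		return; // pointers are still valid
	}
	// The block was unloaded and reloaded at a different address:
	// recompute each row pointer from its position-independent offset.
	for (auto &row : block.rows) {
		const idx_t offset = row - block.base_ptr;
		row = current_base + offset;
	}
	block.base_ptr = current_base;
}
```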

TupleDataCollection has been integrated in JoinHashTable and AggregateHashTable, and should be integrated into the sorting code before the old RowDataCollection can be phased out. At that point I would like to rename TupleDataCollection to RowDataCollection for more consistency with naming in the code base.

This PR also brings improvements to the out-of-core hash join, and should improve the out-of-core capabilities of the hash aggregate. To improve the out-of-core hash join, I've implemented PartitionedTupleData, to which, similar to PartitionedColumnData (#4970), we can append DataChunks directly, writing straight into the partitions. Rather than materializing all data in a single TupleDataCollection, we now immediately materialize to 16 partitions. If all data fits in memory, we combine the partitions into one and perform an in-memory hash join. If not all data fits in memory, we combine as many partitions as fit in memory into one, and do a partitioned hash join like before. If a single one of the 16 partitions does not fit in memory, we repartition to up to 256 partitions and perform the out-of-core hash join like before.
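To make the scheme concrete, here is a minimal standalone sketch of radix partitioning by the upper hash bits (the constants and container are illustrative, not DuckDB's actual PartitionedTupleData; the bit extraction matches the NumberOfPartitions/RadixBits code discussed in the review below):

```cpp
#include <cstdint>
#include <vector>

using idx_t = uint64_t;
using hash_t = uint64_t;

// 4 radix bits -> 16 partitions; repartitioning would use 8 bits -> 256.
constexpr idx_t RADIX_BITS = 4;
constexpr idx_t NUM_PARTITIONS = idx_t(1) << RADIX_BITS;

static inline idx_t PartitionIndex(hash_t hash) {
	// use the uppermost RADIX_BITS bits of the hash as the partition index
	return hash >> (sizeof(hash_t) * 8 - RADIX_BITS);
}

// Scatter hashes (standing in for full rows) directly into their partitions.
void Partition(const std::vector<hash_t> &hashes, std::vector<std::vector<hash_t>> &partitions) {
	partitions.assign(NUM_PARTITIONS, {});
	for (const auto hash : hashes) {
		partitions[PartitionIndex(hash)].push_back(hash);
	}
}
```

Because the partition index comes from the top bits, increasing the radix bits only splits each existing partition further rather than reshuffling rows across partitions, which keeps repartitioning cheap.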

I've run the same benchmark as in the previous PRs (linked above):

| Memory limit (GB) | Old time (s) | New time (s) |
|-------------------|--------------|--------------|
| 10                | 1.74         | 1.56         |
| 5                 | 2.47         | 2.09         |
| 3                 | 4.54         | 3.64         |
| 1                 | 4.80         | 3.66         |

I got a bit side-tracked in this PR and vectorized some code in hive_partitioning.cpp and was able to improve the runtime of to_parquet_partition_by_many.benchmark from ~2.95s to ~2.25s on my laptop.

P.S.: I am definitely not done tweaking the performance of the hash join/aggregate, especially after speaking with Thomas Neumann here at ICDE. I look forward to optimizing these operators more in the future.

Also, this definitely has merge conflicts with #6995. Happy to fix them and merge this after that has been merged.

@Mytherin (Collaborator) commented Apr 8, 2023

> @Mytherin The problem is that the arena allocator contains the list aggregate data for the whole unpartitioned HT, but when we partition, the list data is spread across all partitions, but there is only one arena allocator that owns the data. We could move the arena allocator somewhere else, perhaps? It should be somewhere where it stays alive until the aggregate is complete.

Aha, that makes sense. I think that is indeed fine as long as the partitions are all thread-local (which I think they should be).

@lnkuiper (Contributor, Author) commented Apr 8, 2023

Thinking about this more, maybe we can have both an aggregate_allocator which can be written to (non-shared) and a shared one that is just there to keep the data alive. That is much less error-prone. I can add a lock so we don't run into issues when multiple threads are repartitioning in parallel.

@Mytherin (Collaborator) commented Apr 8, 2023

> Thinking about this more, maybe we can have both an aggregate_allocator which can be written to (non-shared) and a shared one that is just there to keep the data alive. That is much less error-prone. I can add a lock so we don't run into issues when multiple threads are repartitioning in parallel.

The problem is that freeing data will get messy if allocators are mixed - although freeing data individually is obviously not required for an arena allocator. An alternative could also be that we deprecate the destroy callback for the aggregate function entirely, and force everything to be allocated through arena allocators that are passed in from the hash table itself. That might make the most sense - but it would require some work with migrating our existing aggregate functions to all use the arena allocator infrastructure.

@hawkfish (Contributor) left a comment:

Looks nice! I've only looked at the header files because you asked me to comment on the API. It looks much better than the old stuff - most of my comments are more about clarifying the comments/documentation.

@@ -170,6 +170,9 @@ struct TemplatedValidityMask {
}
return ValidityBuffer::MAX_ENTRY >> (BITS_PER_VALUE - n);
}
static inline idx_t SizeInBytes(idx_t n) {
@hawkfish commented:

Isn't this the number of values, not the number of bytes?

@lnkuiper (Author) replied Apr 12, 2023:

No, this is the size in bytes:

  (n + BITS_PER_VALUE - 1) / BITS_PER_VALUE

Let's say BITS_PER_VALUE is 8, and we supply n = 3; then we get:

  (3 + 8 - 1) / 8
= 10 / 8
= 1

This gives you the size in bytes of a validity mask that can hold n values (the formula rounds up under integer division): 1 byte is enough for 3 values.
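Putting the signature from the diff together with this formula, a self-contained version would look as follows (using BITS_PER_VALUE = 8 as in the example; DuckDB's actual constant may differ):

```cpp
#include <cassert>
#include <cstdint>

using idx_t = uint64_t;
constexpr idx_t BITS_PER_VALUE = 8; // bits per storage unit, as in the example

// Number of bytes needed for a validity mask holding n values (rounds up).
static inline idx_t SizeInBytes(idx_t n) {
	return (n + BITS_PER_VALUE - 1) / BITS_PER_VALUE;
}

int main() {
	assert(SizeInBytes(3) == 1); // the worked example: (3 + 8 - 1) / 8 = 10 / 8 = 1
	assert(SizeInBytes(8) == 1);
	assert(SizeInBytes(9) == 2);
	return 0;
}
```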

static constexpr const idx_t NUM_RADIX_BITS = radix_bits;
static constexpr const idx_t NUM_PARTITIONS = (idx_t)1 << NUM_RADIX_BITS;
static constexpr const idx_t TMP_BUF_SIZE = 8;
static inline constexpr idx_t NumberOfPartitions(const idx_t &radix_bits) {
@hawkfish commented:

Why a reference? Is this a style rule?

@lnkuiper (Author) replied:

I've gotten used to using a const reference everywhere, because clang-tidy will complain about this when the arguments to a function are not trivially copyable, like vector. Of course, idx_t is trivially copyable so this is not necessary, but it does not hurt, either.

@Mytherin (Collaborator) commented:

It is unlikely to be a problem here (and the compiler will likely optimize it out) but passing in a reference is equivalent to passing in a pointer to an element, which is less efficient when the element itself is very small (since we have to write 8 bytes for the pointer, then have to follow the pointer to the location of the element to read the value).

@lnkuiper (Author) replied:

I didn't know this! Thanks for clarifying.
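For reference, the two calling conventions under discussion, in a toy example (not DuckDB code): for a small trivially copyable type, the by-value form can pass the argument in a register, while the by-reference form conceptually passes an address that must be followed (an inlining compiler will usually erase the difference, as noted above).

```cpp
#include <cstdint>

using idx_t = uint64_t;

// By value: the 8-byte integer is passed directly, typically in a register.
static inline idx_t TimesTwoByValue(idx_t n) {
	return n * 2;
}

// By const reference: conceptually passes a pointer that is then followed,
// i.e. the extra indirection described above (usually optimized away).
static inline idx_t TimesTwoByRef(const idx_t &n) {
	return n * 2;
}
```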

return (hash & MASK) >> (sizeof(hash_t) * 8 - NUM_RADIX_BITS);
static inline idx_t RadixBits(const idx_t &n_partitions) {
D_ASSERT(IsPowerOfTwo(n_partitions));
for (idx_t r = 0; r < sizeof(idx_t) * 8; r++) {
@hawkfish commented:

Bits per byte?

@lnkuiper (Author) replied Apr 12, 2023:

It's the inverse of NumberOfPartitions; I've clarified it with a comment now.
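Filling in the loop body that the diff cuts off, the pair of functions and their inverse relationship can be reconstructed roughly as follows (a standalone sketch assuming the loop simply searches for the matching power of two; D_ASSERT is replaced by assert):

```cpp
#include <cassert>
#include <cstdint>

using idx_t = uint64_t;

static inline bool IsPowerOfTwo(idx_t v) {
	return v != 0 && (v & (v - 1)) == 0;
}

static inline constexpr idx_t NumberOfPartitions(idx_t radix_bits) {
	return idx_t(1) << radix_bits;
}

// Inverse of NumberOfPartitions: the number of radix bits for a partition count.
static inline idx_t RadixBits(idx_t n_partitions) {
	assert(IsPowerOfTwo(n_partitions));
	for (idx_t r = 0; r < sizeof(idx_t) * 8; r++) {
		if ((idx_t(1) << r) == n_partitions) {
			return r;
		}
	}
	return 0; // unreachable for valid (power-of-two) input
}

int main() {
	assert(RadixBits(NumberOfPartitions(4)) == 4); // 16 partitions <-> 4 bits
	return 0;
}
```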

void InitializeAppendStateInternal(PartitionedTupleDataAppendState &state,
TupleDataPinProperties properties) const override;
void ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input) override;
void ComputePartitionIndices(Vector &row_locations, idx_t count, Vector &partition_indices) const override;
@hawkfish commented:

Ha, I just wrote the same thing for AsOf Join!

@lnkuiper (Author) replied:

Maybe the code can be shared? I have some code duplication between PartitionedColumnData and PartitionedTupleData, which I want to get rid of in the future.

//! Builds out the buffer space for the specified chunk state
void Build(TupleDataPinState &pin_state, TupleDataChunkState &chunk_state, const idx_t append_offset,
const idx_t append_count);
//! Scatters the given DataChunk to the rows in the specified append state
@hawkfish commented:

Is there an implied append state?

@lnkuiper (Author) replied:

Good catch; this comment is also outdated. An append state is nothing more than:

struct TupleDataAppendState {
	TupleDataPinState pin_state;
	TupleDataChunkState chunk_state;
};

However, in many cases it was useful to pull these apart, and supply the pin/chunk states separately. For example, when appending to a PartitionedTupleDataCollection, we need one pin state per partition, but only one chunk state overall.
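A rough sketch of that shape (illustrative stand-in types only; this is not the actual PartitionedTupleDataAppendState):

```cpp
#include <vector>

// Stand-in types for this sketch.
struct TupleDataPinState {};   // tracks pinned blocks, per collection
struct TupleDataChunkState {}; // per-chunk workspace (row locations, heap sizes, ...)

// When appending to a partitioned collection: each partition owns its own
// pin state, but a single chunk state is reused, since only one input
// chunk is processed at a time.
struct PartitionedAppendStateSketch {
	std::vector<TupleDataPinState> partition_pin_states;
	TupleDataChunkState chunk_state;
};
```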

void FinalizePinState(TupleDataPinState &pin_state);

//! Appends the other TupleDataCollection to this, destroying the other data collection
void Combine(TupleDataCollection &other);
@hawkfish commented:

Could this be a move operation?

@lnkuiper (Author) replied:

I've added this method now:

void Combine(unique_ptr<TupleDataCollection> other);

This will destroy other.

@lnkuiper (Contributor, Author) commented:

Hi @hawkfish, big thanks for the review. I've cleaned up some comments, added const here and there, and improved function names. Hopefully, that makes future usage easier. Just looking to clear up the final thread sanitizer issue now.

@Mytherin (Collaborator) left a comment:

Thanks for the PR! Looks great. Very nice results, and great new API! Some comments below:

auto &key = keys[i];
const auto idx = sel.get_index(i);
if (validity.RowIsValid(idx)) {
key.values[col_idx] = GetHiveKeyValue(data[idx], type);
@Mytherin commented:

I wonder if this is actually significantly faster - since this is still creating Value objects for every row. Have you profiled this?

@lnkuiper (Author) replied:

Yes, it is faster. We need Value for this to be generic, but there are possibly other ways of achieving this. I didn't want to touch the implementation too much, so I did it like this.

The problem with the initial implementation was that it called GetValue with a col/row idx on the DataChunk to get the values, which is a pretty expensive function to call multiple times. My rework vectorizes getting the data from the vector, which is significantly faster.

As I said in my PR:

> I got a bit side-tracked in this PR and vectorized some code in hive_partitioning.cpp and was able to improve the runtime of to_parquet_partition_by_many.benchmark from ~2.95s to ~2.25s on my laptop.
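The shape of that optimization, sketched on a standalone integer column (hypothetical types; the real code operates on DuckDB's Vector and handles all types): resolve the column's data and validity once, then run a tight loop, rather than calling a generic per-row GetValue.

```cpp
#include <cstdint>
#include <string>
#include <vector>

using idx_t = uint64_t;

// Stand-in for a flat integer column, for this sketch only.
struct Int64Column {
	const int64_t *data;
	const bool *valid; // per-row validity
	idx_t count;
};

// Vectorized extraction: type dispatch and validity lookup are hoisted out
// of the per-row loop, which is what made the rework significantly faster.
void ExtractPartitionKeys(const Int64Column &col, std::vector<std::string> &keys) {
	keys.resize(col.count);
	for (idx_t i = 0; i < col.count; i++) {
		keys[i] = col.valid[i] ? std::to_string(col.data[i]) : "NULL";
	}
}
```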

Vector &result, const SelectionVector &target_sel) const;

//! Converts this TupleDataCollection to a string representation
string ToString();
@Mytherin commented:

const method perhaps?

@lnkuiper (Author) replied:

ToString cannot be const, because Scan cannot be const: Scan may cause pointers to be recomputed, thereby modifying base_heap_ptr in TupleDataChunkPart.

//! The data of the HT
vector<BufferHandle> payload_hds;
unique_ptr<TupleDataCollection> data_collection;
TupleDataAppendState td_append_state;
@Mytherin commented:

Should this be moved to the AggregateHTAppendState?

@lnkuiper (Author) replied:

Good point, I will try to do this. I can move the TupleDataChunkState of the TupleDataAppendState into the AggregateHTAppendState, but the TupleDataPinState of the TupleDataAppendState needs to be HT-specific.

SelectionVector partition_sel;

static constexpr idx_t MAP_THRESHOLD = 32;
perfect_map_t<list_entry_t> partition_entries;
@Mytherin commented:

Is a perfect_map not better implemented as a vector, or are the numbers here potentially very large?

@lnkuiper (Author) replied:

The usage so far almost always involves small numbers, but in some cases the numbers can be large; the size of the map (the number of entries), however, will always be small. Therefore I chose not to use a vector. I want to get back to this at some point and unify computing the partition indices for PartitionedColumnData and PartitionedTupleData, and then I will try to find a more elegant solution.

@lnkuiper (Contributor, Author) commented:

Thanks for the feedback, Mark :)

Just trying to please CI now

@lnkuiper (Contributor, Author) commented:

I'm getting data races in the buffer eviction queue, and at this point I'm not sure what to do about them anymore. I'm also unable to reproduce this on a Linux box.

@Mytherin merged commit 82211fc into duckdb:master on Apr 17, 2023
@Mytherin (Collaborator) commented:

Thanks! LGTM

@lnkuiper mentioned this pull request on Apr 19, 2023
@lnkuiper deleted the oochj branch on May 1, 2023