Tuple Data Collection #6998
Conversation
…om TupleDataLayout for compatibility (for now)
Aha, that makes sense. I think that is indeed fine as long as the partitions are all thread-local (which I think they should be).
Thinking about this more, maybe we can have both an …
The problem is that freeing data will get messy if allocators are mixed - although freeing data individually is obviously not required for an arena allocator. An alternative could also be that we deprecate the …
Looks nice! I've only looked at the header files because you asked me to comment on the API. It looks much better than the old stuff - most of my comments are more about clarifying the comments/documentation.
@@ -170,6 +170,9 @@ struct TemplatedValidityMask {
	}
	return ValidityBuffer::MAX_ENTRY >> (BITS_PER_VALUE - n);
}
static inline idx_t SizeInBytes(idx_t n) {
Isn't this the number of values, not the number of bytes?
No, this is the size in bytes:
(n + BITS_PER_VALUE - 1) / BITS_PER_VALUE
Let's say BITS_PER_VALUE is 8, and we supply n = 3, then we will get:
(3 + 8 - 1) / 8 = 10 / 8 = 1
This gives you the size of a validity mask that can hold n values. 1 byte is enough for 3 values.
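Putting the formula together with the signature from the diff, a plausible body would look like this (a sketch; the actual implementation may differ):

// Ceiling division: the number of BITS_PER_VALUE-sized units needed to
// hold n validity bits; with BITS_PER_VALUE = 8 that is the size in bytes.
static inline idx_t SizeInBytes(idx_t n) {
	return (n + BITS_PER_VALUE - 1) / BITS_PER_VALUE;
}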
static constexpr const idx_t NUM_RADIX_BITS = radix_bits;
static constexpr const idx_t NUM_PARTITIONS = (idx_t)1 << NUM_RADIX_BITS;
static constexpr const idx_t TMP_BUF_SIZE = 8;
static inline constexpr idx_t NumberOfPartitions(const idx_t &radix_bits) {
Why a reference? Is this a style rule?
I've gotten used to using a const reference everywhere, because clang-tidy will complain about this when the arguments to a function are not trivially copyable, like vector. Of course, idx_t is trivially copyable so this is not necessary, but it does not hurt, either.
It is unlikely to be a problem here (and the compiler will likely optimize it out) but passing in a reference is equivalent to passing in a pointer to an element, which is less efficient when the element itself is very small (since we have to write 8 bytes for the pointer, then have to follow the pointer to the location of the element to read the value).
I didn't know this! Thanks for clarifying
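A minimal standalone sketch of the point being made (the ByRef/ByVal names are invented for illustration):

#include <cstdint>
using idx_t = uint64_t; // stand-in for DuckDB's index type

// By const reference: the caller materializes an address and the callee
// dereferences it, an extra indirection just to read an 8-byte value.
static inline idx_t NumberOfPartitionsByRef(const idx_t &radix_bits) {
	return idx_t(1) << radix_bits;
}

// By value: the argument travels directly, typically in a register. For a
// trivially copyable 8-byte type this is never worse. (As noted above, once
// the function is inlined the compiler will usually erase the difference.)
static inline idx_t NumberOfPartitionsByVal(idx_t radix_bits) {
	return idx_t(1) << radix_bits;
}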
return (hash & MASK) >> (sizeof(hash_t) * 8 - NUM_RADIX_BITS);
static inline idx_t RadixBits(const idx_t &n_partitions) {
	D_ASSERT(IsPowerOfTwo(n_partitions));
	for (idx_t r = 0; r < sizeof(idx_t) * 8; r++) {
Bits per byte?
It's the inverse of NumberOfPartitions; I've clarified it with a comment now.
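The diff above cuts off the loop body; a plausible completion that makes the inverse relationship concrete (a sketch, assuming D_ASSERT and IsPowerOfTwo do what their names suggest):

// Inverse of NumberOfPartitions: recover r such that (1 << r) == n_partitions.
static inline idx_t RadixBits(const idx_t &n_partitions) {
	D_ASSERT(IsPowerOfTwo(n_partitions));
	for (idx_t r = 0; r < sizeof(idx_t) * 8; r++) {
		if ((idx_t(1) << r) == n_partitions) {
			return r; // e.g., 16 partitions -> 4 radix bits
		}
	}
	return 0; // unreachable for valid (power-of-two) input
}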
void InitializeAppendStateInternal(PartitionedTupleDataAppendState &state,
                                   TupleDataPinProperties properties) const override;
void ComputePartitionIndices(PartitionedTupleDataAppendState &state, DataChunk &input) override;
void ComputePartitionIndices(Vector &row_locations, idx_t count, Vector &partition_indices) const override;
Ha I just wrote the same thing for AsOf Join!
Maybe the code can be shared? I have some code duplication between PartitionedColumnData and PartitionedTupleData, which I want to get rid of in the future.
//! Builds out the buffer space for the specified chunk state
void Build(TupleDataPinState &pin_state, TupleDataChunkState &chunk_state, const idx_t append_offset,
           const idx_t append_count);
//! Scatters the given DataChunk to the rows in the specified append state
Is there an implied append state?
Good catch, this comment is also outdated. An append state is nothing more than:
struct TupleDataAppendState {
	TupleDataPinState pin_state;
	TupleDataChunkState chunk_state;
};
However, in many cases it was useful to pull these apart and supply the pin/chunk states separately. For example, when appending to a PartitionedTupleDataCollection, we need one pin state per partition, but only one chunk state overall.
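For illustration, the partitioned case described here might look like this (a hypothetical sketch with invented names, not the actual declaration):

// One pin state per partition, one shared chunk state for the whole append.
struct PartitionedTupleDataAppendStateSketch {
	vector<TupleDataPinState> partition_pin_states; // one per partition
	TupleDataChunkState chunk_state;                // shared across partitions
};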
void FinalizePinState(TupleDataPinState &pin_state);

//! Appends the other TupleDataCollection to this, destroying the other data collection
void Combine(TupleDataCollection &other);
Could this be a move operation?
I've added this method now:
void Combine(unique_ptr<TupleDataCollection> other);
which will destroy other.
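Hypothetical usage (BuildSomeCollection and collection are assumed names, for illustration only):

unique_ptr<TupleDataCollection> other = BuildSomeCollection(); // assumed helper
collection.Combine(std::move(other)); // ownership transferred; `other` is
                                      // consumed once its data is appended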
Hi @hawkfish, big thanks for the review. I've cleaned up some comments, added …
Thanks for the PR! Looks great. Very nice results, and great new API! Some comments below:
auto &key = keys[i];
const auto idx = sel.get_index(i);
if (validity.RowIsValid(idx)) {
	key.values[col_idx] = GetHiveKeyValue(data[idx], type);
I wonder if this is actually significantly faster - since this is still creating Value objects for every row. Have you profiled this?
Yes, it is faster. We need Value for this to be generic, but there are possibly other ways of achieving this. I didn't want to touch the implementation too much, so I did it like this.
The problem with the initial implementation was that it called GetValue with a col/row idx on the DataChunk to get the values, which is a pretty expensive function to call multiple times. My rework vectorizes getting the data from the vector, which is significantly faster.
As I said in my PR:
I got a bit side-tracked in this PR and vectorized some code in hive_partitioning.cpp and was able to improve the runtime of to_parquet_partition_by_many.benchmark from ~2.95s to ~2.25s on my laptop.
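A sketch of the contrast (illustrative only, assuming an int64_t column and a chunk/col_idx in scope; not the exact hive_partitioning.cpp code):

// Before (per row): DataChunk::GetValue constructs a Value and dispatches on
// the type for every single cell.
for (idx_t r = 0; r < chunk.size(); r++) {
	Value v = chunk.GetValue(col_idx, r); // expensive per-row call
}

// After (vectorized): resolve the vector's format once, then read raw data.
UnifiedVectorFormat format;
chunk.data[col_idx].ToUnifiedFormat(chunk.size(), format);
auto data = UnifiedVectorFormat::GetData<int64_t>(format);
for (idx_t r = 0; r < chunk.size(); r++) {
	const auto idx = format.sel->get_index(r);
	if (format.validity.RowIsValid(idx)) {
		auto raw = data[idx]; // direct access, no Value construction per row
	}
}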
Vector &result, const SelectionVector &target_sel) const;

//! Converts this TupleDataCollection to a string representation
string ToString();
const method perhaps?
ToString cannot be const, because Scan cannot be const. Scan may cause pointers to be recomputed, therefore modifying base_heap_ptr in TupleDataChunkPart.
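Roughly why, as a hypothetical sketch (only base_heap_ptr and TupleDataChunkPart come from the thread; the struct body and function are invented for illustration):

#include <cstdint>
using data_ptr_t = uint8_t *;

struct TupleDataChunkPartSketch {
	data_ptr_t base_heap_ptr = nullptr; // cached base address of the heap block
};

static void VerifyHeapPointers(TupleDataChunkPartSketch &part, data_ptr_t current_base) {
	if (part.base_heap_ptr != current_base) {
		// The block was unloaded and reloaded at a new address, so the row
		// heap pointers must be recomputed relative to the new base.
		part.base_heap_ptr = current_base; // this mutation is why Scan is non-const
	}
}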
//! The data of the HT
vector<BufferHandle> payload_hds;
unique_ptr<TupleDataCollection> data_collection;
TupleDataAppendState td_append_state;
Should this be moved to the AggregateHTAppendState?
Good point, I will try to do this. I can move the TupleDataChunkState of the TupleDataAppendState into the AggregateHTAppendState, but the TupleDataPinState of the TupleDataAppendState needs to be HT-specific.
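A hypothetical sketch of where the two states would end up after that refactor (class bodies and placement assumed):

struct AggregateHTAppendStateSketch {
	TupleDataChunkState td_chunk_state; // moved here from TupleDataAppendState
};

class GroupedAggregateHashTableSketch {
	TupleDataPinState td_pin_state; // stays with the HT: pins are HT-specific
};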
SelectionVector partition_sel;

static constexpr idx_t MAP_THRESHOLD = 32;
perfect_map_t<list_entry_t> partition_entries;
Is a perfect_map not better implemented as a vector, or are the numbers here potentially very large?
The usage so far almost always has small partition indices, but can in some cases have large ones; the size of the map (number of entries) will always be small, however. Therefore I chose not to use a vector. I want to get back to this at some point and unify computing the partition indices for both the PartitionedColumnData and PartitionedTupleData, and then I will try to find a more elegant solution.
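An illustrative sketch of the trade-off (the values are assumed; list_entry_t is an offset/length pair):

// Partition indices can be large, but only a few distinct partitions are
// touched per chunk, so a map only stores what actually occurs.
perfect_map_t<list_entry_t> partition_entries;
partition_entries[3] = list_entry_t(0, 100);    // 100 rows go to partition 3
partition_entries[200] = list_entry_t(100, 48); // 48 rows go to partition 200
// A vector<list_entry_t> alternative would need an entry per possible
// partition (e.g., 256+), even though only two partitions occur here.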
Thanks for the feedback, Mark :) Just trying to please CI now
I'm getting data races in the buffer eviction queue, and at this point I'm not sure what to do about it anymore. I'm also unable to reproduce this on a Linux box
Thanks! LGTM
This PR implements TupleDataCollection, an (even) more unified way of materializing data in row format.

In various places in DuckDB's execution engine, we materialize data in row format using RowDataCollection and RowLayout. While this is already a somewhat unified way of materializing data in row format, it has a few issues when we want to spill the row data to disk. The pointers within the row format have to be manually (un)swizzled when (un)pinned. This is cumbersome and requires some serious bookkeeping within physical operators that use row format data. This (un)swizzling may also be redundant if the blocks that hold the data are never written to disk, which may happen in cases where the data size is only slightly over the memory limit.

This PR implements the TupleDataCollection class, to which, similar to the ColumnDataCollection class (#4315), DataChunks can simply be appended, and from which they can be scanned, without the need to explicitly hold buffer pins in various places in the code. This is all managed by the TupleDataAllocator class. The two problems described above (regarding pointer swizzling) are addressed in a similar way to how pointers in a ColumnDataCollection are never explicitly swizzled (#5543). In short, we simply compare pointers to see whether the block was spilled to disk. If the pointers are different, we recompute them; if the pointers are the same, they are still valid and nothing needs to be done.

TupleDataCollection has been integrated in JoinHashTable and AggregateHashTable, and should be integrated into the sorting code before the old RowDataCollection can be phased out. At that point I would like to rename TupleDataCollection to RowDataCollection for more consistency with naming in the code base.

This PR also brings improvements to the out-of-core hash join, and should improve the out-of-core capabilities of the hash aggregate. To improve the out-of-core hash join, I've implemented PartitionedTupleData, to which, similar to PartitionedColumnData (#4970), DataChunks can be appended directly to the partitions. Rather than materializing all data in a single TupleDataCollection, we now immediately materialize to 16 partitions. If all data fits in memory, we combine the partitions into one and perform an in-memory hash join. If not all data fits in memory, we combine as many partitions as fit in memory into one, and do a partitioned hash join like before. If a single one of the 16 partitions does not fit in memory, we repartition to up to 256 partitions and perform the out-of-core hash join like before.
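To make the 16-to-256 repartitioning concrete, here is a minimal standalone sketch of the radix scheme, reusing the hash-to-partition formula from the diff earlier in this thread (the struct and the MASK definition are my assumptions, not the actual DuckDB code):

#include <cstdint>
using hash_t = uint64_t;
using idx_t = uint64_t;

// Partition by the top NUM_RADIX_BITS bits of the hash.
template <idx_t radix_bits>
struct RadixPartitioningSketch {
	static constexpr idx_t NUM_RADIX_BITS = radix_bits;
	static constexpr idx_t NUM_PARTITIONS = idx_t(1) << NUM_RADIX_BITS;
	static constexpr hash_t MASK = ((hash_t(1) << NUM_RADIX_BITS) - 1)
	                               << (sizeof(hash_t) * 8 - NUM_RADIX_BITS);
	static idx_t Partition(hash_t hash) {
		return (hash & MASK) >> (sizeof(hash_t) * 8 - NUM_RADIX_BITS);
	}
};

// 16 partitions = 4 radix bits; repartitioning to 256 = 8 radix bits. Since
// both read the top bits of the same hash, each 4-bit partition splits evenly
// into 16 of the 8-bit partitions without rehashing any tuples.
using InitialPartitioning = RadixPartitioningSketch<4>;
using Repartitioning = RadixPartitioningSketch<8>;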
I've run the same benchmark as in the previous PRs (linked above):

I got a bit side-tracked in this PR and vectorized some code in hive_partitioning.cpp and was able to improve the runtime of to_parquet_partition_by_many.benchmark from ~2.95s to ~2.25s on my laptop.

P.S.: I am definitely not done tweaking the performance of the hash join/aggregate, especially after speaking with Thomas Neumann here at ICDE. I look forward to optimizing these operators more in the future.
Also, this definitely has merge conflicts with #6995. Happy to fix them and merge this after that has been merged.