Grow string dictionary dynamically in Parquet writer by lnkuiper · Pull Request #17061 · duckdb/duckdb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Grow string dictionary dynamically in Parquet writer #17061

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 5 additions & 21 deletions extension/parquet/include/parquet_bss_encoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ namespace duckdb {
class BssEncoder {
public:
explicit BssEncoder(const idx_t total_value_count_p, const idx_t bit_width_p)
: total_value_count(total_value_count_p), bit_width(bit_width_p), count(0),
buffer(Allocator::DefaultAllocator().Allocate(total_value_count * bit_width + 1)) {
: total_value_count(total_value_count_p), bit_width(bit_width_p), count(0) {
}

public:
//! Allocates the output buffer from the supplied allocator.
//! The buffer is sized to total_value_count * bit_width, plus one extra unit.
void BeginWrite(Allocator &allocator) {
	const auto buffer_size = total_value_count * bit_width + 1;
	buffer = allocator.Allocate(buffer_size);
}

template <class T>
void WriteValue(const T &value) {
D_ASSERT(sizeof(T) == bit_width);
Expand All @@ -41,23 +44,4 @@ class BssEncoder {
AllocatedData buffer;
};

namespace bss_encoder {

template <class T>
void WriteValue(BssEncoder &encoder, const T &value) {
throw InternalException("Can't write type to BYTE_STREAM_SPLIT column");
}

template <>
void WriteValue(BssEncoder &encoder, const float &value) {
encoder.WriteValue(value);
}

template <>
void WriteValue(BssEncoder &encoder, const double &value) {
encoder.WriteValue(value);
}

} // namespace bss_encoder

} // namespace duckdb
80 changes: 38 additions & 42 deletions extension/parquet/include/parquet_dbp_encoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,27 @@ class DbpEncoder {
}

public:
void BeginWrite(WriteStream &writer, const int64_t &first_value) {
template <class T>
void BeginWrite(WriteStream &writer, const T &first_value) {
throw InternalException("DbpEncoder should only be used with integers");
}

template <class T>
void WriteValue(WriteStream &writer, const T &value) {
throw InternalException("DbpEncoder should only be used with integers");
}

//! Finalizes the encoder: verifies that the number of values seen matches the
//! total announced at construction, then writes out any values still buffered
//! in the current block.
//! Throws InternalException when the counts disagree.
void FinishWrite(WriteStream &writer) {
	const auto values_seen = count + block_count;
	if (values_seen != total_value_count) {
		throw InternalException("value count mismatch when writing DELTA_BINARY_PACKED");
	}
	if (block_count == 0) {
		// nothing pending in the current block
		return;
	}
	WriteBlock(writer);
}

private:
void BeginWriteInternal(WriteStream &writer, const int64_t &first_value) {
// <block size in values> <number of miniblocks in a block> <total value count> <first value>

// the block size is a multiple of 128; it is stored as a ULEB128 int
Expand All @@ -50,7 +70,7 @@ class DbpEncoder {
block_count = 0;
}

void WriteValue(WriteStream &writer, const int64_t &value) {
void WriteValueInternal(WriteStream &writer, const int64_t &value) {
// 1. Compute the differences between consecutive elements. For the first element in the block,
// use the last element in the previous block or, in the case of the first block,
// use the first value of the whole sequence, stored in the header.
Expand All @@ -72,16 +92,6 @@ class DbpEncoder {
}
}

void FinishWrite(WriteStream &writer) {
if (count + block_count != total_value_count) {
throw InternalException("value count mismatch when writing DELTA_BINARY_PACKED");
}
if (block_count != 0) {
WriteBlock(writer);
}
}

private:
void WriteBlock(WriteStream &writer) {
D_ASSERT(count + block_count == total_value_count || block_count == BLOCK_SIZE_IN_VALUES);
const auto number_of_miniblocks =
Expand Down Expand Up @@ -176,58 +186,44 @@ class DbpEncoder {
data_t data_packed[NUMBER_OF_VALUES_IN_A_MINIBLOCK * sizeof(int64_t)];
};

namespace dbp_encoder {

template <class T>
void BeginWrite(DbpEncoder &encoder, WriteStream &writer, const T &first_value) {
throw InternalException("Can't write type to DELTA_BINARY_PACKED column");
}

template <>
void BeginWrite(DbpEncoder &encoder, WriteStream &writer, const int64_t &first_value) {
encoder.BeginWrite(writer, first_value);
inline void DbpEncoder::BeginWrite(WriteStream &writer, const int32_t &first_value) {
BeginWriteInternal(writer, first_value);
}

template <>
void BeginWrite(DbpEncoder &encoder, WriteStream &writer, const int32_t &first_value) {
BeginWrite(encoder, writer, UnsafeNumericCast<int64_t>(first_value));
inline void DbpEncoder::BeginWrite(WriteStream &writer, const int64_t &first_value) {
BeginWriteInternal(writer, first_value);
}

template <>
void BeginWrite(DbpEncoder &encoder, WriteStream &writer, const uint64_t &first_value) {
encoder.BeginWrite(writer, UnsafeNumericCast<int64_t>(first_value));
inline void DbpEncoder::BeginWrite(WriteStream &writer, const uint32_t &first_value) {
BeginWriteInternal(writer, first_value);
}

template <>
void BeginWrite(DbpEncoder &encoder, WriteStream &writer, const uint32_t &first_value) {
BeginWrite(encoder, writer, UnsafeNumericCast<int64_t>(first_value));
}

template <class T>
void WriteValue(DbpEncoder &encoder, WriteStream &writer, const T &value) {
throw InternalException("Can't write type to DELTA_BINARY_PACKED column");
inline void DbpEncoder::BeginWrite(WriteStream &writer, const uint64_t &first_value) {
BeginWriteInternal(writer, first_value);
}

template <>
void WriteValue(DbpEncoder &encoder, WriteStream &writer, const int64_t &value) {
encoder.WriteValue(writer, value);
inline void DbpEncoder::WriteValue(WriteStream &writer, const int32_t &first_value) {
WriteValueInternal(writer, first_value);
}

template <>
void WriteValue(DbpEncoder &encoder, WriteStream &writer, const int32_t &value) {
WriteValue(encoder, writer, UnsafeNumericCast<int64_t>(value));
inline void DbpEncoder::WriteValue(WriteStream &writer, const int64_t &first_value) {
WriteValueInternal(writer, first_value);
}

template <>
void WriteValue(DbpEncoder &encoder, WriteStream &writer, const uint64_t &value) {
encoder.WriteValue(writer, UnsafeNumericCast<int64_t>(value));
inline void DbpEncoder::WriteValue(WriteStream &writer, const uint32_t &first_value) {
WriteValueInternal(writer, first_value);
}

template <>
void WriteValue(DbpEncoder &encoder, WriteStream &writer, const uint32_t &value) {
WriteValue(encoder, writer, UnsafeNumericCast<int64_t>(value));
inline void DbpEncoder::WriteValue(WriteStream &writer, const uint64_t &first_value) {
WriteValueInternal(writer, first_value);
}

} // namespace dbp_encoder

} // namespace duckdb
55 changes: 21 additions & 34 deletions extension/parquet/include/parquet_dlba_encoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,67 +16,54 @@ namespace duckdb {
class DlbaEncoder {
public:
DlbaEncoder(const idx_t total_value_count_p, const idx_t total_string_size_p)
: dbp_encoder(total_value_count_p), total_string_size(total_string_size_p),
buffer(Allocator::DefaultAllocator().Allocate(total_string_size + 1)),
stream(make_unsafe_uniq<MemoryStream>(buffer.get(), buffer.GetSize())) {
: dbp_encoder(total_value_count_p), total_string_size(total_string_size_p) {
}

public:
void BeginWrite(WriteStream &writer, const string_t &first_value) {
dbp_encoder.BeginWrite(writer, UnsafeNumericCast<int64_t>(first_value.GetSize()));
stream->WriteData(const_data_ptr_cast(first_value.GetData()), first_value.GetSize());
template <class T>
void BeginWrite(Allocator &, WriteStream &, const T &) {
throw InternalException("DlbaEncoder should only be used with strings");
}

void WriteValue(WriteStream &writer, const string_t &value) {
dbp_encoder.WriteValue(writer, UnsafeNumericCast<int64_t>(value.GetSize()));
stream->WriteData(const_data_ptr_cast(value.GetData()), value.GetSize());
template <class T>
void WriteValue(WriteStream &, const T &) {
throw InternalException("DlbaEncoder should only be used with strings");
}

//! Finishes the length encoder first, then appends the buffered string data
//! (everything up to the stream's current position) to the writer.
void FinishWrite(WriteStream &writer) {
	dbp_encoder.FinishWrite(writer);
	const auto buffered_size = stream->GetPosition();
	writer.WriteData(buffer.get(), buffered_size);
}

//! Generic fallback: reports a string size of 0 for any SRC type
template <class SRC>
static idx_t GetStringSize(const SRC &) {
	return idx_t(0);
}

private:
DbpEncoder dbp_encoder;
const idx_t total_string_size;
AllocatedData buffer;
unsafe_unique_ptr<MemoryStream> stream;
};

namespace dlba_encoder {

template <class T>
void BeginWrite(DlbaEncoder &encoder, WriteStream &writer, const T &first_value) {
throw InternalException("Can't write type to DELTA_LENGTH_BYTE_ARRAY column");
}

template <>
void BeginWrite(DlbaEncoder &encoder, WriteStream &writer, const string_t &first_value) {
encoder.BeginWrite(writer, first_value);
}

template <class T>
void WriteValue(DlbaEncoder &encoder, WriteStream &writer, const T &value) {
throw InternalException("Can't write type to DELTA_LENGTH_BYTE_ARRAY column");
inline void DlbaEncoder::BeginWrite(Allocator &allocator, WriteStream &writer, const string_t &first_value) {
buffer = allocator.Allocate(total_string_size + 1);
stream = make_unsafe_uniq<MemoryStream>(buffer.get(), buffer.GetSize());
dbp_encoder.BeginWrite(writer, UnsafeNumericCast<int64_t>(first_value.GetSize()));
stream->WriteData(const_data_ptr_cast(first_value.GetData()), first_value.GetSize());
}

template <>
void WriteValue(DlbaEncoder &encoder, WriteStream &writer, const string_t &value) {
encoder.WriteValue(writer, value);
}

// helpers to get size from strings
template <class SRC>
static idx_t GetDlbaStringSize(const SRC &) {
return 0;
inline void DlbaEncoder::WriteValue(WriteStream &writer, const string_t &value) {
dbp_encoder.WriteValue(writer, UnsafeNumericCast<int64_t>(value.GetSize()));
stream->WriteData(const_data_ptr_cast(value.GetData()), value.GetSize());
}

template <>
idx_t GetDlbaStringSize(const string_t &src_value) {
inline idx_t DlbaEncoder::GetStringSize(const string_t &src_value) {
return src_value.GetSize();
}

} // namespace dlba_encoder

} // namespace duckdb
10 changes: 2 additions & 8 deletions extension/parquet/include/writer/primitive_column_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,10 @@ class PrimitiveColumnWriter : public ColumnWriter {

//! We limit the uncompressed page size to 100MB
//! The max size in Parquet is 2GB, but we choose a more conservative limit
static constexpr const idx_t MAX_UNCOMPRESSED_PAGE_SIZE = 100000000;
static constexpr const idx_t MAX_UNCOMPRESSED_PAGE_SIZE = 104857600ULL;
//! Dictionary pages must be below 2GB. Unlike data pages, there's only one dictionary page.
//! For this reason we go with a much higher, but still conservative, upper bound of 1GB.
static constexpr const idx_t MAX_UNCOMPRESSED_DICT_PAGE_SIZE = 1e9;
//! If the dictionary has this many entries, we stop creating the dictionary
static constexpr const idx_t DICTIONARY_ANALYZE_THRESHOLD = 1e4;
//! The maximum size a key entry in an RLE page takes
static constexpr const idx_t MAX_DICTIONARY_KEY_SIZE = sizeof(uint32_t);
//! The size of encoding the string length
static constexpr const idx_t STRING_LENGTH_SIZE = sizeof(uint32_t);
static constexpr const idx_t MAX_UNCOMPRESSED_DICT_PAGE_SIZE = 1073741824ULL;

public:
unique_ptr<ColumnWriterState> InitializeWriteState(duckdb_parquet::RowGroup &row_group) override;
Expand Down
32 changes: 21 additions & 11 deletions extension/parquet/include/writer/templated_column_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ class StandardWriterPageState : public ColumnWriterPageState {
duckdb_parquet::Encoding::type encoding_p,
const PrimitiveDictionary<SRC, TGT, OP> &dictionary_p)
: encoding(encoding_p), dbp_initialized(false), dbp_encoder(total_value_count), dlba_initialized(false),
dlba_encoder(total_value_count, total_string_size), bss_encoder(total_value_count, sizeof(TGT)),
dictionary(dictionary_p), dict_written_value(false),
dlba_encoder(total_value_count, total_string_size), bss_initialized(false),
bss_encoder(total_value_count, sizeof(TGT)), dictionary(dictionary_p), dict_written_value(false),
dict_bit_width(RleBpDecoder::ComputeBitWidth(dictionary.GetSize())), dict_encoder(dict_bit_width) {
}
duckdb_parquet::Encoding::type encoding;
Expand All @@ -101,6 +101,7 @@ class StandardWriterPageState : public ColumnWriterPageState {
bool dlba_initialized;
DlbaEncoder dlba_encoder;

bool bss_initialized;
BssEncoder bss_encoder;

const PrimitiveDictionary<SRC, TGT, OP> &dictionary;
Expand Down Expand Up @@ -142,7 +143,7 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
switch (page_state.encoding) {
case duckdb_parquet::Encoding::DELTA_BINARY_PACKED:
if (!page_state.dbp_initialized) {
dbp_encoder::BeginWrite<int64_t>(page_state.dbp_encoder, temp_writer, 0);
page_state.dbp_encoder.BeginWrite(temp_writer, 0);
}
page_state.dbp_encoder.FinishWrite(temp_writer);
break;
Expand All @@ -158,11 +159,15 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
break;
case duckdb_parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY:
if (!page_state.dlba_initialized) {
dlba_encoder::BeginWrite<string_t>(page_state.dlba_encoder, temp_writer, string_t(""));
page_state.dlba_encoder.BeginWrite(BufferAllocator::Get(writer.GetContext()), temp_writer,
string_t(""));
}
page_state.dlba_encoder.FinishWrite(temp_writer);
break;
case duckdb_parquet::Encoding::BYTE_STREAM_SPLIT:
if (!page_state.bss_initialized) {
page_state.bss_encoder.BeginWrite(BufferAllocator::Get(writer.GetContext()));
}
page_state.bss_encoder.FinishWrite(temp_writer);
break;
case duckdb_parquet::Encoding::PLAIN:
Expand Down Expand Up @@ -201,7 +206,7 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
const auto &src_value = data_ptr[vector_index];
state.dictionary.Insert(src_value);
state.total_value_count++;
state.total_string_size += dlba_encoder::GetDlbaStringSize(src_value);
state.total_string_size += DlbaEncoder::GetStringSize(src_value);
}
} else {
for (idx_t i = 0; i < vcount; i++) {
Expand All @@ -212,7 +217,7 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
const auto &src_value = data_ptr[vector_index];
state.dictionary.Insert(src_value);
state.total_value_count++;
state.total_string_size += dlba_encoder::GetDlbaStringSize(src_value);
state.total_string_size += DlbaEncoder::GetStringSize(src_value);
}
vector_index++;
}
Expand Down Expand Up @@ -352,7 +357,7 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
}
const TGT target_value = OP::template Operation<SRC, TGT>(data_ptr[r]);
OP::template HandleStats<SRC, TGT>(stats, target_value);
dbp_encoder::BeginWrite(page_state.dbp_encoder, temp_writer, target_value);
page_state.dbp_encoder.BeginWrite(temp_writer, target_value);
page_state.dbp_initialized = true;
r++; // skip over
break;
Expand All @@ -365,7 +370,7 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
}
const TGT target_value = OP::template Operation<SRC, TGT>(data_ptr[r]);
OP::template HandleStats<SRC, TGT>(stats, target_value);
dbp_encoder::WriteValue(page_state.dbp_encoder, temp_writer, target_value);
page_state.dbp_encoder.WriteValue(temp_writer, target_value);
}
break;
}
Expand All @@ -379,7 +384,8 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
}
const TGT target_value = OP::template Operation<SRC, TGT>(data_ptr[r]);
OP::template HandleStats<SRC, TGT>(stats, target_value);
dlba_encoder::BeginWrite(page_state.dlba_encoder, temp_writer, target_value);
page_state.dlba_encoder.BeginWrite(BufferAllocator::Get(writer.GetContext()), temp_writer,
target_value);
page_state.dlba_initialized = true;
r++; // skip over
break;
Expand All @@ -392,18 +398,22 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
}
const TGT target_value = OP::template Operation<SRC, TGT>(data_ptr[r]);
OP::template HandleStats<SRC, TGT>(stats, target_value);
dlba_encoder::WriteValue(page_state.dlba_encoder, temp_writer, target_value);
page_state.dlba_encoder.WriteValue(temp_writer, target_value);
}
break;
}
case duckdb_parquet::Encoding::BYTE_STREAM_SPLIT: {
// Allocate the BYTE_STREAM_SPLIT buffer once, on the first write to this page.
// The check must be negated: the flag starts out false, so testing it un-negated
// would skip BeginWrite forever and the values below would be written into a
// buffer that was never allocated.
if (!page_state.bss_initialized) {
	page_state.bss_encoder.BeginWrite(BufferAllocator::Get(writer.GetContext()));
	page_state.bss_initialized = true;
}
for (idx_t r = chunk_start; r < chunk_end; r++) {
if (!ALL_VALID && !mask.RowIsValid(r)) {
continue;
}
const TGT target_value = OP::template Operation<SRC, TGT>(data_ptr[r]);
OP::template HandleStats<SRC, TGT>(stats, target_value);
bss_encoder::WriteValue(page_state.bss_encoder, target_value);
page_state.bss_encoder.WriteValue(target_value);
}
break;
}
Expand Down
Loading
Loading
0