Add support for Map writing to Arrow vtab by sgrebnov · Pull Request #439 · duckdb/duckdb-rs · GitHub

Add support for Map writing to Arrow vtab #439


Merged
merged 3 commits on Feb 24, 2025
10 changes: 7 additions & 3 deletions crates/duckdb/src/core/vector.rs
@@ -192,7 +192,8 @@ impl ListVector {
}

/// Take the child as [StructVector].
pub fn struct_child(&self) -> StructVector {
pub fn struct_child(&self, capacity: usize) -> StructVector {
self.reserve(capacity);
StructVector::from(unsafe { duckdb_list_vector_get_child(self.entries.ptr) })
}

@@ -300,8 +301,11 @@ impl From<duckdb_vector> for StructVector {

impl StructVector {
/// Returns the child by idx in the list vector.
pub fn child(&self, idx: usize) -> FlatVector {
FlatVector::from(unsafe { duckdb_struct_vector_get_child(self.ptr, idx as u64) })
pub fn child(&self, idx: usize, capacity: usize) -> FlatVector {
FlatVector::with_capacity(
unsafe { duckdb_struct_vector_get_child(self.ptr, idx as u64) },
capacity,
)
}

/// Take the child as [StructVector].
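
For context on the signature change above: a minimal sketch (not part of the diff) of how the capacity-aware accessors are meant to be driven when filling a list-of-struct column, assuming the crate's public duckdb::core vector types shown in this diff; the helper name fill_map_entries is hypothetical. Passing the total entry count up front lets the child buffers be reserved before any writes.

use duckdb::core::{Inserter, ListVector};

// Hypothetical helper: writes `keys`/`values` as the entries of a single
// list-of-struct row at list index 0.
fn fill_map_entries(out: &mut ListVector, keys: &[&str], values: &[u32]) {
    debug_assert_eq!(keys.len(), values.len());
    let n = keys.len();
    // New signature: child capacity is reserved before the struct child is taken.
    let entries = out.struct_child(n);
    // Each struct child is likewise sized to the total number of entries.
    let key_vec = entries.child(0, n);
    for (i, k) in keys.iter().enumerate() {
        key_vec.insert(i, *k);
    }
    let mut val_vec = entries.child(1, n);
    val_vec.as_mut_slice::<u32>()[..n].copy_from_slice(values);
    // One list entry covering all n child rows (offset 0, length n); setting the
    // overall list length and validity is left to the caller.
    out.set_entry(0, 0, n);
}
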
121 changes: 115 additions & 6 deletions crates/duckdb/src/vtab/arrow.rs
@@ -6,6 +6,7 @@ use crate::{
core::{ArrayVector, FlatVector, Inserter, ListVector, StructVector, Vector},
types::DuckString,
};
use arrow::array::as_map_array;
use arrow::{
array::{
as_boolean_array, as_generic_binary_array, as_large_list_array, as_list_array, as_primitive_array,
@@ -204,6 +205,7 @@ pub fn to_duckdb_logical_type(data_type: &DataType) -> Result<LogicalTypeHandle,
// DuckDB does not support negative decimal scales
Ok(LogicalTypeHandle::decimal(*width, (*scale).try_into().unwrap()))
}
DataType::Map(field, _) => arrow_map_to_duckdb_logical_type(field),
DataType::Boolean
| DataType::Utf8
| DataType::LargeUtf8
@@ -220,6 +222,35 @@ pub fn to_duckdb_logical_type(data_type: &DataType) -> Result<LogicalTypeHandle,
}
}

fn arrow_map_to_duckdb_logical_type(field: &FieldRef) -> Result<LogicalTypeHandle, Box<dyn std::error::Error>> {
// Map is a logical nested type that is represented as `List<entries: Struct<key: K, value: V>>`
let DataType::Struct(ref fields) = field.data_type() else {
return Err(format!(
"The inner field of a Map must be a Struct, got: {:?}",
field.data_type()
)
.into());
};

if fields.len() != 2 {
return Err(format!(
"The inner Struct field of a Map must have 2 fields, got {} fields",
fields.len()
)
.into());
}

let (Some(key_field), Some(value_field)) = (fields.first(), fields.get(1)) else {
// number of fields is verified above
unreachable!()
};

Ok(LogicalTypeHandle::map(
&LogicalTypeHandle::from(to_duckdb_type_id(key_field.data_type())?),
&LogicalTypeHandle::from(to_duckdb_type_id(value_field.data_type())?),
))
}
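
For reference (not part of the diff): the Arrow-side shape this validation expects, shown as a hypothetical MAP<VARCHAR, UINTEGER> type built with arrow-rs; the function name example_map_type is illustrative.

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Fields};

// Hypothetical example: the Arrow DataType for a MAP<VARCHAR, UINTEGER> column.
fn example_map_type() -> DataType {
    // A Map's single inner field ("entries") is a Struct with exactly two
    // children, key and value -- the shape arrow_map_to_duckdb_logical_type checks.
    let entries = Field::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::UInt32, true),
        ])),
        false,
    );
    // The boolean flag only records whether the map keys are sorted.
    DataType::Map(Arc::new(entries), false)
}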

// FIXME: flat vectors don't have all of these types. I think they only
/// Converts flat vector to an arrow array
pub fn flat_vector_to_arrow_array(
@@ -586,6 +617,19 @@ pub fn write_arrow_array_to_vector(
let mut struct_vector = chunk.struct_vector();
struct_array_to_vector(struct_array, &mut struct_vector)?;
}
DataType::Map(_, _) => {
// [`MapArray`] is physically a [`ListArray`] of key-value pairs stored as an `entries` [`StructArray`] with 2 child fields.
let map_array = as_map_array(col.as_ref());
let out = &mut chunk.list_vector();
struct_array_to_vector(map_array.entries(), &mut out.struct_child(map_array.entries().len()))?;

for i in 0..map_array.len() {
let offset = map_array.value_offsets()[i];
let length = map_array.value_length(i);
out.set_entry(i, offset.as_(), length.as_());
}
set_nulls_in_list_vector(map_array, out);
}
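
A small worked example (not part of the diff) of the physical layout that the set_entry loop above replays into the DuckDB list vector, assuming arrow-rs's MapArray::new_from_strings constructor; the function name map_layout_example is illustrative.

use arrow::array::{Array, MapArray, UInt32Array};

// Two map rows, {"a": 1, "b": 2} and {"c": 3}: one flat entries struct of length 3
// plus the offsets [0, 2, 3]; row i spans entries[offsets[i]..offsets[i + 1]].
fn map_layout_example() {
    let values = UInt32Array::from(vec![1, 2, 3]);
    let map = MapArray::new_from_strings(["a", "b", "c"].into_iter(), &values, &[0, 2, 3]).unwrap();
    assert_eq!(map.len(), 2);
    assert_eq!(map.entries().len(), 3);
    assert_eq!(map.value_offsets()[1], 2);
    assert_eq!(map.value_length(1), 1);
}
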
dt => {
return Err(format!(
"column with data_type {} is not supported yet, please file an issue https://github.com/wangfenjin/duckdb-rs",
@@ -935,7 +979,10 @@ fn list_array_to_vector<O: OffsetSizeTrait + AsPrimitive<usize>>(
fixed_size_list_array_to_vector(as_fixed_size_list_array(value_array.as_ref()), &mut out.array_child())?;
}
DataType::Struct(_) => {
struct_array_to_vector(as_struct_array(value_array.as_ref()), &mut out.struct_child())?;
struct_array_to_vector(
as_struct_array(value_array.as_ref()),
&mut out.struct_child(value_array.len()),
)?;
}
_ => {
return Err(format!(
@@ -993,13 +1040,13 @@ fn struct_array_to_vector(array: &StructArray, out: &mut StructVector) -> Result
let column = array.column(i);
match column.data_type() {
dt if dt.is_primitive() || matches!(dt, DataType::Boolean) => {
primitive_array_to_vector(column, &mut out.child(i))?;
primitive_array_to_vector(column, &mut out.child(i, array.len()))?;
}
DataType::Utf8 => {
string_array_to_vector(as_string_array(column.as_ref()), &mut out.child(i));
string_array_to_vector(as_string_array(column.as_ref()), &mut out.child(i, array.len()));
}
DataType::Binary => {
binary_array_to_vector(as_generic_binary_array(column.as_ref()), &mut out.child(i));
binary_array_to_vector(as_generic_binary_array(column.as_ref()), &mut out.child(i, array.len()));
}
DataType::List(_) => {
list_array_to_vector(as_list_array(column.as_ref()), &mut out.list_vector_child(i))?;
@@ -1112,10 +1159,10 @@ mod test {
Array, ArrayRef, AsArray, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, DurationSecondArray, FixedSizeListArray, FixedSizeListBuilder,
GenericByteArray, GenericListArray, Int32Array, Int32Builder, IntervalDayTimeArray,
IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeStringArray, ListArray, ListBuilder,
IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeStringArray, ListArray, ListBuilder, MapArray,
OffsetSizeTrait, PrimitiveArray, StringArray, StringViewArray, StructArray, Time32SecondArray,
Time64MicrosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
TimestampSecondArray, UInt32Array,
},
buffer::{OffsetBuffer, ScalarBuffer},
datatypes::{
@@ -1894,4 +1941,66 @@ mod test {

Ok(())
}

fn check_map_array_roundtrip(array: MapArray) -> Result<(), Box<dyn Error>> {
let expected = array.clone();

let db = Connection::open_in_memory()?;
db.register_table_function::<ArrowVTab>("arrow")?;

// Roundtrip a record batch from Rust to DuckDB and back to Rust
let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), true)]);

let rb = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array.clone())])?;
let param = arrow_recordbatch_to_query_params(rb.clone());
let mut stmt = db.prepare("select a from arrow(?, ?)")?;
let rb = stmt.query_arrow(param)?.next().expect("no record batch");
let output_array = rb
.column(0)
.as_any()
.downcast_ref::<MapArray>()
.expect("Expected MapArray");

assert_eq!(output_array.keys(), expected.keys());
assert_eq!(output_array.values(), expected.values());

Ok(())
}

#[test]
fn test_map_roundtrip() -> Result<(), Box<dyn Error>> {
// Test 1 - simple MapArray
let keys = vec!["a", "b", "c", "d", "e", "f", "g", "h"];
let values_data = UInt32Array::from(vec![
Some(0u32),
None,
Some(20),
Some(30),
None,
Some(50),
Some(60),
Some(70),
]);
// Construct a buffer for value offsets, for the nested array:
// [[a, b, c], [d, e, f], [g, h]]
let entry_offsets = [0, 3, 6, 8];
let map_array = MapArray::new_from_strings(keys.clone().into_iter(), &values_data, &entry_offsets).unwrap();
check_map_array_roundtrip(map_array)?;

// Test 2 - large MapArray of 4000 elements to test buffers capacity adjustment
let keys: Vec<String> = (0..4000).map(|i| format!("key-{}", i)).collect();
let values_data = UInt32Array::from(
(0..4000)
.map(|i| if i % 5 == 0 { None } else { Some(i as u32) })
.collect::<Vec<_>>(),
);
let mut entry_offsets: Vec<u32> = (0..=4000).step_by(3).collect();
entry_offsets.push(4000);
let map_array =
MapArray::new_from_strings(keys.iter().map(String::as_str), &values_data, entry_offsets.as_slice())
.unwrap();
check_map_array_roundtrip(map_array)?;

Ok(())
}
}
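
End-to-end, the feature can be exercised from the public API the same way the tests above do. A minimal sketch (not from the PR), assuming the vtab-arrow feature and that ArrowVTab and arrow_recordbatch_to_query_params are re-exported from duckdb::vtab (paths may differ by version); the exact typeof output shown in the comment is illustrative.

use std::sync::Arc;

use duckdb::{
    arrow::{
        array::{MapArray, UInt32Array},
        datatypes::{Field, Schema},
        record_batch::RecordBatch,
    },
    vtab::{arrow_recordbatch_to_query_params, ArrowVTab},
    Connection,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = Connection::open_in_memory()?;
    db.register_table_function::<ArrowVTab>("arrow")?;

    // {"a": 1, "b": 2} and {"c": 3} as a single Map column named "a".
    let values = UInt32Array::from(vec![1, 2, 3]);
    let map = MapArray::new_from_strings(["a", "b", "c"].into_iter(), &values, &[0, 2, 3])?;
    let schema = Schema::new(vec![Field::new("a", map.data_type().clone(), true)]);
    let rb = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(map)])?;

    // Stream the record batch through the arrow table function and inspect the
    // DuckDB-side logical type of the Map column.
    let param = arrow_recordbatch_to_query_params(rb);
    let mut stmt = db.prepare("SELECT typeof(a) FROM arrow(?, ?)")?;
    let ty: String = stmt.query_row(param, |row| row.get(0))?;
    println!("{ty}"); // expected to print something like MAP(VARCHAR, UINTEGER)
    Ok(())
}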