Description
What happened + What you expected to happen
I found a data-loss issue when using dataset.write_iceberg.
This is my test data file data.csv
id,data
1,1
2,2
3,3
4,4
This is my job. I use two write tasks, each consuming two rows.
INPUT = "./data.csv"
TABLE = "test_iceberg.table_1"
OUTPUT = "./output.csv"
import ray

ds = ray.data.read_csv(paths=INPUT).repartition(2)
ds.write_iceberg(
    table_identifier=TABLE,
    concurrency=2,
)
ds1 = ray.data.read_iceberg(
    table_identifier=TABLE,
    selected_fields=("id",),
)
ds1.write_csv(path=OUTPUT, num_rows_per_file=100)
I wrote the test data into the Iceberg table and then read it back from the table. The following is the output: the rows 3,3 and 4,4 disappeared, while 1,1 and 2,2 appear twice.
id,data
1,1
2,2
1,1
2,2
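The duplication and loss can be confirmed with a standalone check on the output above (the CSV content is inlined here so the check runs without Ray or Iceberg):

```python
import csv
import io

# The output produced by the repro above, inlined for a self-contained check.
output = """id,data
1,1
2,2
1,1
2,2
"""
rows = list(csv.DictReader(io.StringIO(output)))
ids = [r["id"] for r in rows]

# Four rows came back, but only two distinct ids:
# 3 and 4 are missing, and 1 and 2 each appear twice.
assert len(rows) == 4
assert sorted(set(ids)) == ["1", "2"]
```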
I suspected the data written to Iceberg was wrong, so I printed some logs during the write and found that both tasks wrote to the same file.
# task1 wrote to
s3://metastore/ab41d4e8f7d13aed/catalogs/a33d63478c6a8d5c/schemas/9dfb0fd4607250b3/tables/a1ccb264901f053d/data/category=BsC0tAlRF12eZjY2FFYLLddPhEv3oXsLOVNTnd0efuEQeR5zz7/00000-0-2cb19c81-42d2-4dbb-9143-3066b499e1cd.parque
# task2 wrote to
s3://metastore/ab41d4e8f7d13aed/catalogs/a33d63478c6a8d5c/schemas/9dfb0fd4607250b3/tables/a1ccb264901f053d/data/category=BsC0tAlRF12eZjY2FFYLLddPhEv3oXsLOVNTnd0efuEQeR5zz7/00000-0-2cb19c81-42d2-4dbb-9143-3066b499e1cd.parque
The file a task writes to is determined by the UUID. Using the same UUID for all tasks causes every task to write to the same file, so only the data read by one task survives.
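The collision can be sketched in a few lines (a hypothetical illustration of the naming scheme, not Ray's actual code: the file-name pattern below is an assumption based on the logged paths):

```python
import uuid

# If one UUID is generated up front and shared by all write tasks,
# every task produces the same file name and they overwrite each other.
shared = uuid.uuid4()
files_shared = [f"00000-0-{shared}.parquet" for _task in range(2)]
assert len(set(files_shared)) == 1  # collision: both tasks target one file

# The fix: generate a fresh UUID inside each task, so names are unique.
files_unique = [f"00000-0-{uuid.uuid4()}.parquet" for _task in range(2)]
assert len(set(files_unique)) == 2  # each task writes its own file
```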
So when writing to Iceberg, I fixed it by generating a different UUID for each task.
Here is my fix: #52956
Versions / Dependencies
ray==2.44.1
pyiceberg==0.8.0
Reproduction script
import ray

INPUT = "./data.csv"
TABLE = "test_iceberg.table_1"
OUTPUT = "./output.csv"

ds = ray.data.read_csv(paths=INPUT).repartition(2)
ds.write_iceberg(
    table_identifier=TABLE,
    concurrency=2,
)
ds1 = ray.data.read_iceberg(
    table_identifier=TABLE,
    selected_fields=("id",),
)
ds1.write_csv(path=OUTPUT, num_rows_per_file=100)
Issue Severity
None