[Data] `dataset.write_iceberg` error · Issue #52967 · ray-project/ray · GitHub
[Data] dataset.write_iceberg error #52967
Open
@ran1995data

Description


What happened + What you expected to happen

I found some issues when using dataset.write_iceberg.
This is my test data file data.csv

id,data
1,1
2,2
3,3
4,4

This is my job. It uses two write tasks, each consuming two rows.

import ray

INPUT = "./data.csv"
TABLE = "test_iceberg.table_1"
OUTPUT = "./output.csv"

ds = ray.data.read_csv(paths=INPUT).repartition(2)
ds.write_iceberg(
    table_identifier=TABLE,
    concurrency=2,
)
ds1 = ray.data.read_iceberg(
    table_identifier=TABLE,
    selected_fields=("id",),
)
ds1.write_csv(path=OUTPUT, num_rows_per_file=100)

I write the test data into the Iceberg table and then read it back. The following is the output: the rows `3,3` and `4,4` disappeared.

id,data
1,1
2,2
1,1
2,2

I suspected the data written to Iceberg was wrong, so I added some logging during the write and found that both tasks wrote to the same file.

# task1 wrote to 
s3://metastore/ab41d4e8f7d13aed/catalogs/a33d63478c6a8d5c/schemas/9dfb0fd4607250b3/tables/a1ccb264901f053d/data/category=BsC0tAlRF12eZjY2FFYLLddPhEv3oXsLOVNTnd0efuEQeR5zz7/00000-0-2cb19c81-42d2-4dbb-9143-3066b499e1cd.parque
# task2 wrote to 
s3://metastore/ab41d4e8f7d13aed/catalogs/a33d63478c6a8d5c/schemas/9dfb0fd4607250b3/tables/a1ccb264901f053d/data/category=BsC0tAlRF12eZjY2FFYLLddPhEv3oXsLOVNTnd0efuEQeR5zz7/00000-0-2cb19c81-42d2-4dbb-9143-3066b499e1cd.parque

The file a task writes to is determined by a UUID. Because all tasks share the same UUID, they all write to the same file, and only the data read by one task survives.
I fixed it by using a different UUID for each task when writing to Iceberg.

Here is my fix. #52956
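To make the root cause concrete, here is a minimal, standalone sketch of the collision (it does not use Ray or PyIceberg; the filename pattern is modeled on the `00000-0-<uuid>.parquet` data files shown in the logs above):

```python
import uuid

NUM_TASKS = 2

# Buggy pattern: one UUID generated up front and shared by every write task.
# All tasks derive the same data-file name, so they overwrite each other.
shared_uuid = uuid.uuid4()
shared_names = [f"00000-0-{shared_uuid}.parquet" for _ in range(NUM_TASKS)]

# Fixed pattern: each task generates its own UUID, so filenames are unique
# and every task's rows land in a distinct data file.
per_task_names = [f"00000-0-{uuid.uuid4()}.parquet" for _ in range(NUM_TASKS)]

print(len(set(shared_names)))    # 1 distinct file: the tasks collide
print(len(set(per_task_names)))  # 2 distinct files: no collision
```

This mirrors why only two of the four input rows survived: with a shared UUID, the second task's file simply replaced the first task's.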

Versions / Dependencies

Ray version: 2.44.1
pyiceberg==0.8.0

Reproduction script

import ray

INPUT = "./data.csv"
TABLE = "test_iceberg.table_1"
OUTPUT = "./output.csv"

ds = ray.data.read_csv(paths=INPUT).repartition(2)
ds.write_iceberg(
    table_identifier=TABLE,
    concurrency=2,
)
ds1 = ray.data.read_iceberg(
    table_identifier=TABLE,
    selected_fields=("id",),
)
ds1.write_csv(path=OUTPUT, num_rows_per_file=100)

Issue Severity

None

Metadata

Assignees: no one assigned

Labels: P0 (Issues that should be fixed in short order), bug (Something that is supposed to be working, but isn't), community-backlog, data (Ray Data-related issues), stability
