ray: fix handling large chunks by ranandfigma · Pull Request #53535 · ray-project/ray · GitHub

ray: fix handling large chunks #53535

Closed
3 changes: 2 additions & 1 deletion python/ray/data/_internal/arrow_ops/transform_pyarrow.py
@@ -863,7 +863,8 @@ def _try_combine_chunks_safe(
         chunk_size = chunk.nbytes

         if cur_slice_size_bytes + chunk_size > max_chunk_size:
-            slices.append(array.chunks[cur_slice_start:i])
+            if cur_slice_start != i:
+                slices.append(array.chunks[cur_slice_start:i])

             cur_slice_start = i
             cur_slice_size_bytes = 0
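
For context on the fix: in the slicing loop above, when a single chunk is by itself larger than max_chunk_size, the running slice is still empty (cur_slice_start == i) and the old code appended an empty slice (array.chunks[i:i]); the new guard skips that case. Below is a minimal, standalone sketch of the loop under that reading -- the helper name and the flushing of the trailing slice are assumptions for illustration, not the exact Ray implementation:

    # Minimal sketch (hypothetical helper; simplified from _try_combine_chunks_safe).
    import pyarrow as pa


    def _slice_chunks_under_limit(array: pa.ChunkedArray, max_chunk_size: int):
        slices = []
        cur_slice_start = 0
        cur_slice_size_bytes = 0

        for i, chunk in enumerate(array.chunks):
            chunk_size = chunk.nbytes

            if cur_slice_size_bytes + chunk_size > max_chunk_size:
                # The fix: only append when the slice is non-empty; a chunk that
                # alone exceeds the limit leaves cur_slice_start == i here.
                if cur_slice_start != i:
                    slices.append(array.chunks[cur_slice_start:i])

                cur_slice_start = i
                cur_slice_size_bytes = 0

            cur_slice_size_bytes += chunk_size

        # Flush whatever is left as the last slice.
        slices.append(array.chunks[cur_slice_start:])
        return slices


    big = pa.array(range(64), type=pa.int64())    # 512 bytes, over the 100-byte limit below
    small = pa.array([1, 2, 3], type=pa.int64())  # 24 bytes each
    arr = pa.chunked_array([big, small, small])

    # Prints [1, 2]: the big chunk stays in its own slice and the small chunks are
    # grouped; without the guard, an empty slice would lead the list.
    print([len(s) for s in _slice_chunks_under_limit(arr, max_chunk_size=100)])
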
15 changes: 15 additions & 0 deletions python/ray/data/tests/test_transform_pyarrow.py
@@ -13,6 +13,7 @@
 from ray.air.util.tensor_extensions.arrow import ArrowTensorTypeV2
 from ray.data import DataContext
 from ray.data._internal.arrow_ops.transform_pyarrow import (
+    MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS,
     MIN_PYARROW_VERSION_TYPE_PROMOTION,
     concat,
     hash_partition,
@@ -49,6 +50,20 @@ def test_try_defragment_table():
     assert dt == t


+def test_defragment_large_table():
+
+    big = pa.array(list(range(800_000_000)), type=pa.int32())  # ~2GiB

Copilot AI commented on Jun 3, 2025:

    Constructing a Python list of 800 million ints can OOM or be very slow. Consider using numpy.arange or a PyArrow streaming interface to build the array more efficiently.

    Suggested change:
    -    big = pa.array(list(range(800_000_000)), type=pa.int32())  # ~2GiB
    +    big = pa.array(np.arange(800_000_000), type=pa.int32())  # ~2GiB

Contributor reply:

    That looks like a good suggestion actually

+    chunked = [big]
+    for _ in range(MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS):
+        chunked.append(pa.array([1, 2, 3], type=pa.int32()))  # a little tail chunk
+    chunked = pa.chunked_array(chunked)
+
+    table = pa.Table.from_arrays([chunked], names=["col"])
+
+    data = try_combine_chunked_columns(table)
+    assert len(data["col"].chunks) == 2
+
+
 def test_hash_partitioning():
     # Test hash-partitioning of the empty table
     empty_table = pa.Table.from_pydict({"idx": []})
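
On the review suggestion above (np.arange instead of list(range(...))): a scaled-down comparison sketch, with n reduced roughly 100x so it runs quickly. Both paths yield the same Arrow array, but the list-based one first materializes every element as a Python int object, while NumPy hands Arrow one contiguous buffer:

    import numpy as np
    import pyarrow as pa

    n = 8_000_000  # the real test uses 800_000_000

    # List-based construction: builds a temporary Python list of n int objects
    # (tens of bytes of interpreter overhead per element) before Arrow sees it.
    from_list = pa.array(list(range(n)), type=pa.int32())

    # NumPy-based construction: one contiguous buffer converted by Arrow without
    # touching Python objects element by element.
    from_numpy = pa.array(np.arange(n), type=pa.int32())

    assert from_list.equals(from_numpy)
    print(from_numpy.nbytes)  # ~4 * n bytes in the resulting int32 array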