From 459a892006b10c088595050d7b6f8b7058081b1b Mon Sep 17 00:00:00 2001 From: Douglas Strodtman Date: Sun, 23 Mar 2025 16:47:21 -0400 Subject: [PATCH 01/15] wip --- doc/source/data/data.rst | 2 +- doc/source/data/key-concepts.rst | 33 +++++++--- doc/source/data/loading-data.rst | 2 + doc/source/data/quickstart.rst | 100 ++++++++++++++++++++++++------- 4 files changed, 108 insertions(+), 29 deletions(-) diff --git a/doc/source/data/data.rst b/doc/source/data/data.rst index 8459956534d7..61365a090f3d 100644 --- a/doc/source/data/data.rst +++ b/doc/source/data/data.rst @@ -8,7 +8,7 @@ Ray Data: Scalable Datasets for ML :hidden: quickstart - key-concepts + Concepts user-guide examples api/api diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 88fdeb94ca65..5331b3b5f701 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -1,13 +1,30 @@ .. _data_key_concepts: -Key Concepts -============ +How does Ray Data work? +======================= + +This page provides a conceptual overview of the architecture and execution model for Ray Data. Understanding these technical details can be useful when designing, debugging, and optimizing Ray applications. + + + + +.. _dataset_conceptual: + + +What is a Ray Dataset? +---------------------- + + + +You can use Datasets to interact with data. The following are examples + +* + +What does a block represent in Ray? +----------------------------------- -Datasets and blocks -------------------- -There are two main concepts in Ray Data: * Datasets * Blocks @@ -36,8 +53,10 @@ Pandas Dataframes or Arrow tables. .. https://docs.google.com/drawings/d/1kOYQqHdMrBp2XorDIn0u0G_MvFj-uSA4qm6xf9tsFLM/edit -Operators and Plans -------------------- +.. _plans: + +How does Ray Data plan and execute operations? +---------------------------------------------- Ray Data uses a two-phase planning process to execute operations efficiently. When you write a program using the Dataset API, Ray Data first builds a *logical plan* - a high-level description of what operations to perform. When execution begins, it converts this into a *physical plan* that specifies exactly how to execute those operations. diff --git a/doc/source/data/loading-data.rst b/doc/source/data/loading-data.rst index 744aee173238..e51548229e6b 100644 --- a/doc/source/data/loading-data.rst +++ b/doc/source/data/loading-data.rst @@ -338,6 +338,8 @@ You can use any `codec supported by Arrow ` abstraction for distributed data processing. - -This guide introduces you to the core capabilities of Ray Data: +This guide introduces you to the core capabilities of Ray Data and includes examples of using the Ray :class:`Dataset ` abstraction for the following tasks: * :ref:`Loading data ` * :ref:`Transforming data ` * :ref:`Consuming data ` * :ref:`Saving data ` -Datasets --------- -Ray Data's main abstraction is a :class:`Dataset `, which -represents a distributed collection of data. Datasets are specifically designed for machine learning workloads -and can efficiently handle data collections that exceed a single machine's memory. + +The code examples provided in this guide build upon one another sequentially. You can run these commands interactively in a Jupyter notebook or on Anyscale. Each code example materializes results to return a preview of results. Operations that materialize results or preview data force Ray to perform actions. Removing these operations allows Ray to optimize the logical and physical plans. 
See :ref:`How does Ray Data plan and execute operations?` + + + +Combine these steps to complete data preprocessing or ETL (extract, transform, and load) workloads. For an example of + + + +What is a Ray Dataset? +---------------------- + +Ray Data's main abstraction is a :class:`Dataset `, which represents a distributed collection of data. Datasets are specifically designed for machine learning workloads and can efficiently handle data collections that exceed a single machine's memory. + +Ray Datasets are similar to DataFrames and Datasets in TensorFlow, PyTorch, pandas, and Apache Spark. Ray Data provides interoperatibility with these and other libraries. See :ref:`Loading data from other libraries`. + +For details on how Ray Datasets represent data for distributed processing, see :ref:`How does Ray Data work?` .. _loading_key_concept: -Loading data ------------- +Load data +--------- + +You can load data to create datasets from sources including local files, Python objects, and cloud storage services like S3 or GCS. + +Ray Data seamlessly integrates with any `filesystem supported by Arrow`__. -Create datasets from various sources including local files, Python objects, and cloud storage services like S3 or GCS. -Ray Data seamlessly integrates with any `filesystem supported by Arrow -`__. +The following code example loads CSV data from an S3 bucket and previews the data: .. testcode:: @@ -46,12 +58,16 @@ To learn more about creating datasets from different sources, read :ref:`Loading .. _transforming_key_concept: -Transforming data ------------------ +Transform data +-------------- + +Define data transformations Apply user-defined functions (UDFs) to transform datasets. Ray automatically parallelizes these transformations across your cluster for better performance. +The following code example applies a UDF to calculate the petal area for the CSV data loaded + .. testcode:: from typing import Dict @@ -91,8 +107,8 @@ To explore more transformation capabilities, read :ref:`Transforming data `. + + +.. _etl_example: + +ETL with Ray Data +----------------- + +The following code example loads CSV data from S3, applies a data transformation to calculate a new field, and saves results using Parquet. + + + +.. testcode:: + :hide: + + # The number of blocks can be non-determinstic. Repartition the dataset beforehand + # so that the number of written files is consistent. + transformed_ds = transformed_ds.repartition(2) + +.. testcode:: + + import os + + # Save the transformed dataset as Parquet files + transformed_ds.write_parquet("/tmp/iris") + + # Verify the files were created + print(os.listdir("/tmp/iris")) + +.. testoutput:: + :options: +MOCK + + ['..._000000.parquet', '..._000001.parquet'] + +.. _preprocessing_example: + +Data preprocessing with Ray Data +-------------------------------- + +The following code example loads CSV data from S3, applies a data transformation to calculate a new field, and returns the result as the Python variable `train_dataset`. 
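The following sketch shows one way to implement this step, assuming the same iris CSV data and petal-area transformation used in the earlier examples (the ``compute_petal_area`` helper name is illustrative):

.. testcode::

    import ray

    # Load the same example CSV data used in the loading example above.
    ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

    # Illustrative UDF: derive a petal area field from the existing columns.
    def compute_petal_area(row):
        row["petal area (cm^2)"] = row["petal length (cm)"] * row["petal width (cm)"]
        return row

    # Keep the transformed dataset in a variable for a downstream training step.
    train_dataset = ds.map(compute_petal_area)

    # Preview one row to confirm the new field.
    print(train_dataset.take(1))

.. testoutput::
    :options: +MOCK

    [{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0, 'petal area (cm^2)': 0.28}]
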
+ + + From 2c473e96926cdf26580acbe236847261e1511128 Mon Sep 17 00:00:00 2001 From: dstrodtman Date: Mon, 24 Mar 2025 08:41:46 -0400 Subject: [PATCH 02/15] wip --- doc/source/data/data-internals.rst | 77 ---------------------------- doc/source/data/key-concepts.rst | 58 +++++++++++++-------- doc/source/data/performance-tips.rst | 49 ++++++++++++++++++ doc/source/data/quickstart.rst | 4 +- 4 files changed, 88 insertions(+), 100 deletions(-) diff --git a/doc/source/data/data-internals.rst b/doc/source/data/data-internals.rst index 48ad6f2ab182..b664afb8bf2d 100644 --- a/doc/source/data/data-internals.rst +++ b/doc/source/data/data-internals.rst @@ -14,83 +14,6 @@ For a gentler introduction to Ray Data, see :ref:`Quickstart `. Key concepts ============ -Datasets and blocks -------------------- - -Datasets -~~~~~~~~ - -:class:`Dataset ` is the main user-facing Python API. It represents a -distributed data collection, and defines data loading and processing operations. You -typically use the API in this way: - -1. Create a Ray Dataset from external storage or in-memory data. -2. Apply transformations to the data. -3. Write the outputs to external storage or feed the outputs to training workers. - -Blocks -~~~~~~ - -A *block* is the basic unit of data bulk that Ray Data stores in the object store and -transfers over the network. Each block contains a disjoint subset of rows, and Ray Data -loads and transforms these blocks in parallel. - -The following figure visualizes a dataset with three blocks, each holding 1000 rows. -Ray Data holds the :class:`~ray.data.Dataset` on the process that triggers execution -(which is usually the driver) and stores the blocks as objects in Ray's shared-memory -:ref:`object store `. - -.. image:: images/dataset-arch.svg - -.. - https://docs.google.com/drawings/d/1PmbDvHRfVthme9XD7EYM-LIHPXtHdOfjCbc1SCsM64k/edit - -Block formats -~~~~~~~~~~~~~ - -Blocks are Arrow tables or `pandas` DataFrames. Generally, blocks are Arrow tables -unless Arrow can’t represent your data. - -The block format doesn’t affect the type of data returned by APIs like -:meth:`~ray.data.Dataset.iter_batches`. - -Block size limiting -~~~~~~~~~~~~~~~~~~~ - -Ray Data bounds block sizes to avoid excessive communication overhead and prevent -out-of-memory errors. Small blocks are good for latency and more streamed execution, -while large blocks reduce scheduler and communication overhead. The default range -attempts to make a good tradeoff for most jobs. - -Ray Data attempts to bound block sizes between 1 MiB and 128 MiB. To change the block -size range, configure the ``target_min_block_size`` and ``target_max_block_size`` -attributes of :class:`~ray.data.context.DataContext`. - -.. testcode:: - - import ray - - ctx = ray.data.DataContext.get_current() - ctx.target_min_block_size = 1 * 1024 * 1024 - ctx.target_max_block_size = 128 * 1024 * 1024 - -Dynamic block splitting -~~~~~~~~~~~~~~~~~~~~~~~ - -If a block is larger than 192 MiB (50% more than the target max size), Ray Data -dynamically splits the block into smaller blocks. - -To change the size at which Ray Data splits blocks, configure -``MAX_SAFE_BLOCK_SIZE_FACTOR``. The default value is 1.5. - -.. testcode:: - - import ray - - ray.data.context.MAX_SAFE_BLOCK_SIZE_FACTOR = 1.5 - -Ray Data can’t split rows. So, if your dataset contains large rows (for example, large -images), then Ray Data can’t bound the block size. 
Operators, plans, and planning ------------------------------ diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 5331b3b5f701..780984858923 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -5,30 +5,30 @@ How does Ray Data work? This page provides a conceptual overview of the architecture and execution model for Ray Data. Understanding these technical details can be useful when designing, debugging, and optimizing Ray applications. +To get started working with Ray Data, see :ref:`Ray Data basics`. +How does Ray Data relate to the rest of Ray? +-------------------------------------------- +Ray Data is one of the Ray AI Libraries. -.. _dataset_conceptual: +.. image:: ../ray-overview/images/map-of-ray.svg + :align: center + :alt: Ray Framework Architecture +Ray Data uses the `Dataset` abstraction to map common data operations to Ray Core primitives. To learn about Ray Core primitives, see :ref:`Ray Core key concepts`. -What is a Ray Dataset? ----------------------- +Ray Data integrates with Ray Train for optimized data loading, preprocessing, and feature engineering. See :ref:`Ray Train overview`. +.. _dataset_conceptual: +What is a Ray Dataset? +---------------------- -You can use Datasets to interact with data. The following are examples +Use Datasets to interact with data. The following are examples * -What does a block represent in Ray? ------------------------------------ - - - - -* Datasets -* Blocks - `Dataset` is the main user-facing Python API. It represents a distributed data collection and define data loading and processing operations. Users typically use the API by: 1. Create a :class:`Dataset ` from external storage or in-memory data. @@ -39,15 +39,31 @@ The Dataset API is lazy, meaning that operations aren't executed until you mater like :meth:`~ray.data.Dataset.show`. This allows Ray Data to optimize the execution plan and execute operations in a pipelined streaming fashion. -Each *Dataset* consists of *blocks*. A *block* is a contiguous subset of rows from a dataset, -which are distributed across the cluster and processed independently in parallel. +What does a block represent in Ray? +----------------------------------- + +Ray Data uses _blocks_ to represent subsets of data in a Dataset. Most users of Ray Data + +Blocks have the following characteristics: + +* Each record or row in a Dataset is only present in one block. +* Blocks are distributed across the cluster for independent processing. +* Blocks are processed in parallel and sequentially, depending on the operations present in an application. + + +If you're troubleshooting or optimizing Ray Data workloads, consider the following details and special cases: + +* The number of row or records in a block varies base on the size of each record. Most blocks are between 1 MiB and 128 MiB. + * Ray automatically splits blocks into smaller blocks if they exceed the max block size by 50% or more. + * A block might only contain a single record if your data is very wide or contains a large record such as an image, vector, or tensor. Ray Data has built-in optimizations for handling large data efficiently, and you should test workloads with built-in defaults before trying to manually optimize your workload. + * You can configure block size and splitting behaviors. See :ref:`Block size and performance`. +* Ray uses :ref:`Arrow tables` to internally represent blocks of data. 
+ * Ray Data falls back to pandas DataFrames for data that cannot be safely represented using Arrow tables. See :ref:`Arrow and pandas type differences`. + * Block format doesn't affect the of data type returned by APIs such as :meth:`~ray.data.Dataset.iter_batches`. + +Overall -The following figure visualizes a dataset with three blocks, each holding 1000 rows. -Ray Data holds the :class:`~ray.data.Dataset` on the process that triggers execution -(which is usually the entrypoint of the program, referred to as the :term:`driver`) -and stores the blocks as objects in Ray's shared-memory -:ref:`object store `. Internally, Ray Data represents blocks with -Pandas Dataframes or Arrow tables. +The following figure visualizes a dataset with three blocks, each holding 1000 rows.Ray Data holds the :class:`~ray.data.Dataset` on the process that triggers execution (which is usually the entrypoint of the program, referred to as the :term:`driver`) and stores the blocks as objects in Ray's shared-memory :ref:`object store `. .. image:: images/dataset-arch-with-blocks.svg .. diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst index b496e4fbdc77..1f739c49ba37 100644 --- a/doc/source/data/performance-tips.rst +++ b/doc/source/data/performance-tips.rst @@ -3,6 +3,53 @@ Advanced: Performance Tips and Tuning ===================================== +.. _block_size: + +Block size and performance +-------------------------- + +Ray Data bounds block sizes to avoid excessive communication overhead and prevent out-of-memory errors. Block size relates to the following performance tuning considerations: + +* :ref:`Tuning output blocks for read` +* :ref:`Troubleshooting out-of-memory errors` +* :ref:`Handling too-small blocks` + +Smaller blocks are good for latency and more streamed execution, while larger blocks reduce scheduler and communication overhead. The default range attempts to make a good tradeoff for most jobs using the following rules: + +* A best-effort is made to bound block sizes between 1 MiB and 128 MiB. +* Blocks are dynamically split if they exceed the target max block size by 50% or more (192 MiB by default). + +.. note:: + + Ray Data can't split rows. If your dataset contains large rows (for example, large images), then Ray Data can't bound the block size. + +Configure block size +~~~~~~~~~~~~~~~~~~~~ + +By default, Ray Data attempts to bound block sizes between 1 MiB and 128 MiB. To change the block size range, configure the ``target_min_block_size`` and ``target_max_block_size`` attributes of :class:`~ray.data.context.DataContext`. The following syntax example sets these attributes using the default values: + +.. testcode:: + + import ray + + ctx = ray.data.DataContext.get_current() + ctx.target_min_block_size = 1 * 1024 * 1024 + ctx.target_max_block_size = 128 * 1024 * 1024 + +Configure dynamic block splitting +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Ray Data dynamically splits a block into smaller blocks when it exceeds a size threshold. This threshold is specified as a multiplier of the `target_max_block_size`. By default, the scaling fact is set to `1.5`, meaning a block is split if it is 50% larger than the target max size, or 192 MiB by default. + +To change the size at which Ray Data splits blocks, configure the `MAX_SAFE_BLOCK_SIZE_FACTOR` attribute. The following syntax example sets this attribute using the default scaling factor: + +.. 
testcode:: + + import ray + + ray.data.context.MAX_SAFE_BLOCK_SIZE_FACTOR = 1.5 + + Optimizing transforms --------------------- @@ -333,6 +380,8 @@ The recommended strategy is to manually increase the :ref:`read output blocks ` * :ref:`Saving data ` +The code examples provided in this guide build upon one another sequentially. You can run these commands interactively in a Jupyter notebook or on Anyscale. - -The code examples provided in this guide build upon one another sequentially. You can run these commands interactively in a Jupyter notebook or on Anyscale. Each code example materializes results to return a preview of results. Operations that materialize results or preview data force Ray to perform actions. Removing these operations allows Ray to optimize the logical and physical plans. See :ref:`How does Ray Data plan and execute operations?` +Provided code example materializes results to preview data at each step. When moving code to production, only include operations that materialize results if they are essential to your application. This allows Ray to optimize the logical and physical plans. See :ref:`How does Ray Data plan and execute operations?` From a9881004892f648bff70bdb5aeb2e1c94868b0b9 Mon Sep 17 00:00:00 2001 From: dstrodtman Date: Mon, 24 Mar 2025 22:26:04 -0400 Subject: [PATCH 03/15] wip --- doc/source/data/key-concepts.rst | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 780984858923..2106b70bf1dc 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -18,7 +18,22 @@ Ray Data is one of the Ray AI Libraries. Ray Data uses the `Dataset` abstraction to map common data operations to Ray Core primitives. To learn about Ray Core primitives, see :ref:`Ray Core key concepts`. -Ray Data integrates with Ray Train for optimized data loading, preprocessing, and feature engineering. See :ref:`Ray Train overview`. +Ray Data integrates with Ray Train and Ray Tune for optimized data loading, preprocessing, and feature engineering. See :ref:`Ray Train overview`. + +How does Ray Data distribute data? +---------------------------------- + + + +Ray Data holds the :class:`~ray.data.Dataset` on the process that triggers execution (which is usually the entrypoint of the program, referred to as the :term:`driver`) and stores the blocks as objects in Ray's shared-memory :ref:`object store `. + +The following figure visualizes a dataset with three blocks, each holding 1000 rows. + +.. image:: images/dataset-arch-with-blocks.svg + :alt: Ray Dataset with three blocks +.. + https://docs.google.com/drawings/d/1kOYQqHdMrBp2XorDIn0u0G_MvFj-uSA4qm6xf9tsFLM/edit + .. _dataset_conceptual: @@ -61,13 +76,7 @@ If you're troubleshooting or optimizing Ray Data workloads, consider the followi * Ray Data falls back to pandas DataFrames for data that cannot be safely represented using Arrow tables. See :ref:`Arrow and pandas type differences`. * Block format doesn't affect the of data type returned by APIs such as :meth:`~ray.data.Dataset.iter_batches`. -Overall -The following figure visualizes a dataset with three blocks, each holding 1000 rows.Ray Data holds the :class:`~ray.data.Dataset` on the process that triggers execution (which is usually the entrypoint of the program, referred to as the :term:`driver`) and stores the blocks as objects in Ray's shared-memory :ref:`object store `. - -.. image:: images/dataset-arch-with-blocks.svg -.. 
- https://docs.google.com/drawings/d/1kOYQqHdMrBp2XorDIn0u0G_MvFj-uSA4qm6xf9tsFLM/edit .. _plans: @@ -128,8 +137,9 @@ Streaming execution model Ray Data uses a *streaming execution model* to efficiently process large datasets. -Rather than materializing the entire dataset in memory at once, -Ray Data can process data in a streaming fashion through a pipeline of operations. +Rather than materializing the entire dataset in memory at once, Ray Data can process data in a streaming fashion through a pipeline of operations. + + This is useful for inference and training workloads where the dataset can be too large to fit in memory and the workload doesn't require the entire dataset to be in memory at once. @@ -179,6 +189,6 @@ In particular, the pipeline architecture enables multiple stages to execute conc To summarize, Ray Data's streaming execution model can efficiently process datasets that are much larger than available memory while maintaining high performance through parallel execution across the cluster. .. note:: - Operations like :meth:`ds.sort() ` and :meth:`ds.groupby() ` require materializing data, which may impact memory usage for very large datasets. + Operations including :meth:`ds.sort() ` and :meth:`ds.groupby() ` require materializing data, which may impact memory usage for very large datasets. You can read more about the streaming execution model in this `blog post `__. From 5cabccb57474b249f9b5fc9d9b1bc8c62bfaa4a1 Mon Sep 17 00:00:00 2001 From: dstrodtman Date: Tue, 25 Mar 2025 05:45:31 -0400 Subject: [PATCH 04/15] wip --- doc/source/data/data-internals.rst | 12 ------------ doc/source/data/key-concepts.rst | 3 ++- doc/source/data/performance-tips.rst | 14 ++++++++++++++ 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/doc/source/data/data-internals.rst b/doc/source/data/data-internals.rst index b664afb8bf2d..32fd7f42ad45 100644 --- a/doc/source/data/data-internals.rst +++ b/doc/source/data/data-internals.rst @@ -155,19 +155,7 @@ placement group. To use current placement group resources specifically for Ray D Consider this override only for advanced use cases to improve performance predictability. The general recommendation is to let Ray Data run outside placement groups. -.. _datasets_tune: -Ray Data and Tune ------------------ - -When using Ray Data in conjunction with :ref:`Ray Tune `, it's important to ensure there are enough free CPUs for Ray Data to run on. By default, Tune tries to fully utilize cluster CPUs. This can prevent Ray Data from scheduling tasks, reducing performance or causing workloads to hang. - -To ensure CPU resources are always available for Ray Data execution, limit the number of concurrent Tune trials with the ``max_concurrent_trials`` Tune option. - -.. literalinclude:: ./doc_code/key_concepts.py - :language: python - :start-after: __resource_allocation_1_begin__ - :end-before: __resource_allocation_1_end__ Memory Management ================= diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 2106b70bf1dc..1a23da71e07e 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -1,5 +1,6 @@ .. _data_key_concepts: +======================= How does Ray Data work? ======================= @@ -8,7 +9,7 @@ This page provides a conceptual overview of the architecture and execution model To get started working with Ray Data, see :ref:`Ray Data basics`. How does Ray Data relate to the rest of Ray? 
--------------------------------------------- +============================================ Ray Data is one of the Ray AI Libraries. diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst index 1f739c49ba37..aded5875fb53 100644 --- a/doc/source/data/performance-tips.rst +++ b/doc/source/data/performance-tips.rst @@ -263,6 +263,20 @@ calling :func:`~ray.data.Dataset.select_columns`, since column selection is push Dataset(num_rows=150, schema={sepal.length: double, variety: string}) +.. _datasets_tune: + +Reserve cores for Ray Data when used with Ray Tune +-------------------------------------------------- + +When using Ray Data in conjunction with :ref:`Ray Tune `, it's important to ensure there are enough free CPUs for Ray Data to run on. By default, Tune tries to fully utilize cluster CPUs. This can prevent Ray Data from scheduling tasks, reducing performance or causing workloads to hang. + +To ensure CPU resources are always available for Ray Data execution, limit the number of concurrent Tune trials with the ``max_concurrent_trials`` Tune option. + +.. literalinclude:: ./doc_code/key_concepts.py + :language: python + :start-after: __resource_allocation_1_begin__ + :end-before: __resource_allocation_1_end__ + .. _data_memory: Reducing memory usage From f69e3b7d3e42ff99442853c5883816310534a386 Mon Sep 17 00:00:00 2001 From: dstrodtman Date: Tue, 25 Mar 2025 08:17:14 -0400 Subject: [PATCH 05/15] wip --- doc/source/data/key-concepts.rst | 60 ++++++++++++++++++++++------ doc/source/data/performance-tips.rst | 2 +- 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 1a23da71e07e..ccbcc1bcaa4a 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -8,18 +8,35 @@ This page provides a conceptual overview of the architecture and execution model To get started working with Ray Data, see :ref:`Ray Data basics`. +For recommendations on optimizing Ray Data workloads, see :ref:`Performance tips and tuning`. + How does Ray Data relate to the rest of Ray? ============================================ -Ray Data is one of the Ray AI Libraries. +Ray Data is one of the Ray AI Libraries. Ray AI libraries build on top of Ray Core primitives to provide developer-friendly APIs for completing common data, ML, and AI tasks. To learn about Ray Core primitives, see :ref:`Ray Core key concepts`. + +The follow diagram provides a high-level view of the Ray framework: .. image:: ../ray-overview/images/map-of-ray.svg :align: center - :alt: Ray Framework Architecture + :alt: Ray framework architecture + +Ray Data contains operators focused on the following key tasks: + +- Loading data from storage. +- Ingesting data from connected systems. +- Exchanging data from other framework and data structures. +- Transforming and preprocessing data. +- Performing offline batch inference. +- Persisting results to storage or integrated systems. +- Pipelining data for Ray Train. + +The primary abstraction :class:`~ray.data.Dataset` the `Dataset` abstraction to map common data operations to Ray Core primitives. -Ray Data uses the `Dataset` abstraction to map common data operations to Ray Core primitives. To learn about Ray Core primitives, see :ref:`Ray Core key concepts`. -Ray Data integrates with Ray Train and Ray Tune for optimized data loading, preprocessing, and feature engineering. See :ref:`Ray Train overview`. + +Ray Train is optimized to work on Ray Datasets. 
See :ref:`Process large datasets as streams +`. How does Ray Data distribute data? ---------------------------------- @@ -133,18 +150,31 @@ For more details on Ray Tasks and Actors, see :ref:`Ray Core Concepts `. -This is useful for inference and training workloads where the dataset can be too large to fit in memory and the workload doesn't require the entire dataset to be in memory at once. -Here is an example of how the streaming execution model works. The below code creates a dataset with 1K rows, applies a map and filter transformation, and then calls the ``show`` action to trigger the pipeline: +The following is a simple code example demonstrate the streaming execution model. , applies a map and filter transformation, and then calls the ``show`` action to trigger the pipeline: .. testcode:: @@ -162,7 +192,7 @@ Here is an example of how the streaming execution model works. The below code cr # Data starts flowing when you call a method like show() ds.show(5) -This creates a logical plan like the following: +The following is a simplified view of the resultant logical plan: .. code-block:: @@ -173,7 +203,7 @@ This creates a logical plan like the following: +- Dataset(schema={...}) -The streaming topology looks like the following: +This logical plan maps to the following streaming topology: .. https://docs.google.com/drawings/d/10myFIVtpI_ZNdvTSxsaHlOhA_gHRdUde_aHRC9zlfOw/edit @@ -190,6 +220,10 @@ In particular, the pipeline architecture enables multiple stages to execute conc To summarize, Ray Data's streaming execution model can efficiently process datasets that are much larger than available memory while maintaining high performance through parallel execution across the cluster. .. note:: - Operations including :meth:`ds.sort() ` and :meth:`ds.groupby() ` require materializing data, which may impact memory usage for very large datasets. + Operations that need to evaluate, compare, or aggregate the entire dataset create processing bottlenecks for streaming execution. Examples include :meth:`ds.sort() ` and :meth:`ds.groupby() `. + + Ray must materialize the entire dataset to complete these operations, which interupts stream pipeline processing and might lead to significant spill or out-of-memory errors. + Consider refactoring workloads to remove unnecessary operations that require full dataset materialization. For example, the distributed model used by Ray does not persist ordered results between stages or guarantee that sorting is preserved on write. For many workloads, removing a :meth:`ds.sort() ` operation can eliminate significant overhead without impacting results in any way. + You can read more about the streaming execution model in this `blog post `__. diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst index aded5875fb53..5aa985baae4d 100644 --- a/doc/source/data/performance-tips.rst +++ b/doc/source/data/performance-tips.rst @@ -1,6 +1,6 @@ .. _data_performance_tips: -Advanced: Performance Tips and Tuning +Advanced: Performance tips and tuning ===================================== .. 
_block_size: From f7b8fb3a2878c80f11698b54ffbb6dd490a12833 Mon Sep 17 00:00:00 2001 From: dstrodtman Date: Tue, 25 Mar 2025 08:39:28 -0400 Subject: [PATCH 06/15] wip --- doc/source/data/key-concepts.rst | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index ccbcc1bcaa4a..068b7c4e1570 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -31,12 +31,11 @@ Ray Data contains operators focused on the following key tasks: - Persisting results to storage or integrated systems. - Pipelining data for Ray Train. -The primary abstraction :class:`~ray.data.Dataset` the `Dataset` abstraction to map common data operations to Ray Core primitives. +Ray Data uses the :class:`~ray.data.Dataset` abstraction to map common data operations to Ray Core primitives. -Ray Train is optimized to work on Ray Datasets. See :ref:`Process large datasets as streams -`. +Ray Train is optimized to work on Ray Datasets. See :ref:`Process large datasets as streams`. How does Ray Data distribute data? ---------------------------------- From 336adf7c7f2a9e7afb5055f786b8f73b260a4a82 Mon Sep 17 00:00:00 2001 From: dstrodtman Date: Sat, 5 Apr 2025 13:03:56 -0400 Subject: [PATCH 07/15] wip --- doc/Pipfile | 11 ++++++++ doc/source/data/data-internals.rst | 41 +++++++++++++++------------- doc/source/data/data.rst | 1 + doc/source/data/key-concepts.rst | 15 +++++----- doc/source/data/performance-tips.rst | 9 ++++-- doc/source/data/quickstart.rst | 2 +- doc/source/data/user-guide.rst | 1 - 7 files changed, 50 insertions(+), 30 deletions(-) create mode 100644 doc/Pipfile diff --git a/doc/Pipfile b/doc/Pipfile new file mode 100644 index 000000000000..d61ea531d92f --- /dev/null +++ b/doc/Pipfile @@ -0,0 +1,11 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] + +[dev-packages] + +[requires] +python_version = "3.13" diff --git a/doc/source/data/data-internals.rst b/doc/source/data/data-internals.rst index 32fd7f42ad45..69a49e2d11ea 100644 --- a/doc/source/data/data-internals.rst +++ b/doc/source/data/data-internals.rst @@ -1,25 +1,28 @@ .. _datasets_scheduling: ================== -Ray Data Internals +Ray Data internals ================== -This guide describes the implementation of Ray Data. The intended audience is advanced -users and Ray Data developers. +This page provides an overview of technical implementation details of Ray Data related to the following: -For a gentler introduction to Ray Data, see :ref:`Quickstart `. +* -.. _dataset_concept: -Key concepts -============ +The intended audience is advanced users, Ray contributors, and Ray Data developers. + +* To get started working with Ray Data, see :ref:`Ray Data basics`. +* For a conceptual overview of Ray Data, see :ref:`How does Ray Data work?`. +* For recommendations on optimizing Ray Data workloads, see :ref:`Performance tips and tuning`. + +How does Ray +============================== + -Operators, plans, and planning ------------------------------- Operators -~~~~~~~~~ +--------- There are two types of operators: *logical operators* and *physical operators*. Logical operators are stateless objects that describe “what” to do. Physical operators are @@ -27,7 +30,7 @@ stateful objects that describe “how” to do it. An example of a logical opera ``ReadOp``, and an example of a physical operator is ``TaskPoolMapOperator``. 
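Because the Dataset API is lazy, you can inspect the chain of operators that a program builds before any execution starts. The following is a minimal sketch; the exact text of the printed plan varies between Ray versions:

.. testcode::

    import ray

    # No execution happens here; each call only appends an operator to the plan.
    ds = ray.data.range(1000).map(lambda row: {"id": row["id"] * 2})

    # Printing the Dataset shows the chained operators.
    print(ds)

.. testoutput::
    :options: +MOCK

    Map(<lambda>)
    +- Dataset(num_rows=1000, schema={id: int64})
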
Plans -~~~~~ +----- A *logical plan* is a series of logical operators, and a *physical plan* is a series of physical operators. When you call APIs like :func:`ray.data.read_images` and @@ -35,7 +38,7 @@ physical operators. When you call APIs like :func:`ray.data.read_images` and starts, the planner generates a corresponding physical plan. The planner -~~~~~~~~~~~ +----------- The Ray Data planner translates logical operators to one or more physical operators. For example, the planner translates the ``ReadOp`` logical operator into two physical @@ -44,7 +47,7 @@ logical operator only describes the input data, the ``TaskPoolMapOperator`` phys operator actually launches tasks to read the data. Plan optimization -~~~~~~~~~~~~~~~~~ +----------------- Ray Data applies optimizations to both logical and physical plans. For example, the ``OperatorFusionRule`` combines a chain of physical map operators into a single map @@ -65,7 +68,7 @@ To add custom optimization rules, implement a class that extends ``Rule`` and co ray.data._internal.logical.optimizers.DEFAULT_LOGICAL_RULES.append(CustomRule) Types of physical operators -~~~~~~~~~~~~~~~~~~~~~~~~~~~ +--------------------------- Physical operators take in a stream of block references and output another stream of block references. Some physical operators launch Ray Tasks and Actors to transform @@ -79,10 +82,10 @@ Non-map operators include ``OutputSplitter`` and ``LimitOperator``. These two op manipulate references to data, but don’t launch tasks or modify the underlying data. Execution ---------- +========= The executor -~~~~~~~~~~~~ +------------ The *executor* schedules tasks and moves data between physical operators. @@ -96,7 +99,7 @@ stored in Ray’s distributed object store. The executor manipulates references objects, and doesn’t fetch the underlying data itself to the executor. Out queues -~~~~~~~~~~ +---------- Each physical operator has an associated *out queue*. When a physical operator produces outputs, the executor moves the outputs to the operator’s out queue. @@ -104,7 +107,7 @@ outputs, the executor moves the outputs to the operator’s out queue. .. _streaming_execution: Streaming execution -~~~~~~~~~~~~~~~~~~~ +------------------- In contrast to bulk synchronous execution, Ray Data’s streaming execution doesn’t wait for one operator to complete to start the next. Each operator takes in and outputs a @@ -112,7 +115,7 @@ stream of blocks. This approach allows you to process datasets that are too larg in your cluster’s memory. The scheduling loop -~~~~~~~~~~~~~~~~~~~ +------------------- The executor runs a loop. Each step works like this: diff --git a/doc/source/data/data.rst b/doc/source/data/data.rst index 61365a090f3d..827de3abeb72 100644 --- a/doc/source/data/data.rst +++ b/doc/source/data/data.rst @@ -13,6 +13,7 @@ Ray Data: Scalable Datasets for ML examples api/api comparisons + performance-tips data-internals Ray Data is a scalable data processing library for ML and AI workloads built on Ray. diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 068b7c4e1570..54ce91277eea 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -21,11 +21,11 @@ The follow diagram provides a high-level view of the Ray framework: :align: center :alt: Ray framework architecture -Ray Data contains operators focused on the following key tasks: +Ray Data contains operators that focus on the following key tasks: - Loading data from storage. - Ingesting data from connected systems. 
-- Exchanging data from other framework and data structures. +- Exchanging data from other frameworks and data structures. - Transforming and preprocessing data. - Performing offline batch inference. - Persisting results to storage or integrated systems. @@ -44,7 +44,7 @@ How does Ray Data distribute data? Ray Data holds the :class:`~ray.data.Dataset` on the process that triggers execution (which is usually the entrypoint of the program, referred to as the :term:`driver`) and stores the blocks as objects in Ray's shared-memory :ref:`object store `. -The following figure visualizes a dataset with three blocks, each holding 1000 rows. +The following figure visualizes a Dataset with three blocks, each holding 1000 rows. .. image:: images/dataset-arch-with-blocks.svg :alt: Ray Dataset with three blocks @@ -89,8 +89,8 @@ If you're troubleshooting or optimizing Ray Data workloads, consider the followi * Ray automatically splits blocks into smaller blocks if they exceed the max block size by 50% or more. * A block might only contain a single record if your data is very wide or contains a large record such as an image, vector, or tensor. Ray Data has built-in optimizations for handling large data efficiently, and you should test workloads with built-in defaults before trying to manually optimize your workload. * You can configure block size and splitting behaviors. See :ref:`Block size and performance`. -* Ray uses :ref:`Arrow tables` to internally represent blocks of data. - * Ray Data falls back to pandas DataFrames for data that cannot be safely represented using Arrow tables. See :ref:`Arrow and pandas type differences`. +* Ray uses `Arrow tables `_ to internally represent blocks of data. + * Ray Data falls back to pandas DataFrames for data that cannot be safely represented using Arrow tables. See `Arrow and pandas type differences `_. * Block format doesn't affect the of data type returned by APIs such as :meth:`~ray.data.Dataset.iter_batches`. @@ -102,7 +102,7 @@ How does Ray Data plan and execute operations? Ray Data uses a two-phase planning process to execute operations efficiently. When you write a program using the Dataset API, Ray Data first builds a *logical plan* - a high-level description of what operations to perform. When execution begins, it converts this into a *physical plan* that specifies exactly how to execute those operations. -This diagram illustrates the complete planning process: +The following diagram illustrates the complete planning process: .. https://docs.google.com/drawings/d/1WrVAg3LwjPo44vjLsn17WLgc3ta2LeQGgRfE8UHrDA0/edit @@ -161,7 +161,7 @@ Ray Data uses a *streaming execution model* to efficiently process large dataset * Each block of data is processed independently at each stage. * Any stage with data present in its input queue is eligible for scheduling. -The streaming execution model +The streaming execution model Because many frameworks supported by Ray Train also support this streaming execution model, Ray can optimize a physical plan for streaming execution from data loading and preprocessing steps all the way through model training. Offline batch inference also uses streaming execution, allowing for efficient model predictions on large datasets with reduced memory and compute requirements. @@ -173,6 +173,7 @@ Because many frameworks supported by Ray Train also support this streaming execu Ray Train provides integrations with many common ML and AI frameworks to efficiently distribute training and support streaming execution for model training. 
See :ref:`Ray Train`. + The following is a simple code example demonstrate the streaming execution model. , applies a map and filter transformation, and then calls the ``show`` action to trigger the pipeline: .. testcode:: diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst index 5aa985baae4d..89ff99888e64 100644 --- a/doc/source/data/performance-tips.rst +++ b/doc/source/data/performance-tips.rst @@ -1,7 +1,12 @@ .. _data_performance_tips: -Advanced: Performance tips and tuning -===================================== +Performance tips and tuning +=========================== + +This page describes considerations and recommendations related to performance and tuning Ray Data workloads. + +* For a conceptual overview of Ray Data, see :ref:`data_key_concepts`. +* For technical implementation details of Ray Data, see :ref:`datasets_scheduling`. .. _block_size: diff --git a/doc/source/data/quickstart.rst b/doc/source/data/quickstart.rst index b0de38303854..10efd0564311 100644 --- a/doc/source/data/quickstart.rst +++ b/doc/source/data/quickstart.rst @@ -12,7 +12,7 @@ This guide introduces you to the core capabilities of Ray Data and includes exam The code examples provided in this guide build upon one another sequentially. You can run these commands interactively in a Jupyter notebook or on Anyscale. -Provided code example materializes results to preview data at each step. When moving code to production, only include operations that materialize results if they are essential to your application. This allows Ray to optimize the logical and physical plans. See :ref:`How does Ray Data plan and execute operations?` +Provided code examples materialize results to preview data at each step. When moving code to production, only include operations that materialize results if they are essential to your application. This allows Ray to optimize the logical and physical plans. See :ref:`How does Ray Data plan and execute operations?` diff --git a/doc/source/data/user-guide.rst b/doc/source/data/user-guide.rst index 9cf3e9c13321..ac288ec53749 100644 --- a/doc/source/data/user-guide.rst +++ b/doc/source/data/user-guide.rst @@ -26,5 +26,4 @@ show you how achieve several tasks. monitoring-your-workload execution-configurations batch_inference - performance-tips custom-datasource-example From 972d0776cbd89a01567134bff499544cd1a45941 Mon Sep 17 00:00:00 2001 From: dstrodtman Date: Sat, 5 Apr 2025 15:13:00 -0400 Subject: [PATCH 08/15] wip --- doc/source/data/key-concepts.rst | 82 +++++++++++++++++++++++++------- doc/source/data/quickstart.rst | 35 +++++++++----- 2 files changed, 88 insertions(+), 29 deletions(-) diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 54ce91277eea..32a14814a9a7 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -10,6 +10,9 @@ To get started working with Ray Data, see :ref:`Ray Data basics For recommendations on optimizing Ray Data workloads, see :ref:`Performance tips and tuning`. + + + How does Ray Data relate to the rest of Ray? ============================================ @@ -23,13 +26,13 @@ The follow diagram provides a high-level view of the Ray framework: Ray Data contains operators that focus on the following key tasks: -- Loading data from storage. -- Ingesting data from connected systems. -- Exchanging data from other frameworks and data structures. -- Transforming and preprocessing data. -- Performing offline batch inference. 
-- Persisting results to storage or integrated systems. -- Pipelining data for Ray Train. +* Loading data from storage. +* Ingesting data from connected systems. +* Exchanging data from other frameworks and data structures. +* Transforming and preprocessing data. +* Performing offline batch inference. +* Persisting results to storage or integrated systems. +* Pipelining data for Ray Train. Ray Data uses the :class:`~ray.data.Dataset` abstraction to map common data operations to Ray Core primitives. @@ -37,11 +40,29 @@ Ray Data uses the :class:`~ray.data.Dataset` abstraction to map common data oper Ray Train is optimized to work on Ray Datasets. See :ref:`Process large datasets as streams`. -How does Ray Data distribute data? ----------------------------------- +What is a Ray Dataset? +====================== + +Ray :class:`~ray.data.Dataset` is the primary + + + +When you write a program using the Dataset API + +Ray Data operations are abrtractions of Ray Core task, actors, and objects. Ray Data is able to optimize how your program resolves over two stages of planning. + +Logical planning +--- + +During the logical planning stage, + +Because Ray Data uses Arrow Tables as the primary unit for storing data in memory, +How does Ray Data distribute data? +================================== + Ray Data holds the :class:`~ray.data.Dataset` on the process that triggers execution (which is usually the entrypoint of the program, referred to as the :term:`driver`) and stores the blocks as objects in Ray's shared-memory :ref:`object store `. The following figure visualizes a Dataset with three blocks, each holding 1000 rows. @@ -51,11 +72,16 @@ The following figure visualizes a Dataset with three blocks, each holding 1000 r .. https://docs.google.com/drawings/d/1kOYQqHdMrBp2XorDIn0u0G_MvFj-uSA4qm6xf9tsFLM/edit +This model is optimized to See :ref:`streaming-execution`. + + + + .. _dataset_conceptual: What is a Ray Dataset? ----------------------- +====================== Use Datasets to interact with data. The following are examples @@ -72,7 +98,7 @@ like :meth:`~ray.data.Dataset.show`. This allows Ray Data to optimize the execut and execute operations in a pipelined streaming fashion. What does a block represent in Ray? ------------------------------------ +=================================== Ray Data uses _blocks_ to represent subsets of data in a Dataset. Most users of Ray Data @@ -88,7 +114,7 @@ If you're troubleshooting or optimizing Ray Data workloads, consider the followi * The number of row or records in a block varies base on the size of each record. Most blocks are between 1 MiB and 128 MiB. * Ray automatically splits blocks into smaller blocks if they exceed the max block size by 50% or more. * A block might only contain a single record if your data is very wide or contains a large record such as an image, vector, or tensor. Ray Data has built-in optimizations for handling large data efficiently, and you should test workloads with built-in defaults before trying to manually optimize your workload. - * You can configure block size and splitting behaviors. See :ref:`Block size and performance`. + * You can configure block size and splitting behaviors. See :ref:`block_size`. * Ray uses `Arrow tables `_ to internally represent blocks of data. * Ray Data falls back to pandas DataFrames for data that cannot be safely represented using Arrow tables. See `Arrow and pandas type differences `_. 
* Block format doesn't affect the of data type returned by APIs such as :meth:`~ray.data.Dataset.iter_batches`. @@ -97,10 +123,34 @@ If you're troubleshooting or optimizing Ray Data workloads, consider the followi .. _plans: -How does Ray Data plan and execute operations? ----------------------------------------------- +How does Ray Data plan and run operations? +============================================== + +Ray Data uses a two-phase planning process to execute operations efficiently. + +* You write a program using the Dataset API. + +Ray data builds a *logical plan* that represents all the + +The operators from your program map to *logical operators*. Logical operators + ++-------------------+--------------------------------------------------------------------------+ +| Concept | Definition | ++===================+==========================================================================+ +| Dataset operator | Methods in the Dataset API you use to write your program. | ++-------------------+--------------------------------------------------------------------------+ +| Logical operator | | ++-------------------+--------------------------------------------------------------------------+ +| Logical plan | The collection of logical operators that represents your entire program. | ++-------------------+--------------------------------------------------------------------------+ +| Physical operator | The | ++-------------------+--------------------------------------------------------------------------+ +| Physical plan | The | ++-------------------+--------------------------------------------------------------------------+ + + -Ray Data uses a two-phase planning process to execute operations efficiently. When you write a program using the Dataset API, Ray Data first builds a *logical plan* - a high-level description of what operations to perform. When execution begins, it converts this into a *physical plan* that specifies exactly how to execute those operations. +- a high-level description of what operations to perform. When execution begins, it converts this into a *physical plan* that specifies exactly how to execute those operations. The following diagram illustrates the complete planning process: @@ -150,7 +200,7 @@ For more details on Ray Tasks and Actors, see :ref:`Ray Core Concepts `, which represents a distributed collection of data. Datasets are specifically designed for machine learning workloads and can efficiently handle data collections that exceed a single machine's memory. -Ray Datasets are similar to DataFrames and Datasets in TensorFlow, PyTorch, pandas, and Apache Spark. Ray Data provides interoperatibility with these and other libraries. See :ref:`Loading data from other libraries`. +Ray Datasets are similar to DataFrames and Datasets in TensorFlow, PyTorch, pandas, and Apache Spark. Ray Data provides interoperatibility with these and other libraries. See :ref:`load_data_libraries`. 
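For example, the following is a minimal sketch of round-tripping data between pandas and Ray Data:

.. testcode::

    import pandas as pd
    import ray

    # Create a Dataset from an in-memory pandas DataFrame.
    df = pd.DataFrame({"sepal.length": [5.1, 4.9], "variety": ["Setosa", "Setosa"]})
    ds = ray.data.from_pandas(df)

    # Convert the Dataset back to a pandas DataFrame.
    print(ds.to_pandas())

.. testoutput::
    :options: +MOCK

       sepal.length variety
    0           5.1  Setosa
    1           4.9  Setosa
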
+ +For details on how Ray Datasets represent data for distributed processing, see :ref:`data_key_concept` + +Most operations in Ray Data fall into one of the following categories: + ++----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Operator type | Description | ++================+==============================================================================================================================================================+ +| Creation | Creates a Dataset from the specified data source, such as data files, an external system, a Python data structure, or another data framework. | ++----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Transformation | Applies specified logic on a Dataset and outputs the results as a new Dataset. | ++----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Consumption | Materializes results from a Dataset to write data as files, convert data to another framework, create an iterator, or return rows or results from a Dataset. | ++----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+ -For details on how Ray Datasets represent data for distributed processing, see :ref:`How does Ray Data work?` .. _loading_key_concept: @@ -36,7 +49,7 @@ Load data You can load data to create datasets from sources including local files, Python objects, and cloud storage services like S3 or GCS. -Ray Data seamlessly integrates with any `filesystem supported by Arrow`__. +Ray Data seamlessly integrates with any `filesystem supported by Arrow`_. The following code example loads CSV data from an S3 bucket and previews the data: @@ -54,7 +67,7 @@ The following code example loads CSV data from an S3 bucket and previews the dat {'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0} -To learn more about creating datasets from different sources, read :ref:`Loading data `. +To learn more about creating datasets from different sources, read :ref:`loading_data`. .. _transforming_key_concept: @@ -63,8 +76,7 @@ Transform data Define data transformations -Apply user-defined functions (UDFs) to transform datasets. Ray automatically parallelizes these transformations -across your cluster for better performance. +Apply user-defined functions (UDFs) to transform datasets. Ray automatically parallelizes these transformations across your cluster for better performance. The following code example applies a UDF to calculate the petal area for the CSV data loaded @@ -110,9 +122,7 @@ To explore more transformation capabilities, read :ref:`Transforming data ` and :ref:`Saving Data `. +For more details on working with Dataset contents, see :ref:`iterating-over-data` and :ref:`saving-data`. .. _saving_key_concept: @@ -138,7 +147,7 @@ Save data --------- Export processed datasets to a variety of formats and storage locations using methods -like :meth:`~ray.data.Dataset.write_parquet`, :meth:`~ray.data.Dataset.write_csv`, and more. +such as :meth:`~ray.data.Dataset.write_parquet`, :meth:`~ray.data.Dataset.write_csv`. .. 
testcode:: :hide: @@ -163,7 +172,7 @@ like :meth:`~ray.data.Dataset.write_parquet`, :meth:`~ray.data.Dataset.write_csv ['..._000000.parquet', '..._000001.parquet'] -For more information on saving datasets, see :ref:`Saving data `. +For more information on saving datasets, see :ref:`saving-data`. .. _etl_example: From b243d93b532a5d769bb9cdc33108d6a476492cf4 Mon Sep 17 00:00:00 2001 From: dstrodtman Date: Sun, 6 Apr 2025 10:15:09 -0400 Subject: [PATCH 09/15] wip --- doc/source/data/data-internals.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/source/data/data-internals.rst b/doc/source/data/data-internals.rst index 69a49e2d11ea..61ef2a9abbf6 100644 --- a/doc/source/data/data-internals.rst +++ b/doc/source/data/data-internals.rst @@ -15,6 +15,8 @@ The intended audience is advanced users, Ray contributors, and Ray Data develope * For a conceptual overview of Ray Data, see :ref:`How does Ray Data work?`. * For recommendations on optimizing Ray Data workloads, see :ref:`Performance tips and tuning`. +.. _dataset_concept: + How does Ray ============================== From 0b0b3b4dae44696edf230a9cacab10c1108da5a7 Mon Sep 17 00:00:00 2001 From: dstrodtman Date: Sun, 6 Apr 2025 15:32:56 -0400 Subject: [PATCH 10/15] wip --- doc/Pipfile | 11 --- doc/source/data/data-internals.rst | 4 +- doc/source/data/data.rst | 1 + doc/source/data/etl-tutorial.rst | 5 + doc/source/data/get-started.rst | 18 ++++ doc/source/data/inference-tutorial.rst | 5 + doc/source/data/key-concepts.rst | 110 +++++++++++++-------- doc/source/data/preprocessing-tutorial.rst | 5 + doc/source/data/quickstart.rst | 65 +++++++++--- 9 files changed, 157 insertions(+), 67 deletions(-) delete mode 100644 doc/Pipfile create mode 100644 doc/source/data/etl-tutorial.rst create mode 100644 doc/source/data/get-started.rst create mode 100644 doc/source/data/inference-tutorial.rst create mode 100644 doc/source/data/preprocessing-tutorial.rst diff --git a/doc/Pipfile b/doc/Pipfile deleted file mode 100644 index d61ea531d92f..000000000000 --- a/doc/Pipfile +++ /dev/null @@ -1,11 +0,0 @@ -[[source]] -url = "https://pypi.org/simple" -verify_ssl = true -name = "pypi" - -[packages] - -[dev-packages] - -[requires] -python_version = "3.13" diff --git a/doc/source/data/data-internals.rst b/doc/source/data/data-internals.rst index 61ef2a9abbf6..a45a89e80b42 100644 --- a/doc/source/data/data-internals.rst +++ b/doc/source/data/data-internals.rst @@ -11,8 +11,8 @@ This page provides an overview of technical implementation details of Ray Data r The intended audience is advanced users, Ray contributors, and Ray Data developers. -* To get started working with Ray Data, see :ref:`Ray Data basics`. -* For a conceptual overview of Ray Data, see :ref:`How does Ray Data work?`. +* For an overview of Ray Data, see :ref:`data_quickstart`. +* For a conceptual overview of Ray Data, see :ref:`data_key_concepts`. * For recommendations on optimizing Ray Data workloads, see :ref:`Performance tips and tuning`. .. 
_dataset_concept: diff --git a/doc/source/data/data.rst b/doc/source/data/data.rst index 827de3abeb72..3614122b7231 100644 --- a/doc/source/data/data.rst +++ b/doc/source/data/data.rst @@ -9,6 +9,7 @@ Ray Data: Scalable Datasets for ML quickstart Concepts + get-started user-guide examples api/api diff --git a/doc/source/data/etl-tutorial.rst b/doc/source/data/etl-tutorial.rst new file mode 100644 index 000000000000..fe93aedd0839 --- /dev/null +++ b/doc/source/data/etl-tutorial.rst @@ -0,0 +1,5 @@ +=========================== +Last-mile ETL with Ray Data +=========================== + +Create a tutorial. \ No newline at end of file diff --git a/doc/source/data/get-started.rst b/doc/source/data/get-started.rst new file mode 100644 index 000000000000..1cafdcf09320 --- /dev/null +++ b/doc/source/data/get-started.rst @@ -0,0 +1,18 @@ +.. _data-get-started: + +========================= +Get started with Ray Data +========================= + +The following tutorials provide runnable code examples for common Ray Data applications, including last-mile ETL, preprocessing and data cleaning, and batch inference. + +* For an introduction to Ray Data, see :ref:`data_quickstart`. +* For syntax examples for common use cases, see :ref:`data_user_guide`. +* For end-to-end examples of applications using Ray Data, see :ref:`Ray Data examples`. + +.. toctree:: + :maxdepth: 1 + + Last-mile ETL + Preprocessing and data cleaning + Batch inference \ No newline at end of file diff --git a/doc/source/data/inference-tutorial.rst b/doc/source/data/inference-tutorial.rst new file mode 100644 index 000000000000..cb741673809d --- /dev/null +++ b/doc/source/data/inference-tutorial.rst @@ -0,0 +1,5 @@ +============================= +Batch inference with Ray Data +============================= + +Create a tutorial. \ No newline at end of file diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 32a14814a9a7..c9bbae3a9001 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -1,44 +1,67 @@ .. _data_key_concepts: -======================= -How does Ray Data work? -======================= +============================ +Ray Data conceptual overview +============================ This page provides a conceptual overview of the architecture and execution model for Ray Data. Understanding these technical details can be useful when designing, debugging, and optimizing Ray applications. -To get started working with Ray Data, see :ref:`Ray Data basics`. +This page assumes familiarity with the use cases and core functionality of Ray Data. If you are new to Ray Data, see :ref:`data_quickstart`. + +For specific recommendations on optimizing Ray Data workloads, see :ref:`data_performance_tips`. + +What is Ray Data? +================= + + + + + + + +Ray Data key concepts +===================== + +The following table provides descriptions for the key concepts of Ray Data: + ++---------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Concept | Description | ++===========================+================================================================================================================================================================================================+ +| Dataset | The primary programming interface and data structure in Ray Data. 
| ++---------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Streaming execution model | An optimized data processing model that plans and runs transformations as concurrent stages, providing efficient processing for large datasets. | ++---------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Block | A collection of rows in a dataset that is distributed during planning and processing. | ++---------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Operator | An abstraction of Ray Core tasks, actors, and objects. Ray Data translates the operators you use to write your program into *logical operators* and then *physical operators* during planning. | ++---------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Logical plan | The representation of the logic your Ray Data program contains. The logical plan maps user-facing APIs to logical operators. | ++---------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Physical plan | The final stage of planning in Ray Data, representing how the program runs. Physical operators manipulate references to data and map logic to tasks and actors. | ++---------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + + + +How does Ray Data work? +======================= + +Ray Data provides a framework of operators that abstract Ray Core primitives to simplify programming Ray workloads and optimize planning and execution for common ML data processing tasks. -For recommendations on optimizing Ray Data workloads, see :ref:`Performance tips and tuning`. -How does Ray Data relate to the rest of Ray? -============================================ -Ray Data is one of the Ray AI Libraries. Ray AI libraries build on top of Ray Core primitives to provide developer-friendly APIs for completing common data, ML, and AI tasks. To learn about Ray Core primitives, see :ref:`Ray Core key concepts`. -The follow diagram provides a high-level view of the Ray framework: -.. image:: ../ray-overview/images/map-of-ray.svg - :align: center - :alt: Ray framework architecture -Ray Data contains operators that focus on the following key tasks: -* Loading data from storage. -* Ingesting data from connected systems. -* Exchanging data from other frameworks and data structures. -* Transforming and preprocessing data. -* Performing offline batch inference. -* Persisting results to storage or integrated systems. -* Pipelining data for Ray Train. Ray Data uses the :class:`~ray.data.Dataset` abstraction to map common data operations to Ray Core primitives. -Ray Train is optimized to work on Ray Datasets. 
See :ref:`Process large datasets as streams`. +Ray Train is optimized to work on Ray Datasets. See :ref:`streaming-execution`. What is a Ray Dataset? @@ -50,14 +73,16 @@ Ray :class:`~ray.data.Dataset` is the primary When you write a program using the Dataset API -Ray Data operations are abrtractions of Ray Core task, actors, and objects. Ray Data is able to optimize how your program resolves over two stages of planning. +Ray Data operations are abstractions of Ray Core task, actors, and objects. Ray Data is able to optimize how your program resolves over two stages of planning. Logical planning --- During the logical planning stage, -Because Ray Data uses Arrow Tables as the primary unit for storing data in memory, +Phyiscal planning +--- + How does Ray Data distribute data? @@ -223,8 +248,28 @@ Because many frameworks supported by Ray Train also support this streaming execu Ray Train provides integrations with many common ML and AI frameworks to efficiently distribute training and support streaming execution for model training. See :ref:`Ray Train`. +In the streaming execution model, operators are connected in a pipeline, with each operator's output queue feeding directly into the input queue of the next downstream operator. This creates an efficient flow of data through the execution plan. + +The streaming execution model provides significant advantages for data processing. + +In particular, the pipeline architecture enables multiple stages to execute concurrently, improving overall performance and resource utilization. For example, if the map operator requires GPU resources, the streaming execution model can execute the map operator concurrently with the filter operator (which may run on CPUs), effectively utilizing the GPU through the entire duration of the pipeline. + +To summarize, Ray Data's streaming execution model can efficiently process datasets that are much larger than available memory while maintaining high performance through parallel execution across the cluster. + +.. note:: + Operations that need to evaluate, compare, or aggregate the entire dataset create processing bottlenecks for streaming execution. Examples include :meth:`ds.sort() ` and :meth:`ds.groupby() `. + + Ray must materialize the entire dataset to complete these operations, which interupts stream pipeline processing and might lead to significant spill or out-of-memory errors. -The following is a simple code example demonstrate the streaming execution model. , applies a map and filter transformation, and then calls the ``show`` action to trigger the pipeline: + Consider refactoring workloads to remove unnecessary operations that require full dataset materialization. For example, the distributed model used by Ray does not persist ordered results between stages or guarantee that sorting is preserved on write. For many workloads, removing a :meth:`ds.sort() ` operation can eliminate significant overhead without impacting results in any way. + +You can read more about the streaming execution model in this `blog post `__. + + +Streaming execution example +--------------------------- + +The following is a simple code example that demonstrates the streaming execution model. This example loads CSV data, applies a series of map and filter transformations, and then calls the ``show`` action to trigger the pipeline: .. testcode:: @@ -260,20 +305,3 @@ This logical plan maps to the following streaming topology: .. 
image:: images/streaming-topology.svg :width: 1000 :align: center - -In the streaming execution model, operators are connected in a pipeline, with each operator's output queue feeding directly into the input queue of the next downstream operator. This creates an efficient flow of data through the execution plan. - -The streaming execution model provides significant advantages for data processing. - -In particular, the pipeline architecture enables multiple stages to execute concurrently, improving overall performance and resource utilization. For example, if the map operator requires GPU resources, the streaming execution model can execute the map operator concurrently with the filter operator (which may run on CPUs), effectively utilizing the GPU through the entire duration of the pipeline. - -To summarize, Ray Data's streaming execution model can efficiently process datasets that are much larger than available memory while maintaining high performance through parallel execution across the cluster. - -.. note:: - Operations that need to evaluate, compare, or aggregate the entire dataset create processing bottlenecks for streaming execution. Examples include :meth:`ds.sort() ` and :meth:`ds.groupby() `. - - Ray must materialize the entire dataset to complete these operations, which interupts stream pipeline processing and might lead to significant spill or out-of-memory errors. - - Consider refactoring workloads to remove unnecessary operations that require full dataset materialization. For example, the distributed model used by Ray does not persist ordered results between stages or guarantee that sorting is preserved on write. For many workloads, removing a :meth:`ds.sort() ` operation can eliminate significant overhead without impacting results in any way. - -You can read more about the streaming execution model in this `blog post `__. diff --git a/doc/source/data/preprocessing-tutorial.rst b/doc/source/data/preprocessing-tutorial.rst new file mode 100644 index 000000000000..ee3d54014b1a --- /dev/null +++ b/doc/source/data/preprocessing-tutorial.rst @@ -0,0 +1,5 @@ +============================================= +Preprocessing and data cleaning with Ray Data +============================================= + +Create a tutorial. \ No newline at end of file diff --git a/doc/source/data/quickstart.rst b/doc/source/data/quickstart.rst index cad5ed14fb09..244d1e079662 100644 --- a/doc/source/data/quickstart.rst +++ b/doc/source/data/quickstart.rst @@ -3,21 +3,41 @@ Ray Data basics =============== -This guide introduces you to the core capabilities of Ray Data and includes examples of using the Ray :class:`Dataset ` abstraction for the following tasks: +This page introduces the core capabilities of Ray Data and includes examples of using the Ray :class:`Dataset ` abstraction for the following tasks: * :ref:`Loading data ` * :ref:`Transforming data ` * :ref:`Consuming data ` * :ref:`Saving data ` -The code examples provided in this guide build upon one another sequentially. You can run these commands interactively in a Jupyter notebook or on Anyscale. -Provided code examples materialize results to preview data at each step. When moving code to production, only include operations that materialize results if they are essential to your application. This allows Ray to optimize the logical and physical plans. See :ref:`How does Ray Data plan and execute operations?` +What is Ray Data? +================= +Ray Data is the Ray AI Library for loading, transforming, consuming, and saving data. 
+
+Ray AI libraries build on top of Ray Core primitives to provide developer-friendly APIs for completing common data, ML, and AI tasks. To learn about Ray Core primitives, see :ref:`Ray Core key concepts`.
+
+The following diagram provides a high-level view of the Ray framework:
+
+.. image:: ../images/map-of-ray.svg
+   :align: center
+   :alt: Ray framework architecture
 
-Combine these steps to complete data preprocessing or ETL (extract, transform, and load) workloads. For an example of
+Common Ray Data tasks
+=====================
+
+Ray Data contains operators that focus on the following key tasks:
+
+* Loading data from storage.
+* Ingesting data from connected systems.
+* Exchanging data from other frameworks and data structures.
+* Transforming and preprocessing data.
+* Performing offline batch inference.
+* Persisting results to storage or integrated systems.
+* Pipelining data for Ray Train.
 
 
 What is a Ray Dataset?
@@ -27,7 +47,7 @@ Ray Data's main abstraction is a :class:`Dataset `, which repr
 
 Ray Datasets are similar to DataFrames and Datasets in TensorFlow, PyTorch, pandas, and Apache Spark. Ray Data provides interoperability with these and other libraries. See :ref:`load_data_libraries`.
 
-For details on how Ray Datasets represent data for distributed processing, see :ref:`data_key_concept`
+For details on how Ray Datasets represent data for distributed processing, see :ref:`data_key_concepts`.
 
 Most operations in Ray Data fall into one of the following categories:
 
@@ -41,15 +61,23 @@ Most operations in Ray Data fall into one of the following categories:
 | Consumption | Materializes results from a Dataset to write data as files, convert data to another framework, create an iterator, or return rows or results from a Dataset. |
 +----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
 
+Run Ray Data code examples
+--------------------------
+
+The following code examples build upon one another sequentially. You can run these commands interactively in a Jupyter notebook if you have :ref:`configured Ray`, or you can `run Ray on Anyscale `_.
+
+These code examples materialize data at each step to show results. When preparing code for production, only include operations that materialize results if they are essential to your application. This allows Ray to optimize the logical and physical plans and efficiently pipeline operations for concurrent execution. See :ref:`plans`.
+
+Combine these steps to complete data preprocessing or ETL (extract, transform, and load) workloads. For an example, see :ref:`data-get-started`.
 
 .. _loading_key_concept:
 
-Load data
----------
+Load data to create a Dataset
+-----------------------------
 
-You can load data to create datasets from sources including local files, Python objects, and cloud storage services like S3 or GCS.
+You can load data to create a Dataset from sources including local files, Python objects, and cloud storage services like S3 or GCS.
 
-Ray Data seamlessly integrates with any `filesystem supported by Arrow`_.
+Ray Data seamlessly integrates with any `filesystem supported by Arrow `_. 
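+
+As a hedged sketch of that integration, you can pass an explicit Arrow filesystem object to a read call. The bucket, path, and region below are hypothetical placeholders, not values from this guide:
+
+.. testcode::
+    :skipif: True
+
+    import pyarrow.fs
+
+    import ray
+
+    # Hypothetical bucket and region, shown only to illustrate the
+    # ``filesystem`` argument accepted by Ray Data read APIs.
+    fs = pyarrow.fs.S3FileSystem(region="us-west-2")
+    ds = ray.data.read_csv("s3://example-bucket/data.csv", filesystem=fs)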
The following code example loads CSV data from an S3 bucket and previews the data: @@ -67,18 +95,27 @@ The following code example loads CSV data from an S3 bucket and previews the dat {'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0} -To learn more about creating datasets from different sources, read :ref:`loading_data`. +For an overview of creating Dataset from other sources, including Python data structures and other data processing frameworks, see :ref:`loading_data`. .. _transforming_key_concept: Transform data -------------- -Define data transformations +You define transformations using user-defined functions (UDFs) and -Apply user-defined functions (UDFs) to transform datasets. Ray automatically parallelizes these transformations across your cluster for better performance. + to individual rows or batches of data. -The following code example applies a UDF to calculate the petal area for the CSV data loaded +You write UDFs using Python. The following are examples of UDFs you might use as transformations: +- Arbitrary Python logic. +- ML model predictions. +- NumPy calculations. +- pandas operations. +- PyArrow operations. + +Ray automatically optimizes these transformations for parallel and concurrent execution, allowing you to easily scale to large datasets. + +The following code example applies a UDF to calculate the petal area for the CSV data loaded in the code example above. .. testcode:: @@ -122,6 +159,8 @@ To explore more transformation capabilities, read :ref:`Transforming data Date: Sun, 6 Apr 2025 16:18:34 -0400 Subject: [PATCH 11/15] formatting --- doc/source/data/data-internals.rst | 33 +++++++------- doc/source/data/get-started.rst | 2 +- doc/source/data/key-concepts.rst | 57 +++++++---------------- doc/source/data/quickstart.rst | 72 ++++++------------------------ 4 files changed, 46 insertions(+), 118 deletions(-) diff --git a/doc/source/data/data-internals.rst b/doc/source/data/data-internals.rst index a45a89e80b42..9bfff955180f 100644 --- a/doc/source/data/data-internals.rst +++ b/doc/source/data/data-internals.rst @@ -1,6 +1,5 @@ .. _datasets_scheduling: -================== Ray Data internals ================== @@ -18,13 +17,13 @@ The intended audience is advanced users, Ray contributors, and Ray Data develope .. _dataset_concept: How does Ray -============================== +------------------------------ Operators ---------- +~~~~~~~~~ There are two types of operators: *logical operators* and *physical operators*. Logical operators are stateless objects that describe “what” to do. Physical operators are @@ -32,7 +31,7 @@ stateful objects that describe “how” to do it. An example of a logical opera ``ReadOp``, and an example of a physical operator is ``TaskPoolMapOperator``. Plans ------ +~~~~~ A *logical plan* is a series of logical operators, and a *physical plan* is a series of physical operators. When you call APIs like :func:`ray.data.read_images` and @@ -40,7 +39,7 @@ physical operators. When you call APIs like :func:`ray.data.read_images` and starts, the planner generates a corresponding physical plan. The planner ------------ +~~~~~~~~~~~ The Ray Data planner translates logical operators to one or more physical operators. For example, the planner translates the ``ReadOp`` logical operator into two physical @@ -49,7 +48,7 @@ logical operator only describes the input data, the ``TaskPoolMapOperator`` phys operator actually launches tasks to read the data. 
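
A rough sketch of how this surfaces from user code: printing an unexecuted dataset shows the logical operators that the planner later translates into physical operators. The exact output is illustrative and varies by Ray version:

.. testcode::

    import ray

    # Chaining operations builds a logical plan; nothing executes yet.
    ds = ray.data.range(100).map_batches(lambda batch: batch)

    # Printing the dataset displays its logical operators.
    print(ds)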
Plan optimization ------------------ +~~~~~~~~~~~~~~~~~ Ray Data applies optimizations to both logical and physical plans. For example, the ``OperatorFusionRule`` combines a chain of physical map operators into a single map @@ -70,7 +69,7 @@ To add custom optimization rules, implement a class that extends ``Rule`` and co ray.data._internal.logical.optimizers.DEFAULT_LOGICAL_RULES.append(CustomRule) Types of physical operators ---------------------------- +~~~~~~~~~~~~~~~~~~~~~~~~~~~ Physical operators take in a stream of block references and output another stream of block references. Some physical operators launch Ray Tasks and Actors to transform @@ -84,10 +83,10 @@ Non-map operators include ``OutputSplitter`` and ``LimitOperator``. These two op manipulate references to data, but don’t launch tasks or modify the underlying data. Execution -========= +--------- The executor ------------- +~~~~~~~~~~~~ The *executor* schedules tasks and moves data between physical operators. @@ -101,7 +100,7 @@ stored in Ray’s distributed object store. The executor manipulates references objects, and doesn’t fetch the underlying data itself to the executor. Out queues ----------- +~~~~~~~~~~ Each physical operator has an associated *out queue*. When a physical operator produces outputs, the executor moves the outputs to the operator’s out queue. @@ -109,7 +108,7 @@ outputs, the executor moves the outputs to the operator’s out queue. .. _streaming_execution: Streaming execution -------------------- +~~~~~~~~~~~~~~~~~~~ In contrast to bulk synchronous execution, Ray Data’s streaming execution doesn’t wait for one operator to complete to start the next. Each operator takes in and outputs a @@ -117,7 +116,7 @@ stream of blocks. This approach allows you to process datasets that are too larg in your cluster’s memory. The scheduling loop -------------------- +~~~~~~~~~~~~~~~~~~~ The executor runs a loop. Each step works like this: @@ -139,7 +138,7 @@ If there are multiple viable operators, the executor chooses the operator with t smallest out queue. Scheduling -========== +---------- Ray Data uses Ray Core for execution. Below is a summary of the :ref:`scheduling strategy ` for Ray Data: @@ -152,7 +151,7 @@ Ray Data uses Ray Core for execution. Below is a summary of the :ref:`scheduling .. _datasets_pg: Ray Data and placement groups ------------------------------ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ By default, Ray Data configures its tasks and actors to use the cluster-default scheduling strategy (``"DEFAULT"``). You can inspect this configuration variable here: :class:`ray.data.DataContext.get_current().scheduling_strategy `. This scheduling strategy schedules these Tasks and Actors outside any present @@ -163,12 +162,12 @@ Consider this override only for advanced use cases to improve performance predic Memory Management -================= +----------------- This section describes how Ray Data manages execution and object store memory. Execution Memory ----------------- +~~~~~~~~~~~~~~~~ During execution, a task can read multiple input blocks, and write multiple output blocks. Input and output blocks consume both worker heap memory and shared memory through Ray's object store. Ray caps object store memory usage by spilling to disk, but excessive worker heap memory usage can cause out-of-memory errors. @@ -176,7 +175,7 @@ Ray caps object store memory usage by spilling to disk, but excessive worker hea For more information on tuning memory usage and preventing out-of-memory errors, see the :ref:`performance guide `. 
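
As a hedged illustration of one related tuning knob, assuming the ``DataContext`` attribute name ``target_max_block_size``:

.. testcode::

    import ray

    ctx = ray.data.DataContext.get_current()
    # Lower the target output block size to 64 MiB to reduce per-task
    # memory pressure; the default depends on the Ray version.
    ctx.target_max_block_size = 64 * 1024 * 1024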
Object Store Memory -------------------- +~~~~~~~~~~~~~~~~~~~ Ray Data uses the Ray object store to store data blocks, which means it inherits the memory management features of the Ray object store. This section discusses the relevant features: diff --git a/doc/source/data/get-started.rst b/doc/source/data/get-started.rst index 1cafdcf09320..0d726728ed5b 100644 --- a/doc/source/data/get-started.rst +++ b/doc/source/data/get-started.rst @@ -8,7 +8,7 @@ The following tutorials provide runnable code examples for common Ray Data appli * For an introduction to Ray Data, see :ref:`data_quickstart`. * For syntax examples for common use cases, see :ref:`data_user_guide`. -* For end-to-end examples of applications using Ray Data, see :ref:`Ray Data examples`. +* For end-to-end examples of applications using Ray Data, see :doc:`Ray Data examples`. .. toctree:: :maxdepth: 1 diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index c9bbae3a9001..2d21ae9cd6f6 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -1,6 +1,5 @@ .. _data_key_concepts: -============================ Ray Data conceptual overview ============================ @@ -10,17 +9,8 @@ This page assumes familiarity with the use cases and core functionality of Ray D For specific recommendations on optimizing Ray Data workloads, see :ref:`data_performance_tips`. -What is Ray Data? -================= - - - - - - - Ray Data key concepts -===================== +--------------------- The following table provides descriptions for the key concepts of Ray Data: @@ -40,41 +30,18 @@ The following table provides descriptions for the key concepts of Ray Data: | Physical plan | The final stage of planning in Ray Data, representing how the program runs. Physical operators manipulate references to data and map logic to tasks and actors. | +---------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ - - How does Ray Data work? -======================= +----------------------- Ray Data provides a framework of operators that abstract Ray Core primitives to simplify programming Ray workloads and optimize planning and execution for common ML data processing tasks. - - - - - - - - - - -Ray Data uses the :class:`~ray.data.Dataset` abstraction to map common data operations to Ray Core primitives. +Ray Data uses the :class:`~ray.data.Dataset` abstraction to map common data operations to Ray Core task, actors, and objects. Ray Train is optimized to work on Ray Datasets. See :ref:`streaming-execution`. -What is a Ray Dataset? -====================== - -Ray :class:`~ray.data.Dataset` is the primary - - - -When you write a program using the Dataset API - -Ray Data operations are abstractions of Ray Core task, actors, and objects. Ray Data is able to optimize how your program resolves over two stages of planning. - Logical planning --- @@ -86,7 +53,7 @@ Phyiscal planning How does Ray Data distribute data? -================================== +---------------------------------- Ray Data holds the :class:`~ray.data.Dataset` on the process that triggers execution (which is usually the entrypoint of the program, referred to as the :term:`driver`) and stores the blocks as objects in Ray's shared-memory :ref:`object store `. @@ -106,7 +73,7 @@ This model is optimized to See :ref:`streaming-execution`. .. _dataset_conceptual: What is a Ray Dataset? 
-====================== +---------------------- Use Datasets to interact with data. The following are examples @@ -123,7 +90,7 @@ like :meth:`~ray.data.Dataset.show`. This allows Ray Data to optimize the execut and execute operations in a pipelined streaming fashion. What does a block represent in Ray? -=================================== +----------------------------------- Ray Data uses _blocks_ to represent subsets of data in a Dataset. Most users of Ray Data @@ -137,11 +104,17 @@ Blocks have the following characteristics: If you're troubleshooting or optimizing Ray Data workloads, consider the following details and special cases: * The number of row or records in a block varies base on the size of each record. Most blocks are between 1 MiB and 128 MiB. + * Ray automatically splits blocks into smaller blocks if they exceed the max block size by 50% or more. + * A block might only contain a single record if your data is very wide or contains a large record such as an image, vector, or tensor. Ray Data has built-in optimizations for handling large data efficiently, and you should test workloads with built-in defaults before trying to manually optimize your workload. + * You can configure block size and splitting behaviors. See :ref:`block_size`. + * Ray uses `Arrow tables `_ to internally represent blocks of data. + * Ray Data falls back to pandas DataFrames for data that cannot be safely represented using Arrow tables. See `Arrow and pandas type differences `_. + * Block format doesn't affect the of data type returned by APIs such as :meth:`~ray.data.Dataset.iter_batches`. @@ -149,7 +122,7 @@ If you're troubleshooting or optimizing Ray Data workloads, consider the followi .. _plans: How does Ray Data plan and run operations? -============================================== +---------------------------------------------- Ray Data uses a two-phase planning process to execute operations efficiently. @@ -225,7 +198,7 @@ For more details on Ray Tasks and Actors, see :ref:`Ray Core Concepts ` * :ref:`Saving data ` - - What is Ray Data? -================= +----------------- Ray Data is the Ray AI Library for loading, transforming, consuming, and saving data. @@ -27,7 +25,7 @@ The follow diagram provides a high-level view of the Ray framework: Common Ray Data tasks -===================== +--------------------- Ray Data contains operators that focus on the following key tasks: @@ -102,16 +100,15 @@ For an overview of creating Dataset from other sources, including Python data st Transform data -------------- -You define transformations using user-defined functions (UDFs) and - - to individual rows or batches of data. +You define transformations using user-defined functions (UDFs) and apply them to Datasets using operators including :meth:`ray.data.Dataset.map_batches`, :meth:`ray.data.Dataset.map`, and :meth:`ray.data.Dataset.flat_map`. You write UDFs using Python. The following are examples of UDFs you might use as transformations: -- Arbitrary Python logic. -- ML model predictions. -- NumPy calculations. -- pandas operations. -- PyArrow operations. + +* Arbitrary Python logic. +* ML model predictions. +* NumPy calculations. +* pandas operations. +* PyArrow operations. Ray automatically optimizes these transformations for parallel and concurrent execution, allowing you to easily scale to large datasets. 
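+
+As an illustrative sketch of a batch-level UDF, separate from the running example below: this uses the same Iris columns and assumes pandas-formatted batches through ``batch_format="pandas"``:
+
+.. testcode::
+
+    import pandas as pd
+
+    import ray
+
+    def add_petal_ratio(batch: pd.DataFrame) -> pd.DataFrame:
+        # Add a derived column to each pandas batch.
+        batch["petal ratio"] = batch["petal length (cm)"] / batch["petal width (cm)"]
+        return batch
+
+    ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
+    ds = ds.map_batches(add_petal_ratio, batch_format="pandas")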
@@ -159,9 +156,9 @@ To explore more transformation capabilities, read :ref:`Transforming data Date: Sun, 6 Apr 2025 17:57:27 -0400 Subject: [PATCH 12/15] wip --- doc/source/data/data-internals.rst | 7 +--- doc/source/data/key-concepts.rst | 52 ++++++++++++++---------------- 2 files changed, 26 insertions(+), 33 deletions(-) diff --git a/doc/source/data/data-internals.rst b/doc/source/data/data-internals.rst index 9bfff955180f..d8b21eaa7aed 100644 --- a/doc/source/data/data-internals.rst +++ b/doc/source/data/data-internals.rst @@ -3,12 +3,7 @@ Ray Data internals ================== -This page provides an overview of technical implementation details of Ray Data related to the following: - -* - - -The intended audience is advanced users, Ray contributors, and Ray Data developers. +This page provides an overview of technical implementation details of Ray Data. The intended audience is advanced users, Ray contributors, and Ray Data developers. * For an overview of Ray Data, see :ref:`data_quickstart`. * For a conceptual overview of Ray Data, see :ref:`data_key_concepts`. diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 2d21ae9cd6f6..07e3921e75b5 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -35,20 +35,36 @@ How does Ray Data work? Ray Data provides a framework of operators that abstract Ray Core primitives to simplify programming Ray workloads and optimize planning and execution for common ML data processing tasks. -Ray Data uses the :class:`~ray.data.Dataset` abstraction to map common data operations to Ray Core task, actors, and objects. +You write programs using Dataset operators + +Ray Data optimizes data processing for ML by planning and executing operations as concurrent stages using parallel processing, referred to as the *streaming execution model*. + +With streaming execution, Ray processes data in a streaming fashion through a pipeline of operations rather than materializing the entire dataset in memory at once. +Ray Data +The following is + +#. The user defines the workload using Datasets. +#. Ray Data translates the workload to a logical plan. +#. During execution, the planner translates the logical plan to a physical plan, optimizing at each step. +#. The physical plan represents each operator as a stage in a pipeline. +#. Each stage has an input queue of blocks of data to process. +#. Each stage writes results as blocks of data in an output queue. +#. Output queues become input queues for the next stage in the pipeline. +#. Each block of data is processed independently at each stage. +#. Any stage with data present in its input queue is eligible for scheduling. Ray Train is optimized to work on Ray Datasets. See :ref:`streaming-execution`. -Logical planning ---- -During the logical planning stage, -Phyiscal planning ---- +Ray Data uses the :class:`~ray.data.Dataset` abstraction to map common data operations to Ray Core task, actors, and objects. + + + + @@ -101,6 +117,8 @@ Blocks have the following characteristics: * Blocks are processed in parallel and sequentially, depending on the operations present in an application. + + If you're troubleshooting or optimizing Ray Data workloads, consider the following details and special cases: * The number of row or records in a block varies base on the size of each record. Most blocks are between 1 MiB and 128 MiB. @@ -132,19 +150,6 @@ Ray data builds a *logical plan* that represents all the The operators from your program map to *logical operators*. 
Logical operators
 
 
-+-------------------+--------------------------------------------------------------------------+
-| Concept | Definition |
-+===================+==========================================================================+
-| Dataset operator | Methods in the Dataset API you use to write your program. |
-+-------------------+--------------------------------------------------------------------------+
-| Logical operator | |
-+-------------------+--------------------------------------------------------------------------+
-| Logical plan | The collection of logical operators that represents your entire program. |
-+-------------------+--------------------------------------------------------------------------+
-| Physical operator | The |
-+-------------------+--------------------------------------------------------------------------+
-| Physical plan | The |
-+-------------------+--------------------------------------------------------------------------+
 
 
 
@@ -200,14 +205,7 @@ For more details on Ray Tasks and Actors, see :ref:`Ray Core Concepts`

From: dstrodtman
Date: Sun, 6 Apr 2025 21:59:08 -0400
Subject: [PATCH 13/15] mvp
---
 doc/source/data/data-internals.rst | 28 +-
 doc/source/data/images/queue.svg | 1 +
 doc/source/data/images/streaming-topology.svg | 2 +-
 doc/source/data/key-concepts.rst | 261 +++++-------------
 doc/source/data/performance-tips.rst | 21 ++
 doc/source/data/quickstart.rst | 7 +-
 6 files changed, 120 insertions(+), 200 deletions(-)
 create mode 100644 doc/source/data/images/queue.svg

diff --git a/doc/source/data/data-internals.rst b/doc/source/data/data-internals.rst
index d8b21eaa7aed..5036c68f63a5 100644
--- a/doc/source/data/data-internals.rst
+++ b/doc/source/data/data-internals.rst
@@ -11,10 +11,32 @@ This page provides an overview of technical implementation details of Ray Data.
 
 .. _dataset_concept:
 
-How does Ray
------------------------------- 
+What does a block represent in Ray?
+-----------------------------------
 
+Ray Data uses _blocks_ to represent subsets of data in a Dataset. Most users of Ray Data
 
+Blocks have the following characteristics:
+
+* Each record or row in a Dataset is only present in one block.
+* Blocks are distributed across the cluster for independent processing.
+* Blocks are processed in parallel and sequentially, depending on the operations present in an application.
+
+If you're troubleshooting or optimizing Ray Data workloads, consider the following details and special cases:
+
+* The number of rows or records in a block varies based on the size of each record. Most blocks are between 1 MiB and 128 MiB.
+
+ * Ray automatically splits blocks into smaller blocks if they exceed the max block size by 50% or more.
+
+ * A block might only contain a single record if your data is very wide or contains a large record such as an image, vector, or tensor. Ray Data has built-in optimizations for handling large data efficiently, and you should test workloads with built-in defaults before trying to manually optimize your workload.
+
+ * You can configure block size and splitting behaviors. See :ref:`block_size`.
+
+* Ray uses `Arrow tables `_ to internally represent blocks of data.
+
+ * Ray Data falls back to pandas DataFrames for data that cannot be safely represented using Arrow tables. See `Arrow and pandas type differences `_.
+
+ * Block format doesn't affect the data type returned by APIs such as :meth:`~ray.data.Dataset.iter_batches`.
 
 
 Operators
@@ -154,8 +176,6 @@ placement group. 
To use current placement group resources specifically for Ray D Consider this override only for advanced use cases to improve performance predictability. The general recommendation is to let Ray Data run outside placement groups. - - Memory Management ----------------- diff --git a/doc/source/data/images/queue.svg b/doc/source/data/images/queue.svg new file mode 100644 index 000000000000..ff08d9f664dd --- /dev/null +++ b/doc/source/data/images/queue.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/doc/source/data/images/streaming-topology.svg b/doc/source/data/images/streaming-topology.svg index df99a8c0cf4e..0725a8cd1642 100644 --- a/doc/source/data/images/streaming-topology.svg +++ b/doc/source/data/images/streaming-topology.svg @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/doc/source/data/key-concepts.rst b/doc/source/data/key-concepts.rst index 07e3921e75b5..c3758ecfb0a6 100644 --- a/doc/source/data/key-concepts.rst +++ b/doc/source/data/key-concepts.rst @@ -30,130 +30,61 @@ The following table provides descriptions for the key concepts of Ray Data: | Physical plan | The final stage of planning in Ray Data, representing how the program runs. Physical operators manipulate references to data and map logic to tasks and actors. | +---------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +.. _streaming-execution: + How does Ray Data work? ----------------------- -Ray Data provides a framework of operators that abstract Ray Core primitives to simplify programming Ray workloads and optimize planning and execution for common ML data processing tasks. - -You write programs using Dataset operators - -Ray Data optimizes data processing for ML by planning and executing operations as concurrent stages using parallel processing, referred to as the *streaming execution model*. - -With streaming execution, Ray processes data in a streaming fashion through a pipeline of operations rather than materializing the entire dataset in memory at once. - -Ray Data - -The following is - -#. The user defines the workload using Datasets. -#. Ray Data translates the workload to a logical plan. -#. During execution, the planner translates the logical plan to a physical plan, optimizing at each step. -#. The physical plan represents each operator as a stage in a pipeline. -#. Each stage has an input queue of blocks of data to process. -#. Each stage writes results as blocks of data in an output queue. -#. Output queues become input queues for the next stage in the pipeline. -#. Each block of data is processed independently at each stage. -#. Any stage with data present in its input queue is eligible for scheduling. - -Ray Train is optimized to work on Ray Datasets. See :ref:`streaming-execution`. - - - - -Ray Data uses the :class:`~ray.data.Dataset` abstraction to map common data operations to Ray Core task, actors, and objects. - - +Ray Data uses the :class:`~ray.data.Dataset` abstraction to map common data operations to Ray Core tasks, actors, and objects. You write programs using Dataset operators and common Python code and libraries. Ray Data handles optimization, distribution, parallelization, and concurrency for your workloads. +The following is a high-level explanation of how Ray Data plans and executes workloads. +1. 
Define the workload
+~~~~~~~~~~~~~~~~~~~~~~
+
+You define Ray Data logic using the Dataset API.
+
+The following is a simple example that loads CSV data, applies a series of map and filter transformations, and then calls the ``show`` action to trigger the pipeline:
 
 .. testcode::
 
     import ray
 
     # Create a dataset with 1K rows
     ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
 
     # Define a pipeline of operations
     ds = ds.map(lambda x: {"target1": x["target"] * 2})
     ds = ds.map(lambda x: {"target2": x["target1"] * 2})
     ds = ds.map(lambda x: {"target3": x["target2"] * 2})
     ds = ds.filter(lambda x: x["target3"] % 4 == 0)
 
     # Data starts flowing when you call a method like show()
     ds.show(5)
 
 .. _plans:
 
2. Translate the logical plan
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
Ray Data translates the workload to a logical plan. The logical plan consists of *logical operators* that describe *what* operation to perform.
 
The following is a simplified view of the resultant logical plan:
 
.. code-block::
 
    Filter()
    +- Map()
    +- Map()
    +- Map()
    +- Dataset(schema={...})
 
3. Build the physical plan
~~~~~~~~~~~~~~~~~~~~~~~~~~
 
When you run your program, the planner translates the logical plan into a physical plan. The physical plan consists of *physical operators* that describe *how* to execute the operation. 
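+
+As a hedged sketch, after the pipeline above has executed through an action such as ``show``, you can print per-operator execution statistics; the output format varies by Ray version:
+
+.. testcode::
+
+    # Summarizes how the operators in the plan ran for the dataset above.
+    print(ds.stats())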
+ # Create a dataset with 1K rows + ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv") + # Define a pipeline of operations + ds = ds.map(lambda x: {"target1": x["target"] * 2}) + ds = ds.map(lambda x: {"target2": x["target1"] * 2}) + ds = ds.map(lambda x: {"target3": x["target2"] * 2}) + ds = ds.filter(lambda x: x["target3"] % 4 == 0) + # Data starts flowing when you call a method like show() + ds.show(5) .. _plans: -How does Ray Data plan and run operations? ----------------------------------------------- +2. Translate the logical plan +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Ray Data uses a two-phase planning process to execute operations efficiently. +Ray Data translates the workload to a logical plan. The logical plan consists of *logical operators* that describe *what* operation to perform. -* You write a program using the Dataset API. - -Ray data builds a *logical plan* that represents all the +The following is a simplified view of the resultant logical plan: -The operators from your program map to *logical operators*. Logical operators +.. code-block:: + Filter() + +- Map() + +- Map() + +- Map() + +- Dataset(schema={...}) +3. Build the physical plan +~~~~~~~~~~~~~~~~~~~~~~~~~~ +When you run your program, the planner translates the logical plan into a physical plan. The physical plans consists of *physical operators* that describe *how* to execute the operation. -- a high-level description of what operations to perform. When execution begins, it converts this into a *physical plan* that specifies exactly how to execute those operations. +Because Ray Data uses a lazy execution model, the planner can optimize the logical and physical plans to reduce data movement and serialization costs. The following diagram illustrates the complete planning process: @@ -163,116 +94,64 @@ The following diagram illustrates the complete planning process: :width: 600 :align: center -The building blocks of these plans are operators: - -* Logical plans consist of *logical operators* that describe *what* operation to perform. For example, ``ReadOp`` specifies what data to read. -* Physical plans consist of *physical operators* that describe *how* to execute the operation. For example, ``TaskPoolMapOperator`` launches Ray tasks to actually read the data. - -Here is a simple example of how Ray Data builds a logical plan. As you chain operations together, Ray Data constructs the logical plan behind the scenes: - -.. testcode:: - import ray - - dataset = ray.data.range(100) - dataset = dataset.add_column("test", lambda x: x["id"] + 1) - dataset = dataset.select_columns("test") - -You can inspect the resulting logical plan by printing the dataset: - -.. code-block:: - - Project - +- MapBatches(add_column) - +- Dataset(schema={...}) - -When execution begins, Ray Data optimizes the logical plan, then translate it into a physical plan - a series of operators that implement the actual data transformations. During this translation: - -1. A single logical operator may become multiple physical operators. For example, ``ReadOp`` becomes both ``InputDataBuffer`` and ``TaskPoolMapOperator``. -2. Both logical and physical plans go through optimization passes. For example, ``OperatorFusionRule`` combines map operators to reduce serialization overhead. +Ray Data optimizes data processing for ML using a *streaming execution model*. With streaming execution, Ray processes data in a streaming fashion through a pipeline of operations rather than materializing the entire dataset in memory at once. 
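+
+A minimal sketch of streaming consumption, reusing ``ds`` from the example above: ``iter_batches`` yields batches as they flow through the pipeline instead of materializing the full dataset:
+
+.. testcode::
+
+    # Consume results one batch at a time.
+    for batch in ds.iter_batches(batch_size=32):
+        pass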
-Physical operators work by: +Operators are connected as stages in a pipeline. In each stage, the operator outputs results as a queue. The output queue for one operator is the input queue of the next downstream operator. This creates an efficient flow of data through the execution plan. -* Taking in a stream of block references -* Performing their operation (either transforming data with Ray Tasks/Actors or manipulating references) -* Outputting another stream of block references +The physical plan represents each operator as a stage in a pipeline. -For more details on Ray Tasks and Actors, see :ref:`Ray Core Concepts `. +The following diagram is a simplified illustration of the physical plan: -.. note:: A dataset's execution plan only runs when you materialize or consume the dataset through operations like :meth:`~ray.data.Dataset.show`. - -.. _streaming-execution: - -Process large datasets as streams ---------------------------------- - -Ray Data uses a *streaming execution model* to efficiently process large datasets. - -The streaming execution model - - -Because many frameworks supported by Ray Train also support this streaming execution model, Ray can optimize a physical plan for streaming execution from data loading and preprocessing steps all the way through model training. Offline batch inference also uses streaming execution, allowing for efficient model predictions on large datasets with reduced memory and compute requirements. - -.. note:: - - Models, frameworks, or algorithms that must materialize the entire dataset to calculate results are not optimized for streaming execution. - - Ray Train provides integrations with many common ML and AI frameworks to efficiently distribute training and support streaming execution for model training. See :ref:`Ray Train`. - - -In the streaming execution model, operators are connected in a pipeline, with each operator's output queue feeding directly into the input queue of the next downstream operator. This creates an efficient flow of data through the execution plan. +.. https://docs.google.com/drawings/d/10myFIVtpI_ZNdvTSxsaHlOhA_gHRdUde_aHRC9zlfOw/edit -The streaming execution model provides significant advantages for data processing. +.. image:: images/streaming-topology.svg + :width: 600 + :align: center -In particular, the pipeline architecture enables multiple stages to execute concurrently, improving overall performance and resource utilization. For example, if the map operator requires GPU resources, the streaming execution model can execute the map operator concurrently with the filter operator (which may run on CPUs), effectively utilizing the GPU through the entire duration of the pipeline. -To summarize, Ray Data's streaming execution model can efficiently process datasets that are much larger than available memory while maintaining high performance through parallel execution across the cluster. +4. Distribute data +~~~~~~~~~~~~~~~~~~ -.. note:: - Operations that need to evaluate, compare, or aggregate the entire dataset create processing bottlenecks for streaming execution. Examples include :meth:`ds.sort() ` and :meth:`ds.groupby() `. - - Ray must materialize the entire dataset to complete these operations, which interupts stream pipeline processing and might lead to significant spill or out-of-memory errors. +The physical plan contains the instructions for distributing blocks of data to workers for parallelization. - Consider refactoring workloads to remove unnecessary operations that require full dataset materialization. 
For example, the distributed model used by Ray does not persist ordered results between stages or guarantee that sorting is preserved on write. For many workloads, removing a :meth:`ds.sort() ` operation can eliminate significant overhead without impacting results in any way. - -You can read more about the streaming execution model in this `blog post `__. +Ray Data holds the Dataset on the process that triggers execution (which is usually the entrypoint of the program, referred to as the :term:`driver`) and stores the blocks as objects in Ray's shared-memory :ref:`object store `. +The following figure visualizes a Dataset with three blocks, each holding 1000 rows: -Streaming execution example -~~~~~~~~~~~~~~~~~~~~~~~~~~~ +.. image:: images/dataset-arch-with-blocks.svg + :alt: Ray Dataset with three blocks +.. + https://docs.google.com/drawings/d/1kOYQqHdMrBp2XorDIn0u0G_MvFj-uSA4qm6xf9tsFLM/edit -The following is a simple code example that demonstrates the streaming execution model. This example loads CSV data, applies a series of map and filter transformations, and then calls the ``show`` action to trigger the pipeline: -.. testcode:: +5. Transform the data +~~~~~~~~~~~~~~~~~~~~~ - import ray +Each stage in the physical plan processes blocks of data using the following behavior: - # Create a dataset with 1K rows - ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv") +* The stage receives an input queue of block references. +* The operator transforms the block of data. +* The result of the transformation is a block reference in an output queue. +* Output queues become input queues for the next stage in the pipeline. - # Define a pipeline of operations - ds = ds.map(lambda x: {"target1": x["target"] * 2}) - ds = ds.map(lambda x: {"target2": x["target1"] * 2}) - ds = ds.map(lambda x: {"target3": x["target2"] * 2}) - ds = ds.filter(lambda x: x["target3"] % 4 == 0) +The following diagram illustrates how data flows through a queue between stages: - # Data starts flowing when you call a method like show() - ds.show(5) +.. image:: images/queue.svg + :width: 600 + :align: center -The following is a simplified view of the resultant logical plan: +6. Concurrent scheduling +~~~~~~~~~~~~~~~~~~~~~~~~ -.. code-block:: +Each block of data is processed independently at each stage. Any stage with data present in its input queue is eligible for scheduling. - Filter() - +- Map() - +- Map() - +- Map() - +- Dataset(schema={...}) +The pipeline architecture enables multiple stages to execute concurrently, improving overall performance and resource utilization. For example, if the map operator requires GPU resources, the streaming execution model can execute the map operator concurrently with a filter operator running on CPU resources, effectively utilizing the GPU through the entire duration of the pipeline. +Ray Data and Ray Train +---------------------- -This logical plan maps to the following streaming topology: +The streaming execution model is particularly well-suited to workloads that use GPUs to process large datasets. When you use Ray Data to load and preprocess data for Ray Train, Ray can optimize streaming execution from your data source through training. See :ref:`Ray Train`. -.. https://docs.google.com/drawings/d/10myFIVtpI_ZNdvTSxsaHlOhA_gHRdUde_aHRC9zlfOw/edit +Offline batch inference also uses streaming execution, allowing for efficient model predictions on large datasets with reduced memory and compute requirements. -.. 
image:: images/streaming-topology.svg
 :width: 1000
 :align: center

diff --git a/doc/source/data/performance-tips.rst b/doc/source/data/performance-tips.rst
index 89ff99888e64..2fb5bea43873 100644
--- a/doc/source/data/performance-tips.rst
+++ b/doc/source/data/performance-tips.rst
@@ -8,6 +8,27 @@ This page describes considerations and recommendations related to performance an
 * For a conceptual overview of Ray Data, see :ref:`data_key_concepts`.
 * For technical implementation details of Ray Data, see :ref:`datasets_scheduling`.
 
+.. _remove_materialize:
+
+Remove unnecessary materialization
+----------------------------------
+
+Operations that consume data force Ray Data to execute your logic and materialize results. For example, while operations to inspect your data might be informative while developing a new workload, these same operations can introduce bottlenecks when deploying your code to production.
+
+Some operations, such as :meth:`~ray.data.Dataset.take_all`, can also lead to out-of-memory errors when you scale the size of your data.
+
+.. _remove_bottlenecks:
+
+Remove logical bottlenecks
+--------------------------
+
+Operations that need to evaluate, compare, or aggregate the entire dataset create processing bottlenecks for streaming execution. Examples include :meth:`ds.sort() ` and :meth:`ds.groupby() `.
+
+Ray must materialize the entire dataset to complete these operations, which interrupts stream pipeline processing and might lead to significant spill or out-of-memory errors.
+
+Consider refactoring workloads to remove unnecessary operations that require full dataset materialization. For example, the distributed model used by Ray does not persist ordered results between stages or guarantee that sorting is preserved on write. For many workloads, removing a :meth:`ds.sort() ` operation can eliminate significant overhead without impacting results in any way.
+
+
 .. _block_size:
 
 Block size and performance
diff --git a/doc/source/data/quickstart.rst b/doc/source/data/quickstart.rst
index a307b4f9b0ec..c9ca782d621b 100644
--- a/doc/source/data/quickstart.rst
+++ b/doc/source/data/quickstart.rst
@@ -45,7 +45,7 @@ Ray Data's main abstraction is a :class:`Dataset `, which repr
 
 Ray Datasets are similar to DataFrames and Datasets in TensorFlow, PyTorch, pandas, and Apache Spark. Ray Data provides interoperability with these and other libraries. See :ref:`load_data_libraries`.
 
-For details on how Ray Datasets represent data for distributed processing, see :ref:`data_key_concepts`.
+For details on how Ray Datasets represent and optimize data for distributed processing, see :ref:`data_key_concepts`.
 
 Most operations in Ray Data fall into one of the following categories:
 
@@ -64,9 +64,8 @@ Run Ray Data code examples
 --------------------------
 
 The following code examples build upon one another sequentially. You can run these commands interactively in a Jupyter notebook if you have :ref:`configured Ray`, or you can `run Ray on Anyscale `_.
 
-These code examples materialize data at each step to show results. When preparing code for production, only include operations that materialize results if they are essential to your application. This allows Ray to optimize the logical and physical plans and efficiently pipeline operations for concurrent execution. See :ref:`plans`. 
-
 
-Combine these steps to complete data preprocessing or ETL (extract, transform, and load) workloads. For an example, see :ref:`data-get-started`.
+.. note::
+    These code examples materialize data at each step to show results. When preparing code for production, only include operations that materialize results if they are essential to your application. This allows Ray to optimize the logical and physical plans and efficiently pipeline operations for concurrent execution. See :ref:`remove_materialize`.
 
 .. _loading_key_concept:
 
From 63ec59612ea0749fe5c10b757536982b946914d2 Mon Sep 17 00:00:00 2001
From: dstrodtman
Date: Sun, 6 Apr 2025 22:05:46 -0400
Subject: [PATCH 14/15] formatting typo
---
 doc/source/data/data-internals.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/doc/source/data/data-internals.rst b/doc/source/data/data-internals.rst
index 5036c68f63a5..825a9fd88931 100644
--- a/doc/source/data/data-internals.rst
+++ b/doc/source/data/data-internals.rst
@@ -14,7 +14,7 @@ What does a block represent in Ray?
 -----------------------------------
 
-Ray Data uses _blocks_ to represent subsets of data in a Dataset. Most users of Ray Data
+Ray Data uses *blocks* to represent subsets of data in a Dataset. Most users of Ray Data never need to work with blocks directly.
 
 Blocks have the following characteristics:
 
From c958c67ed1b39653154fde78992d42acca61ca9b Mon Sep 17 00:00:00 2001
From: dstrodtman
Date: Mon, 7 Apr 2025 08:27:35 -0400
Subject: [PATCH 15/15] typo
---
 doc/source/data/quickstart.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/doc/source/data/quickstart.rst b/doc/source/data/quickstart.rst
index c9ca782d621b..d7fd4a665a3c 100644
--- a/doc/source/data/quickstart.rst
+++ b/doc/source/data/quickstart.rst
@@ -92,7 +92,7 @@ The following code example loads CSV data from an S3 bucket and previews the dat
 
 {'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
 
-For an overview of creating Dataset from other sources, including Python data structures and other data processing frameworks, see :ref:`loading_data`.
+For an overview of creating Datasets from other sources, including Python data structures and other data processing frameworks, see :ref:`loading_data`.
 
 .. _transforming_key_concept: