Description
What happened + What you expected to happen
When using a BatchPredictor initialized from an existing checkpoint, the .predict() call sometimes fails with a cloudpickle.UnpicklingError. There's no obvious pattern to when or why the task fails. The specific error message ("pickle data was truncated") is potentially misleading, since the checkpoint artifacts' serialized data is fine: I can successfully load them manually via the checkpoint methods .get_estimator() and .get_preprocessor(). The problem seems to be in the batch predictor itself.
Here's a representative code blob:
import ray
import ray.train.sklearn
from ray.train.batch_predictor import BatchPredictor

# "[LOCAL_DATA_DIR]" and "[LOCAL_PATH]" are placeholders for local paths.
dataset = ray.data.read_parquet("[LOCAL_DATA_DIR]")
checkpoint = ray.train.sklearn.SklearnCheckpoint(local_path="[LOCAL_PATH]")
batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint, ray.train.sklearn.SklearnPredictor
)
preds = batch_predictor.predict(dataset)
And here's the logging output:
2023-03-28 14:35:44,229 INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[read->Featurizer]
read->Featurizer: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 13/13 [00:54<00:00, 4.18s/it]
2023-03-28 14:36:38,623 INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(ScoringWrapper)]
MapBatches(ScoringWrapper), 3 actors: 62%|█████████████████████████████████████████████████████████████████████████████████████████▊ | 8/13 [00:18<00:06, 1.27s/it]
---------------------------------------------------------------------------
RayTaskError(UnpicklingError) Traceback (most recent call last)
Cell In[12], line 4
1 batch_predictor = BatchPredictor.from_checkpoint(
2 checkpoint, ray.train.sklearn.SklearnPredictor
3 )
----> 4 preds = batch_predictor.predict(dataset)
File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/train/batch_predictor.py:334, in BatchPredictor.predict(self, data, feature_columns, keep_columns, batch_size, min_scoring_workers, max_scoring_workers, num_cpus_per_worker, num_gpus_per_worker, separate_gpu_stage, ray_remote_args, **predict_kwargs)
320 prediction_results = data.map_batches(
321 ScoringWrapper,
322 compute=compute,
(...)
329 **ray_remote_args,
330 )
332 if isinstance(prediction_results, ray.data.Dataset):
333 # Force execution because Dataset uses lazy execution by default.
--> 334 prediction_results.fully_executed()
336 return prediction_results
File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/dataset.py:3966, in Dataset.fully_executed(self)
3957 def fully_executed(self) -> "Dataset[T]":
3958 """Force full evaluation of the blocks of this dataset.
3959
3960 This can be used to read all blocks into memory. By default, Datasets
(...)
3964 A Dataset with all blocks fully materialized in memory.
3965 """
-> 3966 self._plan.execute(force_read=True)
3967 return self
File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/plan.py:539, in ExecutionPlan.execute(self, allow_clear_input_blocks, force_read)
534 from ray.data._internal.execution.legacy_compat import (
535 execute_to_legacy_block_list,
536 )
538 executor = BulkExecutor(copy.deepcopy(context.execution_options))
--> 539 blocks = execute_to_legacy_block_list(
540 executor,
541 self,
542 allow_clear_input_blocks=allow_clear_input_blocks,
543 dataset_uuid=self._dataset_uuid,
544 )
545 # TODO(ekl) we shouldn't need to set this in the future once we move
546 # to a fully lazy execution model, unless .cache() is used. The reason
547 # we need it right now is since the user may iterate over a Dataset
548 # multiple times after fully executing it once.
549 if not self._run_by_consumer:
File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/legacy_compat.py:84, in execute_to_legacy_block_list(executor, plan, allow_clear_input_blocks, dataset_uuid)
82 else:
83 dag, stats = _to_operator_dag(plan, allow_clear_input_blocks)
---> 84 bundles = executor.execute(dag, initial_stats=stats)
85 _set_stats_uuid_recursive(executor.get_stats(), dataset_uuid)
86 return _bundles_to_block_list(bundles)
File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/bulk_executor.py:82, in BulkExecutor.execute(self, dag, initial_stats)
77 logger.get_logger(log_to_stdout=context.enable_auto_log_stats).info(
78 stats_summary_string,
79 )
80 return output
---> 82 return execute_recursive(dag)
File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/bulk_executor.py:63, in BulkExecutor.execute.<locals>.execute_recursive(op)
61 op.add_input(r, input_index=i)
62 op.inputs_done()
---> 63 output = _naive_run_until_complete(op)
64 finally:
65 op.shutdown()
File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/bulk_executor.py:106, in _naive_run_until_complete(op)
102 done, _ = ray.wait(
103 tasks, num_returns=len(tasks), fetch_local=True, timeout=0.1
104 )
105 for ready in done:
--> 106 op.notify_work_completed(ready)
107 tasks = op.get_work_refs()
108 while op.has_next():
File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/operators/actor_pool_map_operator.py:160, in ActorPoolMapOperator.notify_work_completed(self, ref)
157 if ref in self._tasks:
158 # Get task state and set output.
159 task, actor = self._tasks.pop(ref)
--> 160 task.output = self._map_ref_to_ref_bundle(ref)
161 self._handle_task_done(task)
162 # Return the actor that was running the task to the pool.
File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/operators/map_operator.py:296, in MapOperator._map_ref_to_ref_bundle(***failed resolving arguments***)
294 del ref
295 block_refs = all_refs[:-1]
--> 296 block_metas = ray.get(all_refs[-1])
297 assert len(block_metas) == len(block_refs), (block_refs, block_metas)
298 for ref in block_refs:
File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
103 if func.__name__ != "init" or is_client_mode_enabled_by_default:
104 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)
File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/_private/worker.py:2380, in get(object_refs, timeout)
2378 worker.core_worker.dump_object_store_memory_usage()
2379 if isinstance(value, RayTaskError):
-> 2380 raise value.as_instanceof_cause()
2381 else:
2382 raise value
RayTaskError(UnpicklingError): ray::_MapWorker.submit() (pid=47207, ip=127.0.0.1)
File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/operators/actor_pool_map_operator.py", line 272, in submit
yield from _map_task(fn, ctx, *blocks)
File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 351, in _map_task
for b_out in fn(iter(blocks), ctx):
File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/legacy_compat.py", line 219, in do_map
yield from block_fn(blocks, ctx, *fn_args, **fn_kwargs)
File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/planner/map_batches.py", line 102, in fn
yield from process_next_batch(batch)
File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/planner/map_batches.py", line 66, in process_next_batch
batch = batch_fn(batch, *fn_args, **fn_kwargs)
File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/legacy_compat.py", line 199, in fn
ray.data._cached_fn = fn_(
File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/train/batch_predictor.py", line 210, in __init__
self._predictor = predictor_cls.from_checkpoint(
File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/train/sklearn/sklearn_predictor.py", line 58, in from_checkpoint
estimator = checkpoint.get_estimator()
File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/train/sklearn/sklearn_checkpoint.py", line 72, in get_estimator
return cpickle.load(f)
_pickle.UnpicklingError: pickle data was truncated
Meanwhile, calling checkpoint.get_estimator() and checkpoint.get_preprocessor() directly both return the expected objects.
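For reference, that manual check is just the following, run in the same environment against the same checkpoint directory:

# Manual sanity check: load the artifacts directly from the same checkpoint
# that BatchPredictor fails on. Both calls return the expected objects.
import ray.train.sklearn

checkpoint = ray.train.sklearn.SklearnCheckpoint(local_path="[LOCAL_PATH]")
estimator = checkpoint.get_estimator()        # fitted sklearn estimator, loads fine
preprocessor = checkpoint.get_preprocessor()  # fitted preprocessor, loads fine
print(type(estimator), type(preprocessor))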
Versions / Dependencies
ray == 2.3, pandas == 1.5, pyarrow == 10.0
Python 3.9.13
macOS 13.2
Reproduction script
Unfortunately this is not consistently reproducible.
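That said, here is a rough, self-contained sketch of the shape of the pipeline that intermittently hits the error. The toy LogisticRegression, the synthetic dataframe, and the temp-directory checkpoint are stand-ins for illustration only; the real run reads parquet and a pre-existing checkpoint directory as shown above, and this sketch will not reliably reproduce the failure.

# Rough sketch only: stand-in data/model, not a reliable reproducer.
import tempfile

import pandas as pd
import ray
from ray.train.batch_predictor import BatchPredictor
from ray.train.sklearn import SklearnCheckpoint, SklearnPredictor
from sklearn.linear_model import LogisticRegression

# Toy estimator in place of the real model.
df = pd.DataFrame({"x": range(100), "y": [0, 1] * 50})
estimator = LogisticRegression().fit(df[["x"]], df["y"])

# Write the pickled estimator to a checkpoint directory.
checkpoint = SklearnCheckpoint.from_estimator(estimator, path=tempfile.mkdtemp())

# Same predict path as in the real pipeline.
dataset = ray.data.from_pandas(df[["x"]])
batch_predictor = BatchPredictor.from_checkpoint(checkpoint, SklearnPredictor)
preds = batch_predictor.predict(dataset)  # intermittently raises UnpicklingError in the real run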
Issue Severity
Medium: It is a significant difficulty but I can work around it.