8000 [Train] Intermittent `UnpicklingError` when loading estimator/preprocessor from checkpoint · Issue #33815 · ray-project/ray · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
[Train] Intermittent UnpicklingError when loading estimator/preprocessor from checkpoint #33815
Open
@bdewilde

Description

@bdewilde

What happened + What you expected to happen

When using a BatchPredictor initialized from an existing checkpoint, sometimes the .predict() call fails with a cloudpickle.UnpicklingError. There's no obvious cause for when/why the task fails. The specific error message — "pickle data was truncated" — is potentially misleading, since the checkpoint artifacts' serialized data is fine: I can successfully load them manually via checkpoint methods .get_estimator() and .get_preprocessor(). The problem seems to be in the batch predictor itself.

Here's a representative code blob:

dataset = ray.data.read_parquet("[LOCAL_DATA_DIR]")
checkpoint = ray.train.sklearn.SklearnCheckpoint(local_path="[LOCAL_PATH]")
batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint, ray.train.sklearn.SklearnPredictor
)
preds = batch_predictor.predict(dataset)

And here's the logging output:

2023-03-28 14:35:44,229	INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[read->Featurizer]
read->Featurizer: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 13/13 [00:54<00:00,  4.18s/it]
2023-03-28 14:36:38,623	INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(ScoringWrapper)]
MapBatches(ScoringWrapper), 3 actors:  62%|█████████████████████████████████████████████████████████████████████████████████████████▊                                                        | 8/13 [00:18<00:06,  1.27s/it]
---------------------------------------------------------------------------
RayTaskError(UnpicklingError)             Traceback (most recent call last)
Cell In[12], line 4
      1 batch_predictor = BatchPredictor.from_checkpoint(
      2     checkpoint, ray.train.sklearn.SklearnPredictor
      3 )
----> 4 preds = batch_predictor.predict(dataset)

File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/train/batch_predictor.py:334, in BatchPredictor.predict(self, data, feature_columns, keep_columns, batch_size, min_scoring_workers, max_scoring_workers, num_cpus_per_worker, num_gpus_per_worker, separate_gpu_stage, ray_remote_args, **predict_kwargs)
    320 prediction_results = data.map_batches(
    321     ScoringWrapper,
    322     compute=compute,
   (...)
    329     **ray_remote_args,
    330 )
    332 if isinstance(prediction_results, ray.data.Dataset):
    333     # Force execution because Dataset uses lazy execution by default.
--> 334     prediction_results.fully_executed()
    336 return prediction_results

File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/dataset.py:3966, in Dataset.fully_executed(self)
   3957 def fully_executed(self) -> "Dataset[T]":
   3958     """Force full evaluation of the blocks of this dataset.
   3959 
   3960     This can be used to read all blocks into memory. By default, Datasets
   (...)
   3964         A Dataset with all blocks fully materialized in memory.
   3965     """
-> 3966     self._plan.execute(force_read=True)
   3967     return self

File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/plan.py:539, in ExecutionPlan.execute(self, allow_clear_input_blocks, force_read)
    534 from ray.data._internal.execution.legacy_compat import (
    535     execute_to_legacy_block_list,
    536 )
    538 executor = BulkExecutor(copy.deepcopy(context.execution_options))
--> 539 blocks = execute_to_legacy_block_list(
    540     executor,
    541     self,
    542     allow_clear_input_blocks=allow_clear_input_blocks,
    543     dataset_uuid=self._dataset_uuid,
    544 )
    545 # TODO(ekl) we shouldn't need to set this in the future once we move
    546 # to a fully lazy execution model, unless .cache() is used. The reason
    547 # we need it right now is since the user may iterate over a Dataset
    548 # multiple times after fully executing it once.
    549 if not self._run_by_consumer:

File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/legacy_compat.py:84, in execute_to_legacy_block_list(executor, plan, allow_clear_input_blocks, dataset_uuid)
     82 else:
     83     dag, stats = _to_operator_dag(plan, allow_clear_input_blocks)
---> 84 bundles = executor.execute(dag, initial_stats=stats)
     85 _set_stats_uuid_recursive(executor.get_stats(), dataset_uuid)
     86 return _bundles_to_block_list(bundles)

File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/bulk_executor.py:82, in BulkExecutor.execute(self, dag, initial_stats)
     77     logger.get_logger(log_to_stdout=context.enable_auto_log_stats).info(
     78         stats_summary_string,
     79     )
     80     return output
---> 82 return execute_recursive(dag)

File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/bulk_executor.py:63, in BulkExecutor.execute.<locals>.execute_recursive(op)
     61             op.add_input(r, input_index=i)
     62     op.inputs_done()
---> 63     output = _naive_run_until_complete(op)
     64 finally:
     65     op.shutdown()

File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/bulk_executor.py:106, in _naive_run_until_complete(op)
    102 done, _ = ray.wait(
    103     tasks, num_returns=len(tasks), fetch_local=True, timeout=0.1
    104 )
    105 for ready in done:
--> 106     op.notify_work_completed(ready)
    107 tasks = op.get_work_refs()
    108 while op.has_next():

File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/operators/actor_pool_map_operator.py:160, in ActorPoolMapOperator.notify_work_completed(self, ref)
    157 if ref in self._tasks:
    158     # Get task state and set output.
    159     task, actor = self._tasks.pop(ref)
--> 160     task.output = self._map_ref_to_ref_bundle(ref)
    161     self._handle_task_done(task)
    162     # Return the actor that was running the task to the pool.

File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/operators/map_operator.py:296, in MapOperator._map_ref_to_ref_bundle(***failed resolving arguments***)
    294 del ref
    295 block_refs = all_refs[:-1]
--> 296 block_metas = ray.get(all_refs[-1])
    297 assert len(block_metas) == len(block_refs), (block_refs, block_metas)
    298 for ref in block_refs:

File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
    103     if func.__name__ != "init" or is_client_mode_enabled_by_default:
    104         return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)

File ~/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/_private/worker.py:2380, in get(object_refs, timeout)
   2378     worker.core_worker.dump_object_store_memory_usage()
   2379 if isinstance(value, RayTaskError):
-> 2380     raise value.as_instanceof_cause()
   2381 else:
   2382     raise value

RayTaskError(UnpicklingError): ray::_MapWorker.submit() (pid=47207, ip=127.0.0.1)
  File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/operators/actor_pool_map_operator.py", line 272, in submit
    yield from _map_task(fn, ctx, *blocks)
  File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 351, in _map_task
    for b_out in fn(iter(blocks), ctx):
  File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/legacy_compat.py", line 219, in do_map
    yield from block_fn(blocks, ctx, *fn_args, **fn_kwargs)
  File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/planner/map_batches.py", line 102, in fn
    yield from process_next_batch(batch)
  File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/planner/map_batches.py", line 66, in process_next_batch
    batch = batch_fn(batch, *fn_args, **fn_kwargs)
  File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/data/_internal/execution/legacy_compat.py", line 199, in fn
    ray.data._cached_fn = fn_(
  File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/train/batch_predictor.py", line 210, in __init__
    self._predictor = predictor_cls.from_checkpoint(
  File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/train/sklearn/sklearn_predictor.py", line 58, in from_checkpoint
    estimator = checkpoint.get_estimator()
  File "/Users/burtondewilde/.pyenv/versions/3.9.13/envs/my-env/lib/python3.9/site-packages/ray/train/sklearn/sklearn_checkpoint.py", line 72, in get_estimator
    return cpickle.load(f)
_pickle.UnpicklingError: pickle data was truncated

Meanwhile, both checkpoint.get_estimator() and checkpoint.get_preprocessor() return the objects as expected.

Versions / Dependencies

ray == 2.3, pandas == 1.5, pyarrow == 10.0
PY 3.9.13
macOS 13.2

Reproduction script

Unfortunately this is not consistently reproducible.

Issue Severity

Medium: It is a significant difficulty but I can work around it.

Metadata

Metadata

Assignees

Labels

P2Important issue, but not time-criticalbugSomething that is supposed to be working; but isn'tpending-cleanupThis issue is pending cleanup. It will be removed in 2 weeks after being assigned.trainRay Train Related Issue

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions

    0