feat: add pass-in value of edit fields for petastorm datamodule in py… by serena-ruan · Pull Request #3651 · horovod/horovod · GitHub
Merged · 6 commits · Aug 17, 2022
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -15,7 +15,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- TensorFlow: Added doc string for `hvd.grouped_allreduce()`. ([#3594](https://github.com/horovod/horovod/pull/3594))
- Added warning messages if output tensor memory allocations fail. ([#3594](https://github.com/horovod/horovod/pull/3594))
- Added `register_local_source` and `use_generic_names` functionality to DistributedGradientTape. ([#3628](https://github.com/horovod/horovod/pull/3628))
- Added `transformation_edit_fields` and `transformation_removed_fields` params for EstimatorParams. ([#3651](https://github.com/horovod/horovod/pull/3651))
- Added `PartialDistributedGradientTape()` API for model parallel use cases. ([#3643](https://github.com/horovod/horovod/pull/3643))

### Changed

### Deprecated
23 changes: 23 additions & 0 deletions horovod/spark/common/params.py
@@ -93,6 +93,15 @@ class EstimatorParams(Params):
'functions that construct the transformation '
'function that applies custom transformations to '
'every batch before train and validation steps')

transformation_edit_fields = Param(Params._dummy(), 'transformation_edit_fields',
'edit fields for petastorm TransformSpec that\'s applied to '
'every batch. A list of 4-tuples with the following fields: '
'(name, numpy_dtype, shape, is_nullable)')

transformation_removed_fields = Param(Params._dummy(), 'transformation_removed_fields',
'removed fields for petastorm TransformSpec that\'s applied to '
'every batch. A list of field names that will be removed from the original schema.')

label_shapes = Param(Params._dummy(), 'label_shapes', 'specifies the shape (or shapes) of the label column (or columns)')

@@ -145,6 +154,8 @@ def __init__(self):
train_steps_per_epoch=None,
validation_steps_per_epoch=None,
transformation_fn=None,
transformation_edit_fields=None,
transformation_removed_fields=None,
train_reader_num_workers=2,
val_reader_num_workers=2,
reader_pool_type='process',
@@ -331,6 +342,18 @@ def setTransformationFn(self, value):

def getTransformationFn(self):
return self.getOrDefault(self.transformation_fn)

def setTransformationEditFields(self, value):
return self._set(transformation_edit_fields=value)

def getTransformationEditFields(self):
return self.getOrDefault(self.transformation_edit_fields)

def setTransformationRemovedFields(self, value):
return self._set(transformation_removed_fields=value)

def getTransformationRemovedFields(self):
return self.getOrDefault(self.transformation_removed_fields)

def setTrainReaderNumWorker(self, value):
return self._set(train_reader_num_workers=value)
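For context, a minimal sketch of how the new setter/getter pair added above might be used. `estimator` stands in for any already-constructed Horovod Spark estimator that mixes in `EstimatorParams` (e.g. the Lightning `TorchEstimator`); the field names, dtypes, and shapes are made up for illustration.

```python
import numpy as np

# Hypothetical schema edits: (name, numpy_dtype, shape, is_nullable)
edit_fields = [('image_float', np.float32, (3, 224, 224), False)]
removed_fields = ['image_raw']

# `estimator` is assumed to exist; only the new params are exercised here.
estimator.setTransformationEditFields(edit_fields)
estimator.setTransformationRemovedFields(removed_fields)

# The matching getters return whatever was set (or the None default).
assert estimator.getTransformationEditFields() == edit_fields
assert estimator.getTransformationRemovedFields() == removed_fields
```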
102 changes: 72 additions & 30 deletions horovod/spark/lightning/datamodule.py
@@ -8,16 +8,37 @@

PETASTORM_HDFS_DRIVER = constants.PETASTORM_HDFS_DRIVER


class PetastormDataModule(pl.LightningDataModule):
"""Default DataModule for Lightning Estimator"""
def __init__(self, train_dir: str, val_dir: str, num_train_epochs: int=1, has_val: bool=True,
train_batch_size: int=32, val_batch_size: int=32, shuffle_size: int=1000,
num_reader_epochs=None, reader_pool_type: str="process",
reader_worker_count: int=2, transformation=None, inmemory_cache_all=False,
cur_shard: int=0, shard_count: int=1, schema_fields=None, storage_options=None,
steps_per_epoch_train: int=1, steps_per_epoch_val: int=1, verbose=True,
debug_data_loader: bool=False, train_async_data_loader_queue_size: int=None,
val_async_data_loader_queue_size: int=None, **kwargs):

def __init__(
self,
train_dir: str,
val_dir: str,
num_train_epochs: int = 1,
has_val: bool = True,
train_batch_size: int = 32,
val_batch_size: int = 32,
shuffle_size: int = 1000,
num_reader_epochs=None,
reader_pool_type: str = "process",
reader_worker_count: int = 2,
transformation=None,
transformation_edit_fields=None,
transformation_removed_fields=None,
inmemory_cache_all=False,
cur_shard: int = 0,
shard_count: int = 1,
schema_fields=None,
storage_options=None,
steps_per_epoch_train: int = 1,
steps_per_epoch_val: int = 1,
verbose=True,
debug_data_loader: bool = False,
train_async_data_loader_queue_size: int = None,
val_async_data_loader_queue_size: int = None,
**kwargs):
super().__init__()
self.train_dir = train_dir
self.val_dir = val_dir
@@ -30,6 +51,8 @@ def __init__(self, train_dir: str, val_dir: str, num_train_epochs: int=1, has_va
self.reader_pool_type = reader_pool_type
self.reader_worker_count = reader_worker_count
self.transformation = transformation
self.transformation_edit_fields = transformation_edit_fields
self.transformation_removed_fields = transformation_removed_fields
self.inmemory_cache_all = inmemory_cache_all
self.cur_shard = cur_shard
self.shard_count = shard_count
@@ -45,15 +68,24 @@ def __init__(self, train_dir: str, val_dir: str, num_train_epochs: int=1, has_va
if debug_data_loader:
print("Creating data_module")


def setup(self, stage=None):
# Assign train/val datasets for use in dataloaders
if stage == 'fit' or stage is None:
transform_spec = TransformSpec(self.transformation) if self.transformation else None
if self.transformation is None and self.transformation_edit_fields is None and self.transformation_removed_fields is None:
transform_spec = None
else:
# [TransformSpec](https://github.com/uber/petastorm/blob/3f248003221a648261a36189c95c8705f6ef34ad/petastorm/transform.py#L27)
# defines a user transformation that is applied to a loaded row
# on a worker thread/process.
transform_spec = TransformSpec(
func=self.transformation,
edit_fields=self.transformation_edit_fields,
removed_fields=self.transformation_removed_fields)
# In general, make_batch_reader is faster than make_reader for reading the dataset.
# However, we found that make_reader performs data transformations much faster than
# make_batch_reader with parallel worker processes. Therefore, the default reader
# we choose is make_batch_reader unless there are data transformations.
reader_factory_kwargs = dict()
if transform_spec:
reader_factory = make_reader
@@ -69,20 +101,24 @@ def setup(self, stage=None):
schema_fields=self.schema_fields,
storage_options=self.storage_options,
transform_spec=transform_spec,
# Only shuffle row groups when a shuffle buffer is configured (shuffle_size > 0).
shuffle_row_groups=True if self.shuffle_size > 0 else False,
**reader_factory_kwargs)
if self.has_val:
self.val_reader = reader_factory(self.val_dir, num_epochs=self.num_reader_epochs,
reader_pool_type=self.reader_pool_type,
workers_count=self.reader_worker_count,
cur_shard=self.cur_shard, shard_count=self.shard_count,
hdfs_driver=PETASTORM_HDFS_DRIVER,
schema_fields=self.schema_fields,
storage_options=self.storage_options,
transform_spec=transform_spec,
shuffle_row_groups=False,
**reader_factory_kwargs)
self.val_reader = reader_factory(
self.val_dir,
num_epochs=self.num_reader_epochs,
reader_pool_type=self.reader_pool_type,
workers_count=self.reader_worker_count,
cur_shard=self.cur_shard,
shard_count=self.shard_count,
hdfs_driver=PETASTORM_HDFS_DRIVER,
schema_fields=self.schema_fields,
storage_options=self.storage_options,
transform_spec=transform_spec,
shuffle_row_groups=False,
**reader_factory_kwargs)

def teardown(self, stage=None):
if stage == "fit" or stage is None:
@@ -106,10 +142,12 @@ def teardown(self, stage=None):
def train_dataloader(self):
if self.verbose:
print("Setup train dataloader")
kwargs = dict(reader=self.train_reader, batch_size=self.train_batch_size,
name="train dataloader",
limit_step_per_epoch=self.steps_per_epoch_train,
verbose=self.verbose)
kwargs = dict(
reader=self.train_reader,
batch_size=self.train_batch_size,
name="train dataloader",
limit_step_per_epoch=self.steps_per_epoch_train,
verbose=self.verbose)
if self.inmemory_cache_all:
# Use inmem dataloader
dataloader_class = PytorchInmemAsyncDataLoader
@@ -127,9 +165,11 @@ def train_dataloader(self):
kwargs['async_loader_queue_size'] = self.train_async_data_loader_queue_size
elif isinstance(self.train_async_data_loader_queue_size, float):
# use async data loader queue size as ratio of total steps.
kwargs['async_loader_queue_size'] = int(kwargs['limit_step_per_epoch'] * self.train_async_data_loader_queue_size)
kwargs['async_loader_queue_size'] = int(
kwargs['limit_step_per_epoch'] * self.train_async_data_loader_queue_size)
else:
raise RuntimeError(f"Unsupported type for train_async_data_loader_queue_size={self.train_async_data_loader_queue_size}")
raise RuntimeError(
f"Unsupported type for train_async_data_loader_queue_size={self.train_async_data_loader_queue_size}")

self.train_dl = dataloader_class(**kwargs)
return self.train_dl
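The `*_async_data_loader_queue_size` handling above accepts either an absolute `int` or a `float` ratio of `limit_step_per_epoch`; a quick worked example of the ratio branch (numbers are made up):

```python
# With 200 steps per epoch and a ratio of 0.1, the async data loader
# queue is sized to int(200 * 0.1) == 20 batches.
limit_step_per_epoch = 200
train_async_data_loader_queue_size = 0.1
queue_size = int(limit_step_per_epoch * train_async_data_loader_queue_size)
assert queue_size == 20
```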
@@ -160,9 +200,11 @@ def val_dataloader(self):
kwargs['async_loader_queue_size'] = self.val_async_data_loader_queue_size
elif isinstance(self.val_async_data_loader_queue_size, float):
# use async data loader queue size as ratio of total steps.
kwargs['async_loader_queue_size'] = int(kwargs['limit_step_per_epoch'] * self.val_async_data_loader_queue_size)
kwargs['async_loader_queue_size'] = int(
kwargs['limit_step_per_epoch'] * self.val_async_data_loader_queue_size)
else:
raise RuntimeError(f"Unsupported type for val_async_data_loader_queue_size={self.val_async_data_loader_queue_size}")
raise RuntimeError(
f"Unsupported type for val_async_data_loader_queue_size={self.val_async_data_loader_queue_size}")

self.val_dl = dataloader_class(**kwargs)
return self.val_dl
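As a standalone illustration of what `setup()` now builds when edit/removed fields are supplied, here is a hedged sketch of a Petastorm `TransformSpec`. The row transform, field names, and shapes are hypothetical; with `make_reader` the transform is applied to one loaded row at a time.

```python
import numpy as np
from petastorm.transform import TransformSpec

def add_decoded_image(row):
    # Stand-in for real decoding logic: add a float image field and
    # drop the raw bytes field from the row.
    row['image_float'] = np.zeros((3, 224, 224), dtype=np.float32)
    del row['image_raw']
    return row

transform_spec = TransformSpec(
    func=add_decoded_image,
    # Fields the transform adds or modifies: (name, numpy_dtype, shape, is_nullable)
    edit_fields=[('image_float', np.float32, (3, 224, 224), False)],
    # Fields the transform removes from the original schema.
    removed_fields=['image_raw'])
```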
8 changes: 8 additions & 0 deletions horovod/spark/lightning/estimator.py
@@ -169,6 +169,12 @@ class TorchEstimator(HorovodEstimator, TorchEstimatorParamsWritable,
[TransformSpec](https://github.com/uber/petastorm/blob/master/petastorm/transform.py)
for more details. Note that this function constructs another function
which should perform the transformation.
transformation_edit_fields: (Optional) A list of 4-tuples with the following fields:
``(name, numpy_dtype, shape, is_nullable)``, passed to the Petastorm
[TransformSpec](https://github.com/uber/petastorm/blob/master/petastorm/transform.py)
to add or modify fields in the schema.
transformation_removed_fields: (Optional) A list of field names that the Petastorm
[TransformSpec](https://github.com/uber/petastorm/blob/master/petastorm/transform.py) will remove from the original schema.
val_batch_size: Number of rows from the DataFrame per batch for validation, if not set,
will use batch_size.
val_reader_num_workers: Similar to the train_reader_num_workers.
@@ -249,6 +255,8 @@ def __init__(self,
train_steps_per_epoch=None,
validation_steps_per_epoch=None,
transformation_fn=None,
transformation_edit_fields=None,
transformation_removed_fields=None,
train_reader_num_workers=None,
trainer_args=None,
val_reader_num_workers=None,
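A hedged sketch of how the two new constructor arguments documented above might be wired into the Lightning `TorchEstimator`. Everything besides `transformation_fn`, `transformation_edit_fields`, and `transformation_removed_fields` (the `model`, `store`, column names, and the transform itself) is a placeholder for a real setup, not taken from this PR.

```python
import numpy as np
from horovod.spark.lightning import TorchEstimator

def row_transform(row):
    # Placeholder transform matching the schema edits declared below;
    # see the transformation_fn docs for its exact contract.
    row['features_float'] = row['features'].astype(np.float32)
    del row['features']
    return row

torch_estimator = TorchEstimator(
    num_proc=2,
    model=model,            # a pl.LightningModule, defined elsewhere
    store=store,            # a Horovod Spark Store, defined elsewhere
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=64,
    epochs=1,
    transformation_fn=row_transform,
    # New in this PR: schema edits applied through the Petastorm TransformSpec.
    transformation_edit_fields=[
        ('features_float', np.float32, (28, 28), False)],
    transformation_removed_fields=['features'])
```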
4 changes: 4 additions & 0 deletions horovod/spark/lightning/remote.py
@@ -52,6 +52,8 @@ def RemoteTrainer(estimator, metadata, ckpt_bytes, run_id, dataset_idx, train_ro
terminate_on_nan = estimator.getTerminateOnNan()
transformation_fn = estimator.getTransformationFn()
transformation = transformation_fn if transformation_fn else None
transformation_edit_fields = estimator.getTransformationEditFields()
transformation_removed_fields = estimator.getTransformationRemovedFields()
inmemory_cache_all = estimator.getInMemoryCacheAll()
callbacks = estimator.getCallbacks() or []
train_steps_per_epoch = estimator.getTrainStepsPerEpoch()
@@ -272,6 +274,8 @@ def on_epoch_end(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -
'reader_pool_type': reader_pool_type,
'reader_worker_count': train_reader_worker_count,
'transformation': transformation,
'transformation_edit_fields': transformation_edit_fields,
'transformation_removed_fields': transformation_removed_fields,
'inmemory_cache_all': inmemory_cache_all,
'cur_shard': hvd.rank(),
'shard_count': hvd.size(),