multiprocess dataset reader and iterator #1760
Conversation
Looks great 👍
for _ in range(self.num_workers):
    input_queue.put(None)

# TODO(joelgrus): where does this number come from?
We could allow this to be configurable, and may need to increase it depending on how many instances the iterator uses for each batch. We want it to be large enough to give a significant buffer, so the iterator doesn't have to wait for instances, but not so large that it starts taking up significant memory.
I can do this
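For illustration, a minimal sketch of what making this configurable could look like, assuming a constructor argument named output_queue_size (the parameter name and default value here are assumptions, not the PR's actual signature):

from multiprocessing import Queue

# Sketch only: output_queue_size is an assumed constructor argument, not
# necessarily the name this PR uses.
class ConfigurableQueueSizeSketch:
    def __init__(self, num_workers: int = 1, output_queue_size: int = 1000) -> None:
        self.num_workers = num_workers
        # Larger values buffer more batches so the trainer rarely has to wait;
        # smaller values keep fewer tensor dicts in memory at once.
        self.output_queue = Queue(output_queue_size)

With a default in place, anyone hitting memory pressure or iterator starvation can tune the buffer from configuration rather than editing code.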
""" | ||
instances: List[Instance] = [] | ||
|
||
def make_batches() -> None: |
Is there any reason we can't re-use an existing iterator here? For many applications we'd want this to have more logic to do something like the bucket iterator, or optionally allow shuffling all of the instances read into memory before creating batches, etc. Right now we will always read each file in the same order.
yes, this is a good idea
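A rough sketch of the re-use idea, assuming the wrapped iterator is passed in as base_iterator (an assumed parameter name): batching, bucketing, and shuffling stay in the wrapped iterator, and the multiprocess wrapper only feeds it instances.

from typing import Iterable, Iterator

class IteratorReuseSketch:
    """Sketch only: base_iterator is an assumed parameter name."""

    def __init__(self, base_iterator) -> None:
        # The wrapped iterator (e.g. a bucket iterator) owns batch creation,
        # sorting, and shuffling; the multiprocess wrapper only feeds it
        # instances pulled off the queue.
        self.base_iterator = base_iterator

    def _iterate(self, instances: Iterable) -> Iterator:
        yield from self.base_iterator(instances, num_epochs=1, shuffle=False)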
if len(instances) >= max_instances_in_memory:
    make_batches()

def _queuer(instances: Iterable[Instance],
Is there a way to remove the _queuer if we are also using the MultiprocessDatasetReader, by passing the reader queue directly to the MultiprocessIterator? E.g. the MultiprocessIterator can iterate over either an iterable of instances or a queue filled with instances?
this is the one that's going to be hard. the problem is that the trainer does

train_generator = self._iterator(self._train_data,
                                 num_epochs=1,
                                 shuffle=self._shuffle)

and so I'd have to add a weird coupling where the iterator gets the queue directly. let me think about it a little bit
shuffle: bool = True) -> Iterator[TensorDict]:

# If you run it forever, the multiprocesses won't shut down correctly.
# TODO(joelgrus) find a solution for this
I'm partial to pkill ...
if num_epochs is None:
    raise ConfigurationError("Multiprocess Iterator must be run for a fixed number of epochs")

# TODO(joelgrus) are these the right sizes?
See comment above -- seems we could also make the input queue size approximately batch_size times larger than the output queue size.
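As a concrete (purely illustrative) sketch of that sizing rule, with made-up numbers: the input queue holds individual instances while the output queue holds whole batches, so scaling by batch_size keeps the two roughly in balance.

# Illustrative numbers only, not values from the PR.
batch_size = 32
output_queue_size = 1000          # batches buffered for the trainer
input_queue_size = batch_size * output_queue_size  # instances buffered for the iterator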
ok, I addressed all of your comments except for the "read directly from the queue" one. it turns out that that's a tricky distributed systems problem (or at least I couldn't figure out how to make it not one).

the key issue is that you have some number of workers that generate instances and put them on the queue, and some other number of workers that pull them off and generate tensor_dicts. if there is an iterator in the middle, it can recognize when all of the instance-generating workers are done and stop iterating, which tells the tensor-dict-generating workers to stop. without the iterator in the middle, how do the tensor-dict-generating workers know when to stop? I couldn't figure out a clean solution.

however, I made it so that the Iterator[Instance] that the MultiprocessIterator gets exposes the queue, so if I can figure out a good solution for this it shouldn't be hard to implement.
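To make the termination issue concrete, here is a minimal, self-contained sketch (not code from this PR) of the pattern described above: each instance-generating worker puts one sentinel on the queue when it finishes, and the consuming side can only stop once it has counted a sentinel from every producer -- which requires knowing how many producers exist, i.e. exactly the bookkeeping the iterator in the middle currently does.

from multiprocessing import Process, Queue

NUM_PRODUCERS = 2
SENTINEL = None  # one sentinel per producer marks "this worker is done"

def produce(worker_id, queue):
    # stand-in for an instance-generating worker
    for i in range(3):
        queue.put(f"instance-{worker_id}-{i}")
    queue.put(SENTINEL)

def consume(queue):
    # stand-in for a tensor-dict-generating worker: it can only stop once it
    # has counted a sentinel from every producer, which means it has to know
    # how many producers there are
    finished = 0
    while finished < NUM_PRODUCERS:
        item = queue.get()
        if item is SENTINEL:
            finished += 1
        else:
            print("got", item)

if __name__ == "__main__":
    queue = Queue()
    producers = [Process(target=produce, args=(i, queue)) for i in range(NUM_PRODUCERS)]
    for p in producers:
        p.start()
    consume(queue)
    for p in producers:
        p.join()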
Other than the shuffle, it looks great 👍
yield instance
instance = input_queue.get()

for tensor_dict in iterator(instances(), num_epochs=1, shuffle=False):
Shouldn't we be passing down the shuffle parameter here? I think it will ignore whatever shuffle parameter is passed to __call__ without it.
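A sketch of what that suggestion would look like, assuming the shuffle flag received by __call__ is in scope where the loop runs (this is not the committed change):

# Sketch only: thread the shuffle argument through to the wrapped iterator
# instead of hard-coding shuffle=False.
def _call_sketch(iterator, instances, shuffle):
    for tensor_dict in iterator(instances, num_epochs=1, shuffle=shuffle):
        yield tensor_dict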
the DatasetReader is a wrapper for any other DatasetReader; the iterator is its own iterator. (our iterator code is way too complicated to make a similar wrapper possible, and anyway it feels less important for the iterator to work that way)