multiprocess dataset reader and iterator by joelgrus · Pull Request #1760 · allenai/allennlp · GitHub
This repository was archived by the owner on Dec 16, 2022. It is now read-only.

multiprocess dataset reader and iterator #1760

Merged
merged 26 commits into allenai:master on Sep 13, 2018

Conversation

joelgrus
Contributor

the DatasetReader is a wrapper for any other DatasetReader

the iterator is a standalone iterator (our iterator code is way too complicated to make a similar wrapper possible, and anyway it feels less important for the iterator to work that way)
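To make the wrapper idea concrete, here is a minimal sketch of the pattern (not the PR's actual implementation; the function and parameter names are illustrative): worker processes each run an ordinary reader over one shard of the data and push instances onto a shared queue, which the parent process drains.

    # Illustrative sketch only: one process per shard, a None sentinel per
    # worker, and a generator in the parent that drains the shared queue.
    # Assumes `reader` has a .read(path) method and its instances are picklable.
    from multiprocessing import Process, Queue
    import glob

    def _worker(reader, file_path, output_queue):
        for instance in reader.read(file_path):
            output_queue.put(instance)
        output_queue.put(None)  # sentinel: this worker is done

    def read_multiprocess(reader, file_glob, queue_size=1000):
        output_queue = Queue(queue_size)
        shards = glob.glob(file_glob)
        processes = [Process(target=_worker, args=(reader, shard, output_queue))
                     for shard in shards]
        for process in processes:
            process.start()
        finished = 0
        while finished < len(processes):
            item = output_queue.get()
            if item is None:
                finished += 1
            else:
                yield item
        for process in processes:
            process.join()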

Contributor
@matt-peters matt-peters left a comment


Looks great 👍

for _ in range(self.num_workers):
    input_queue.put(None)

# TODO(joelgrus): where does this number come from?
Contributor


We could allow this to be configurable, and may need to increase it depending on how many instances the iterator uses for each batch. We want it to be large enough to allow a significant buffer, so the iterator doesn't have to wait for instances, but not so large that it starts taking up significant memory.

Contributor Author


I can do this
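A minimal sketch of what making that configurable could look like; the parameter name output_queue_size is hypothetical, not necessarily what the PR ended up with:

    class MultiprocessIterator:
        def __init__(self, num_workers: int = 1, output_queue_size: int = 1000) -> None:
            # output_queue_size bounds how many items can be buffered: big enough
            # that the consumer rarely waits, small enough not to hold too much
            # in memory at once.
            self.num_workers = num_workers
            self.output_queue_size = output_queue_size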

"""
instances: List[Instance] = []

def make_batches() -> None:
Contributor


Is there any reason we can't re-use an existing iterator here? For many applications we'd want this to have more logic to do something like the bucket iterator, or optionally allow shuffling all of the instances read into memory before creating batches, etc. Right now we will always read each file in the same order.

Contributor Author


yes, this is a good idea
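Following up on that suggestion, a rough sketch of what the delegation could look like; base_iterator, max_instances_in_memory, and the _create_batches call are assumptions here, not the PR's final code:

    # Buffer up to max_instances_in_memory instances, then let a wrapped
    # iterator (e.g. a bucketing one) turn the buffer into batches instead
    # of hand-rolling the batching logic here.
    def batches_from(base_iterator, instance_stream, max_instances_in_memory):
        buffer = []
        for instance in instance_stream:
            buffer.append(instance)
            if len(buffer) >= max_instances_in_memory:
                yield from base_iterator._create_batches(buffer, shuffle=True)
                buffer = []
        if buffer:
            yield from base_iterator._create_batches(buffer, shuffle=True)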

if len(instances) >= max_instances_in_memory:
    make_batches()

def _queuer(instances: Iterable[Instance],
Contributor


Is there a way to remove the _queuer if we are also using the MultiprocessDatasetReader, by passing the reader queue directly to the MultiprocessIterator? E.g. the MultiprocessIterator could iterate over either an iterable of instances or a queue filled with instances?

Contributor Author


this is the one that's going to be hard; the problem is that the trainer does

        train_generator = self._iterator(self._train_data,
                                         num_epochs=1,
                                         shuffle=self._shuffle)

and so I'd have to add a weird coupling where the iterator gets the queue directly. Let me think about it a little bit.

shuffle: bool = True) -> Iterator[TensorDict]:

# If you run it forever, the multiprocesses won't shut down correctly.
# TODO(joelgrus) find a solution for this
Contributor


I'm partial to pkill...

if num_epochs is None:
    raise ConfigurationError("Multiprocess Iterator must be run for a fixed number of epochs")

# TODO(joelgrus) are these the right sizes?
Contributor


See comment above -- seems we could also make the input queue size approximately batch_size times larger than the output queue size.
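As a back-of-the-envelope reading of that suggestion (all names and numbers hypothetical), each item on the output queue corresponds to roughly batch_size instances from the input queue:

    # Hypothetical sizing heuristic, not the PR's code:
    batch_size = 32
    output_queue_size = 1000
    input_queue_size = batch_size * output_queue_size  # ~32,000 buffered instances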

@joelgrus
Contributor Author

ok, I addressed all of your comments except for the "read directly from the queue". it turns out that that's a tricky distributed systems problem (or at least I couldn't figure out how to make it not one).

the key issue is that you have some number of workers that generate instances and put them on the queue.

then you have some other number of workers that pull them off and generate tensor_dicts.

if there is an iterator in the middle, the iterator can recognize when all of the instance-generating workers are done and stop iterating, which tells the tensor-dict-generating workers to stop.

without the iterator in the middle, how do the tensor-dict-generating workers know when to stop? I couldn't figure out a clean solution.

however, I made it so that the Iterator[Instance] that the MultiprocessIterator gets exposes the queue, so if I can figure out a good solution for this it shouldn't be hard to implement.
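For illustration, the coordination that the in-the-middle iterator provides is essentially this sentinel-counting pattern (a minimal sketch with hypothetical names, not the PR's code):

    # Count one None sentinel per instance-producing worker; once they have all
    # finished, tell every tensor-dict-producing worker to stop as well.
    def relay(instance_queue, batch_queue, num_instance_workers, num_batch_workers):
        finished_producers = 0
        while finished_producers < num_instance_workers:
            item = instance_queue.get()
            if item is None:
                finished_producers += 1
            else:
                batch_queue.put(item)
        for _ in range(num_batch_workers):
            batch_queue.put(None)

Without something playing this role, the consumers have no way to tell an empty-but-still-filling queue apart from a finished one.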

Contributor
@matt-peters matt-peters left a comment


Other than the shuffle, it looks great 👍

yield instance
instance = input_queue.get()

for tensor_dict in iterator(instances(), num_epochs=1, shuffle=False):
Contributor


Shouldn't we be passing down the shuffle parameter here? I think it will ignore whatever shuffle parameter is passed to __call__ without it.
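The suggested change is roughly a one-line fix, assuming the enclosing worker function receives the caller's shuffle flag as a parameter (a sketch, not the final diff):

    # Pass the shuffle flag from __call__ through instead of hard-coding False:
    for tensor_dict in iterator(instances(), num_epochs=1, shuffle=shuffle):
        ...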

@joelgrus joelgrus merged commit b5087e7 into allenai:master Sep 13, 2018