Add progress bars to hash operators #53175
base: master
```python
self._sub_progress_bar_names = sub_progress_bar_names
self._sub_progress_bar_dict: Dict[str, ProgressBar] = None
```
Do we really need the names separate from the progress bars themselves?
If you initialize a `ProgressBar`, it is displayed in the terminal right away. We don't want that: it would look as if the operation were running, when in fact it doesn't start until all sub-progress bars have been initialized.
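A minimal sketch of the deferred-creation pattern described in this reply, assuming a bar that renders as soon as it is constructed. `EagerBar`, `SubProgressBars`, `RENDERED`, and `initialize` are hypothetical names for illustration, not Ray's `ProgressBar` API:

```python
from typing import Dict, List, Optional

# Hypothetical "terminal": records which bars have been rendered, in order.
RENDERED: List[str] = []


class EagerBar:
    """Hypothetical stand-in for a progress bar that shows up on construction."""

    def __init__(self, name: str) -> None:
        self.name = name
        RENDERED.append(name)  # simulate the bar appearing on screen


class SubProgressBars:
    """Store only names until every sub-bar can be created at once."""

    def __init__(self, names: List[str]) -> None:
        self._names = names
        self._bars: Optional[Dict[str, EagerBar]] = None  # nothing rendered yet

    def initialize(self) -> None:
        # Create all bars together, so none appears before the operator starts.
        self._bars = {name: EagerBar(name) for name in self._names}

    def get(self, name: str) -> EagerBar:
        if self._bars is None:
            raise RuntimeError("sub-progress bars are not initialized yet")
        return self._bars[name]
```

Constructing `SubProgressBars(["Shuffle", "Reduce"])` renders nothing; both bars appear together only when `initialize()` runs, which is why only the names are stored up front.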
```python
self._metrics.on_task_output_generated(task_index=task_index, output=bundle)
self._output_queue.append(bundle)
shuffle_reduce_bar.update(
    i=bundle.num_rows(), total=self.num_output_rows_total()
)
self._metrics.on_output_queued(bundle)
```
nit: Please move metrics callbacks together
Sure, no strong opinion on this instance. But following this example, `self._metrics.on_task_output_generated(task_index=task_index, output=bundle)` is at the start of the function and `self._metrics.on_output_queued(bundle)` is at the end.
python/ray/data/_internal/execution/interfaces/physical_operator.py
```python
self._metrics.on_task_submitted(
    self._next_shuffle_tasks_idx + partition_id,
    RefBundle([], owns_blocks=True),
)
shuffle_reduce_bar.update(i=0, total=self.num_output_rows_total())
```
So this uses a single estimate for both the shuffle and the finalization tasks. Mixing them doesn't make much sense.
I think we should explore using separate metrics for the shuffling and finalization stages (which would effectively split the operator in two).
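A sketch of the per-stage alternative suggested here, with one set of counters and one total estimate per stage. `StageProgress` and `TwoStageOperator` are hypothetical; which estimate each stage should use (for instance, input rows for the shuffle stage and expected output rows for the reduce stage) is an assumption, not something this PR settles:

```python
from dataclasses import dataclass


@dataclass
class StageProgress:
    """Hypothetical per-stage counters (not Ray's OpRuntimeMetrics)."""

    name: str
    rows_done: int = 0
    rows_total_estimate: int = 0

    def update(self, rows: int, total_estimate: int) -> None:
        # Advance by `rows` and refresh this stage's own total estimate.
        self.rows_done += rows
        self.rows_total_estimate = total_estimate

    def fraction(self) -> float:
        if self.rows_total_estimate <= 0:
            return 0.0
        return min(1.0, self.rows_done / self.rows_total_estimate)


class TwoStageOperator:
    """Sketch: shuffle and reduce progress never share one estimate."""

    def __init__(self) -> None:
        self._shuffle_stage = StageProgress("Shuffle")
        self._reduce_stage = StageProgress("Reduce")

    def on_shuffle_output(self, rows: int, input_rows_estimate: int) -> None:
        self._shuffle_stage.update(rows, input_rows_estimate)

    def on_reduce_output(self, rows: int, output_rows_estimate: int) -> None:
        self._reduce_stage.update(rows, output_rows_estimate)
```

Keeping the two stages apart means a reduce stage that produces far more (or fewer) rows than it consumed cannot distort the shuffle stage's progress, and vice versa.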
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
```python
class ReportsExtraResourceUsage(abc.ABC):
    @abc.abstractmethod
    def extra_resource_usage(self: PhysicalOperator) -> ExecutionResources:
        """Returns resources used by this operator beyond standard accounting."""
        ...


class ContainsSubProgressBars(PhysicalOperator):
```
Suggested change:
```diff
-class ContainsSubProgressBars(PhysicalOperator):
+class WithSubProgressBarMixin:
```
Why do we need to inherit from PO (`PhysicalOperator`)?
I was doing it for type checking; I can change it back.
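If the mixin stops inheriting from `PhysicalOperator`, static type checking can still be kept by annotating `self` in the mixin's methods with a `typing.Protocol` (a pattern mypy documents for mixin classes). A sketch with hypothetical names (`_OperatorLike`, `FakeOperator`, `FakeJoinOperator`, `qualified_sub_progress_bar_names`); none of these are Ray APIs:

```python
from typing import List, Protocol


class _OperatorLike(Protocol):
    """Hypothetical slice of the operator interface the mixin relies on."""

    _sub_progress_bar_names: List[str]

    @property
    def name(self) -> str: ...


class WithSubProgressBarMixin:
    """Plain mixin: it does not inherit from any operator base class.

    Annotating ``self`` with the Protocol lets a static type checker confirm
    that the host class provides ``name`` and ``_sub_progress_bar_names``.
    """

    def qualified_sub_progress_bar_names(self: _OperatorLike) -> List[str]:
        return [f"{self.name}: {n}" for n in self._sub_progress_bar_names]


class FakeOperator:
    """Hypothetical stand-in for PhysicalOperator."""

    def __init__(self, name: str) -> None:
        self._name = name

    @property
    def name(self) -> str:
        return self._name


class FakeJoinOperator(FakeOperator, WithSubProgressBarMixin):
    def __init__(self) -> None:
        super().__init__("Join")
        self._sub_progress_bar_names = ["Shuffle", "Reduce"]
```

At runtime the Protocol annotation has no effect; it only gives the type checker the interface the mixin needs, without a second inheritance path to the operator base.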
```python
super().__init__(*args, **kwargs)
self._sub_progress_bar_names: Optional[List[str]] = sub_progress_bar_names
self._sub_progress_bar_dict: Optional[Dict[str, ProgressBar]] = None
self._metric_dict: Dict[str, OpRuntimeMetrics] = {}
```
I think this should live outside of this class. Let's start by keeping it in the shuffling operator base; if there's an opportunity to abstract it away later, we'll see it and do it then.
```diff
@@ -334,7 +347,7 @@ def combine(one: "_PartitionStats", other: "_PartitionStats") -> "_PartitionStat
 )


-class HashShufflingOperatorBase(PhysicalOperator):
+class HashShufflingOperatorBase(ContainsSubProgressBars, PhysicalOperator):
```
Suggested change:
```diff
-class HashShufflingOperatorBase(ContainsSubProgressBars, PhysicalOperator):
+class HashShufflingOperatorBase(PhysicalOperator, ContainsSubProgressBars):
```
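Base-class order fixes the method resolution order (MRO), which matters if the mixin keeps an `__init__` that forwards with `super().__init__(*args, **kwargs)`. A sketch with hypothetical stand-in classes, assuming a base whose `__init__` does not call `super().__init__()` (whether Ray's `PhysicalOperator` does is not shown here):

```python
from typing import List


class OperatorBase:
    """Hypothetical operator base; its __init__ does NOT call super().__init__()."""

    def __init__(self, name: str) -> None:
        self.name = name


class SubBarsMixin:
    """Hypothetical mixin whose __init__ forwards the remaining arguments."""

    def __init__(self, *args, sub_bar_names: List[str], **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.sub_bar_names = sub_bar_names


class MixinFirst(SubBarsMixin, OperatorBase):
    """MRO: MixinFirst -> SubBarsMixin -> OperatorBase -> object."""


class BaseFirst(OperatorBase, SubBarsMixin):
    """MRO: BaseFirst -> OperatorBase -> SubBarsMixin -> object."""
```

With the base listed first, `SubBarsMixin.__init__` is skipped unless every class ahead of it in the MRO forwards to `super().__init__()`. A common way around this is a mixin with no `__init__` at all, whose state the host class sets up itself.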
```python
    shuffle_progress_bar_name = "Hash Shuffle Map"
if finalize_progress_bar_name is None:
    finalize_progress_bar_name = "Hash Shuffle Reduce"
```
Suggested change:
```diff
-    shuffle_progress_bar_name = "Hash Shuffle Map"
-if finalize_progress_bar_name is None:
-    finalize_progress_bar_name = "Hash Shuffle Reduce"
+    shuffle_progress_bar_name = "Shuffle"
+if finalize_progress_bar_name is None:
+    finalize_progress_bar_name = "Reduce"
```
Let's just simplify it
```diff
@@ -451,90 +476,129 @@ def start(self, options: ExecutionOptions) -> None:
         self._aggregator_pool.start()

     def _add_input_inner(self, input_bundle: RefBundle, input_index: int) -> None:

+        shuffle_metrics = self.get_metrics(0)
```
Following up on my comment above: let's re-home this multi-metrics setup in this class (as `_shuffle_stage_metrics` and `_reduce_stage_metrics`).
```python
cur_shuffle_task_idx % self._num_partitions
if not input_key_column_names
else None
def _on_partitioning_done(cur_shuffle_task_idx: int):
```
I see you're pulling this method up, and unfortunately GH now shows this whole file as pretty much rewritten because of the shifted lines and other changes layered on top.
It's totally fine to refactor if you want to make things better, but please avoid mixing refactorings like this with critical semantic changes. It makes it practically impossible for me to review this change effectively and verify that what we're doing is correct.
Let's extract the refactoring into a PR stacked on top, so the two can be reviewed separately (and much faster).
Got it, I'll try to move it back to what it was before. I didn't intend to make it harder to review.
Why are these changes needed?
We want transparent progress bars for all operators. Currently, `join` operators (or any operator that uses `HashShufflingOperatorBase`) don't show a progress bar, which looks a little odd.
After these changes
Other changes
- In `OpRuntimeMetrics`, added `num_row_inputs_received` (a counterpart to `num_inputs_received`).
- … `MapOperator` to `PhysicalOperator` so it can also be used by `HashShufflingOperatorBase`.
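A sketch of a row counter kept next to its input-level counterpart. `InputCounters` and `FakeBundle` are hypothetical stand-ins for illustration, not Ray's `OpRuntimeMetrics` or `RefBundle`; this sketch counts one input per call:

```python
from dataclasses import dataclass


@dataclass
class FakeBundle:
    """Hypothetical stand-in for a RefBundle: only exposes a row count."""

    rows: int

    def num_rows(self) -> int:
        return self.rows


@dataclass
class InputCounters:
    """Hypothetical sketch, not Ray's OpRuntimeMetrics."""

    num_inputs_received: int = 0  # one per received input in this sketch
    num_row_inputs_received: int = 0  # total rows across those inputs

    def on_input_received(self, bundle: FakeBundle) -> None:
        self.num_inputs_received += 1
        self.num_row_inputs_received += bundle.num_rows()
```

Tracking rows alongside inputs makes it possible to report input progress in rows, the unit the `shuffle_reduce_bar.update(...)` calls in this PR use.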
Related issue number
https://anyscale1.atlassian.net/browse/DATA-574 and https://anyscale1.atlassian.net/browse/DATA-925
Checks
- … (`git commit -s`) in this PR.
- … `scripts/format.sh` to lint the changes in this PR.
- … method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.