Add progress bars to hash operators by iamjustinhsu · Pull Request #53175 · ray-project/ray · GitHub

Add progress bars to hash operators #53175

Open: iamjustinhsu wants to merge 17 commits into master from jhsu/add-pb-to-hash-operators

Conversation

@iamjustinhsu (Collaborator) commented May 20, 2025

Why are these changes needed?

We want to show progress bars for all operators. Currently, join operators (or any operator that uses HashShufflingOperatorBase) do not show a progress bar, so the output looks a little odd.

After these changes

[Screenshot: progress bar output after these changes]

Other changes

  • In OpRuntimeMetrics, added num_row_inputs_received (a counterpart to num_inputs_received)
  • Moved some of the estimated-row-output computation logic from MapOperator up to PhysicalOperator so that HashShufflingOperatorBase can use it too
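
As a rough illustration of what an estimated-row-output computation can look like, here is a minimal sketch that assumes the estimate scales the output/input row ratio observed so far up to the full input. The function name and parameters are hypothetical and are not taken from Ray's code; the PR's actual logic may differ.

```python
from typing import Optional


def estimate_total_output_rows(
    rows_out_so_far: int,
    rows_in_processed: int,
    total_input_rows: Optional[int],
) -> Optional[int]:
    """Scale the observed output/input row ratio up to the full input.

    Hypothetical helper for illustration only (not Ray's API).
    """
    if total_input_rows is None or rows_in_processed <= 0:
        return None  # not enough information to estimate yet
    ratio = rows_out_so_far / rows_in_processed
    return round(ratio * total_input_rows)


# 50 rows out for 100 rows in, with 1000 input rows expected in total.
estimate = estimate_total_output_rows(50, 100, 1000)
no_estimate = estimate_total_output_rows(0, 0, 1000)
```

Returning None when nothing has been processed lets a progress bar show an unknown total instead of a misleading zero.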

Related issue number

https://anyscale1.atlassian.net/browse/DATA-574 and https://anyscale1.atlassian.net/browse/DATA-925

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@iamjustinhsu iamjustinhsu force-pushed the jhsu/add-pb-to-hash-operators branch 5 times, most recently from 02ff932 to 16f616d Compare May 21, 2025 16:51
@iamjustinhsu iamjustinhsu force-pushed the jhsu/add-pb-to-hash-operators branch 2 times, most recently from 587538c to 5275fa6 Compare May 21, 2025 23:44
@iamjustinhsu iamjustinhsu marked this pull request as ready for review May 27, 2025 18:49
@iamjustinhsu iamjustinhsu requested a review from a team as a code owner May 27, 2025 18:49
Comment on lines 246 to 247
self._sub_progress_bar_names = sub_progress_bar_names
self._sub_progress_bar_dict: Dict[str, ProgressBar] = None
Contributor

Do we really need names separate from PBs themselves?

Collaborator Author
@iamjustinhsu iamjustinhsu May 28, 2025

If you initialize a ProgressBar, it is displayed in the terminal right away. We don't want that here, because it would look as if the operation were already running when it isn't. So we keep only the names until all sub progress bars are initialized together.
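
The deferred-initialization idea described in this reply can be sketched as follows. FakeProgressBar is a hypothetical stand-in that "renders" on construction, and initialize_sub_progress_bars is an assumed method name; neither is Ray's actual ProgressBar or operator API.

```python
from typing import Dict, List, Optional


class FakeProgressBar:
    """Hypothetical stand-in for a bar that is displayed as soon as it is created."""

    rendered: List[str] = []  # records which bars have been "drawn"

    def __init__(self, name: str) -> None:
        self.name = name
        FakeProgressBar.rendered.append(name)  # side effect at creation time


class OperatorWithSubBars:
    """Stores only the names up front and creates the bars later, all at once."""

    def __init__(self, sub_progress_bar_names: Optional[List[str]] = None) -> None:
        self._sub_progress_bar_names = sub_progress_bar_names
        self._sub_progress_bar_dict: Optional[Dict[str, FakeProgressBar]] = None

    def initialize_sub_progress_bars(self) -> int:
        # Bars are constructed (and therefore displayed) only at this point.
        names = self._sub_progress_bar_names or []
        self._sub_progress_bar_dict = {name: FakeProgressBar(name) for name in names}
        return len(self._sub_progress_bar_dict)


op = OperatorWithSubBars(["Shuffle", "Reduce"])
rendered_before = list(FakeProgressBar.rendered)  # nothing drawn yet
count = op.initialize_sub_progress_bars()
rendered_after = list(FakeProgressBar.rendered)
```

Constructing the operator draws nothing; the bars appear only once the executor asks for them.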

Comment on lines 636 to 641
self._metrics.on_task_output_generated(task_index=task_index, output=bundle)
self._output_queue.append(bundle)
shuffle_reduce_bar.update(
    i=bundle.num_rows(), total=self.num_output_rows_total()
)
self._metrics.on_output_queued(bundle)
Contributor

nit: Please move metrics callbacks together

Collaborator Author

Sure, no strong opinion in this instance. Following this example, though, self._metrics.on_task_output_generated(task_index=task_index, output=bundle) is called at the start of the function and self._metrics.on_output_queued(bundle) at the end.

Comment on lines 721 to 725
self._metrics.on_task_submitted(
    self._next_shuffle_tasks_idx + partition_id,
    RefBundle([], owns_blocks=True),
)
shuffle_reduce_bar.update(i=0, total=self.num_output_rows_total())
Contributor

So this will use a single estimate for both shuffle and finalization tasks. Mixing them up doesn't really make a lot of sense.

What I think we should explore is using separate metrics for the shuffling and finalization stages (which would effectively split the operator into two).
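
One way to picture the separate-metrics suggestion is the sketch below. StageMetrics and TwoStageOperator are hypothetical illustrations with made-up fields, not Ray's OpRuntimeMetrics or the operator in this PR.

```python
from dataclasses import dataclass


@dataclass
class StageMetrics:
    """Hypothetical per-stage counters (not Ray's OpRuntimeMetrics)."""

    tasks_submitted: int = 0
    tasks_finished: int = 0
    rows_out: int = 0

    def fraction_done(self) -> float:
        if self.tasks_submitted == 0:
            return 0.0
        return self.tasks_finished / self.tasks_submitted


class TwoStageOperator:
    """Tracks the shuffle and finalize (reduce) stages independently."""

    def __init__(self) -> None:
        self.shuffle_metrics = StageMetrics()
        self.reduce_metrics = StageMetrics()


op = TwoStageOperator()
op.shuffle_metrics.tasks_submitted = 4
op.shuffle_metrics.tasks_finished = 2
op.reduce_metrics.tasks_submitted = 8  # reduce tasks queued, none finished yet

shuffle_progress = op.shuffle_metrics.fraction_done()
reduce_progress = op.reduce_metrics.fraction_done()
```

With one counter set per stage, each sub progress bar reads its own totals, so a shuffle-side estimate never leaks into the reduce bar.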

@iamjustinhsu iamjustinhsu force-pushed the jhsu/add-pb-to-hash-operators branch from 4427aa7 to c6e1abe Compare June 12, 2025 01:17
@iamjustinhsu iamjustinhsu force-pushed the jhsu/add-pb-to-hash-operators branch 2 times, most recently from 74f00c2 to 6879943 Compare June 12, 2025 04:14
@iamjustinhsu iamjustinhsu requested review from a team and edoakes as code owners June 12, 2025 04:14
@iamjustinhsu iamjustinhsu force-pushed the jhsu/add-pb-to-hash-operators branch 2 times, most recently from 7d97483 to 7058ede Compare June 12, 2025 14:51
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/add-pb-to-hash-operators branch from 7058ede to 793c03f Compare June 12, 2025 14:52
@iamjustinhsu iamjustinhsu force-pushed the jhsu/add-pb-to-hash-operators branch from 924df8c to 9bf7929 Compare June 13, 2025 00:34
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/add-pb-to-hash-operators branch 2 times, most recently from ed06ea8 to 2d3b83d Compare June 13, 2025 02:15
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/add-pb-to-hash-operators branch 2 times, most recently from 455bcaf to 044bbf8 Compare June 13, 2025 02:49
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/add-pb-to-hash-operators branch from 044bbf8 to 251edfd Compare June 13, 2025 16:33

class ReportsExtraResourceUsage(abc.ABC):
    @abc.abstractmethod
    def extra_resource_usage(self: PhysicalOperator) -> ExecutionResources:
        """Returns resources used by this operator beyond standard accounting."""
        ...


class ContainsSubProgressBars(PhysicalOperator):
Contributor

Suggested change
- class ContainsSubProgressBars(PhysicalOperator):
+ class WithSubProgressBarMixin:

Contributor

Why do we need to inherit from PO?

Collaborator Author

I was doing it for type checking; I can change it back.
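
For context, a mixin that does not inherit from the operator base class could look roughly like the sketch below. PhysicalOperator here is a hypothetical stand-in, and SubProgressBarMixin and get_sub_progress_bar_names are assumed names for illustration rather than the API this PR ends up with.

```python
import abc
from typing import List, Optional


class PhysicalOperator:
    """Hypothetical stand-in for the real operator base class."""

    def __init__(self, name: str) -> None:
        self.name = name


class SubProgressBarMixin(abc.ABC):
    """Adds a sub-progress-bar contract without inheriting from PhysicalOperator."""

    @abc.abstractmethod
    def get_sub_progress_bar_names(self) -> Optional[List[str]]:
        ...


class HashShuffleOp(PhysicalOperator, SubProgressBarMixin):
    def get_sub_progress_bar_names(self) -> Optional[List[str]]:
        return ["Shuffle", "Reduce"]


op = HashShuffleOp("join")
mro_names = [cls.__name__ for cls in HashShuffleOp.__mro__]
```

Because the mixin defines no constructor, the order of the base classes does not affect initialization, and callers can still type-check against the mixin with isinstance.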

super().__init__(*args, **kwargs)
self._sub_progress_bar_names: Optional[List[str]] = sub_progress_bar_names
self._sub_progress_bar_dict: Optional[Dict[str, ProgressBar]] = None
self._metric_dict: Dict[str, OpRuntimeMetrics] = {}
Contributor

I think this should live outside of this class -- let's start by keeping it in the shuffling operator base. If there's an opportunity to abstract it away, we'll see it and do it later.

@@ -334,7 +347,7 @@ def combine(one: "_PartitionStats", other: "_PartitionStats") -> "_PartitionStat
)


- class HashShufflingOperatorBase(PhysicalOperator):
+ class HashShufflingOperatorBase(ContainsSubProgressBars, PhysicalOperator):
Contributor

Suggested change
- class HashShufflingOperatorBase(ContainsSubProgressBars, PhysicalOperator):
+ class HashShufflingOperatorBase(PhysicalOperator, ContainsSubProgressBars):

Comment on lines +387 to +389
shuffle_progress_bar_name = "Hash Shuffle Map"
if finalize_progress_bar_name is None:
    finalize_progress_bar_name = "Hash Shuffle Reduce"
Contributor

Suggested change
- shuffle_progress_bar_name = "Hash Shuffle Map"
- if finalize_progress_bar_name is None:
-     finalize_progress_bar_name = "Hash Shuffle Reduce"
+ shuffle_progress_bar_name = "Shuffle"
+ if finalize_progress_bar_name is None:
+     finalize_progress_bar_name = "Reduce"

Contributor

Let's just simplify it

@@ -451,90 +476,129 @@ def start(self, options: ExecutionOptions) -> None:
self._aggregator_pool.start()

def _add_input_inner(self, input_bundle: RefBundle, input_index: int) -> None:

shuffle_metrics = self.get_metrics(0)
Contributor

Following up on my comment above -- let's re-home this multi-metrics setup in this class (as _shuffle_stage_metrics and _reduce_stage_metrics)

cur_shuffle_task_idx % self._num_partitions
if not input_key_column_names
else None
def _on_partitioning_done(cur_shuffle_task_idx: int):
Contributor

I see you're pulling up this method, and unfortunately GH now shows this whole file as pretty much rewritten because of the shifted lines and some other changes laid on top.

It's totally cool to refactor if you want to make things better, but please avoid mixing refactorings like that with critical changes in semantics. It makes it practically impossible for me to effectively review this change and assert the correctness of what we're doing.

Let's extract the refactoring part into a PR stacked on top so these can be reviewed separately (and much faster).

Collaborator Author
@iamjustinhsu iamjustinhsu Jun 26, 2025

Got it, I'll try to move it back to what it was before.

I didn't intend to make it harder to review.
