Provide automatic URL tracking for Spark applications by GoodDok · Pull Request #2661 · spotify/luigi · GitHub

Provide automatic URL tracking for Spark applications #2661

Merged
Tarrasch merged 5 commits into spotify:master from spark-task-with-tracking-url
Mar 5, 2019

Contributor
@GoodDok GoodDok commented Feb 14, 2019

Description

A new string parameter, tracking_url_pattern, was added to ExternalProgramTask to provide a mechanism for parsing the task output and updating the tracking URL in the Luigi web UI while the task is running. The new parameter is also set for SparkSubmitTask according to the log message structure: https://github.com/apache/spark/blob/0a4c03f7d084f1d2aa48673b99f3b9496893ce8d/core/src/main/scala/org/apache/spark/ui/WebUI.scala#L133
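
As a rough illustration of the mechanism described above, the following sketch applies a pattern with one capturing group to a single log line. The helper name `extract_tracking_url` and the sample log line are hypothetical; the pattern is the example given in the parameter's docstring later in this thread, not necessarily the one configured for SparkSubmitTask.

```python
import re


def extract_tracking_url(pattern, line):
    """Return the first capturing group of `pattern` found in `line`, or None.

    Hypothetical helper for illustration; not part of Luigi's API.
    """
    match = re.search(pattern, line)
    return match.group(1) if match else None


# Example pattern taken from the parameter's docstring in this PR.
pattern = r"Job UI is here: (https?://.*)"

# Made-up log line, used only to exercise the pattern.
line = "INFO Job UI is here: http://example.com:4040"
url = extract_tracking_url(pattern, line)
```

A line that does not match simply yields None, so a caller could keep scanning subsequent lines until a URL shows up.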

Motivation and Context

One of the things I miss most in Luigi when working with Spark applications is automatic detection of the link to the Spark Web UI. It is very inconvenient to look for Spark jobs, e.g. in the YARN UI among thousands of other jobs, having only the name of the Luigi task plus its parameters.
Having such a feature in Luigi would hugely simplify tracking Spark jobs that are currently running.

Have you tested this? If so, how?

I ran my jobs with this code and it works for me.

@GoodDok GoodDok marked this pull request as ready for review February 14, 2019 22:53
@GoodDok GoodDok force-pushed the spark-task-with-tracking-url branch 3 times, most recently from 587c4db to 8c67161 Compare February 15, 2019 07:42
@GoodDok GoodDok force-pushed the spark-task-with-tracking-url branch from 8c67161 to def262b Compare February 18, 2019 22:24
Contributor
@Tarrasch Tarrasch left a comment

This would be really useful indeed!

Can you address the comments I've written?

@@ -60,6 +63,7 @@ class ExternalProgramTask(luigi.Task):
"""

capture_output = luigi.BoolParameter(default=True, significant=False, positional=False)
tracking_url_pattern = luigi.Parameter(default="", significant=False, positional=False)
Contributor

Can you document this somewhere?

Contributor Author

Added description + documentation below

@@ -132,6 +140,47 @@ def run(self):
tmp_stdout.close()


class TrackingUrlContext(object):
Contributor

Can we make this private? _TrackingUrlContext - then you don't need to add documentation

Contributor Author

Thanks, done

self.set_tracking_url = set_tracking_url
self.track_proc = Process(target=self._track_url_by_pattern)

def __enter__(self):
Contributor

Is it possible to use contextlib? This reminds me of #2652. :)

Contributor Author

Thanks, done, now looks cleaner
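
For readers unfamiliar with the suggestion, here is a minimal sketch of the same setup and teardown written once as a class with `__enter__`/`__exit__` and once with `contextlib.contextmanager`. The resource is a plain list of event strings, which is an assumption made for illustration; the PR's actual context manager starts and joins processes.

```python
from contextlib import contextmanager


class ClassBasedContext(object):
    """Context manager written as a class (hypothetical example)."""

    def __init__(self, events):
        self.events = events

    def __enter__(self):
        self.events.append("setup")
        return self.events

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Runs even if the body raised; returning None lets exceptions propagate.
        self.events.append("teardown")


@contextmanager
def generator_based_context(events):
    """Same behavior expressed as a generator (hypothetical example)."""
    events.append("setup")
    try:
        yield events
    finally:
        # Equivalent of __exit__: runs whether or not the body raised.
        events.append("teardown")


class_events, generator_events = [], []
with ClassBasedContext(class_events) as e:
    e.append("body")
with generator_based_context(generator_events) as e:
    e.append("body")
```

Both variants record the same sequence of events; the generator form keeps setup and teardown next to each other, which is likely what "looks cleaner" refers to.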

return self.main_proc

def __exit__(self, exc_type, exc_val, exc_tb):
# need to wait a bit to let the subprocess read the last lines
Contributor

Is this the right way to do it? Is there no way to signal the subprocess or wait for it to terminate?

Contributor Author

Replaced sleep with track_proc.join(timeout). I'm not sure I have any other option; what do you think?
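
One possible shape of the signalling approach asked about above, sketched with `threading.Thread` and `threading.Event` rather than the `multiprocessing.Process` used in the PR (threads cannot be terminated, so the stop signal stands in for `terminate()`). All names here are hypothetical and this is not the merged implementation.

```python
import threading


def follow_lines(lines, seen, stop_event):
    """Consume `lines` until exhausted or until asked to stop."""
    for line in lines:
        if stop_event.is_set():
            break
        seen.append(line)


def run_with_bounded_wait(lines, timeout):
    """Start the reader, wait up to `timeout` seconds, then signal it to stop."""
    seen = []
    stop_event = threading.Event()
    reader = threading.Thread(target=follow_lines, args=(lines, seen, stop_event))
    reader.daemon = True  # never block interpreter exit on a stuck reader
    reader.start()

    reader.join(timeout)  # bounded wait instead of a fixed sleep
    if reader.is_alive():
        stop_event.set()  # ask the reader to finish at the next line
        reader.join(timeout)
    return seen


result = run_with_bounded_wait(["line 1", "line 2"], timeout=1.0)
```

The bounded `join` returns as soon as the reader finishes, so the common case does not pay the full timeout the way a fixed sleep would.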

temp_dir = tempfile.mkdtemp()

# since set_tracking_url is called in a separate process
# we need to do something like creating a temp file
Contributor

Contributor Author

Thank you, changed, now looks much better

@GoodDok GoodDok force-pushed the spark-task-with-tracking-url branch from 6d3e4f7 to 7ed7fe3 Compare February 20, 2019 11:34
@GoodDok GoodDok force-pushed the spark-task-with-tracking-url branch from 7ed7fe3 to 59114a7 Compare February 20, 2019 11:43
Contributor
@Tarrasch Tarrasch left a comment

I'm so happy that most of my comments were useful and led to better code. And thank you so much for the fast turnaround time!!

Anyway I'm fine with this being merged once my last comments are addressed. I really like this PR. Thanks!

for the job in the web UI. Example: 'Job UI is here: (https?://.*)'.

Default value is an empty string, so URL tracking is not performed.
"""
Contributor

Do these docs render nicely?

Contributor Author

Hmm, how can I check it?

Contributor

I recently wrote about it here: #2548 (comment)

@@ -131,6 +148,43 @@ def run(self):
tmp_stderr.close()
tmp_stdout.close()

@contextmanager
def proc_with_tracking_url_context(self, proc_args, proc_kwargs):
Contributor

You made it public again -.-

Contributor Author

My bad, fixed

track_proc.join(time_to_sleep * 2)
if track_proc.is_alive():
track_proc.terminate()
pipe_to_read.close()
Contributor

Thanks for updating this part of the code. Since it's always easy to make a mistake, would you mind running this in production on your side for a week?

Contributor Author

Yup, did it with the latest changes, I'll let you know how it goes

@GoodDok GoodDok force-pushed the spark-task-with-tracking-url branch from f2dc2ba to 7eefdaa Compare February 21, 2019 17:27
@GoodDok GoodDok force-pushed the spark-task-with-tracking-url branch from 7eefdaa to fc02a8a Compare February 21, 2019 17:57
Contributor Author
GoodDok commented Feb 21, 2019

@Tarrasch thank you for being so responsive!
While deploying these changes to the working pipeline, I found that by default Spark apps write all their logs to stderr (previously I was refactoring the code on my local machine, and it seems my Spark config is somewhat unusual in writing to stdout). Also, the format of Spark's output logs depends on the deploy-mode used in spark-submit.
So I introduced a new parameter, track_url_in_stderr, that is False by default, and added a test for it. Please let me know if anything else should be added.
It is now running in the real pipeline. Let's give it some time, and I'll be back with an update.
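
To make the stdout/stderr distinction concrete, the sketch below starts a child Python process that writes a made-up log line to stderr, then scans the chosen stream for a pattern. The child command, the log line, and the helper name `find_url_in_stream` are assumptions for illustration; this is not how ExternalProgramTask is implemented, and it reads the output only after the child exits rather than while it runs.

```python
import re
import subprocess
import sys


def find_url_in_stream(args, pattern, stream_name):
    """Run `args` and return the first URL matching `pattern` in one stream.

    `stream_name` is "stdout" or "stderr". Hypothetical helper.
    """
    proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    out, err = proc.communicate()  # wait for the child and collect both streams
    text = out if stream_name == "stdout" else err
    for line in text.splitlines():
        match = re.search(pattern, line)
        if match:
            return match.group(1)
    return None


# Child process that logs only to stderr, mimicking the behavior described above.
child = [sys.executable, "-c",
         "import sys; sys.stderr.write('Job UI is here: http://example.com:4040\\n')"]
pattern = r"Job UI is here: (https?://.*)"

from_stderr = find_url_in_stream(child, pattern, "stderr")
from_stdout = find_url_in_stream(child, pattern, "stdout")
```

Searching the wrong stream finds nothing, which is why a task needs a way to say where the URL is expected.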

Contributor
@Tarrasch Tarrasch left a comment

Thanks. Let me know when it's been used in prod reliably!

Default value is an empty string, so URL tracking is not performed.
"""
track_url_in_stderr = luigi.BoolParameter(
Contributor

Maybe you want track_url_in_stderr = luigi.ChoiceParameter?

@GoodDok GoodDok force-pushed the spark-task-with-tracking-url branch 3 times, most recently from 024417e to b62ff46 Compare March 4, 2019 00:16
@GoodDok GoodDok force-pushed the spark-task-with-tracking-url branch from b62ff46 to 2d718da Compare March 4, 2019 00:54
Contributor Author
GoodDok commented Mar 4, 2019

Hi @Tarrasch, just wanted to let you know that the changes were applied to the real pipeline and no issues came up during the testing period.
Also, I was thinking about the change you suggested regarding luigi.parameter.ChoiceParameter and committed yet another version that seems to me a bit easier to understand.
How it was before:

tracking_url_pattern = luigi.Parameter(
        default="", significant=False, positional=False, visibility=ParameterVisibility.HIDDEN,
        description="Regex pattern used for searching URL in the logs of the external program")

track_url_in_stderr = luigi.BoolParameter(
        default=False, significant=False, positional=False, visibility=ParameterVisibility.HIDDEN,
        description="When set to True, 'tracking_url_pattern' is searched in stderr instead of stdout")

How it is now:

stream_for_searching_tracking_url = luigi.parameter.ChoiceParameter(
        var_type=str, choices=['none', 'stdout', 'stderr'], default='none',
        significant=False, positional=False, visibility=ParameterVisibility.HIDDEN,
        description="Stream for searching tracking URL")

tracking_url_pattern = luigi.OptionalParameter(
        default=None, significant=False, positional=False, visibility=ParameterVisibility.HIDDEN,
        description="Regex pattern used for searching URL in the logs of the external program")

What do you think, is it a better option?
The changes are contained in the last commit (they haven't been tested in prod). What surprises me a lot is the failing codecov/patch check: the lines related to the context manager are marked as not covered, although several tests cover this part...
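
The three-valued choice can be read as a small dispatch: 'none' disables tracking, while 'stdout' and 'stderr' pick the stream to scan. A plain-Python sketch under that reading follows; `pick_stream` is a hypothetical helper and does not reflect the merged implementation.

```python
import io


def pick_stream(choice, stdout, stderr):
    """Map the parameter value to the stream to scan, or None to skip tracking."""
    streams = {"none": None, "stdout": stdout, "stderr": stderr}
    if choice not in streams:
        raise ValueError("unknown stream choice: %r" % (choice,))
    return streams[choice]


# In-memory stand-ins for a child process's output streams.
fake_stdout = io.StringIO(u"regular output\n")
fake_stderr = io.StringIO(u"Job UI is here: http://example.com:4040\n")

selected = pick_stream("stderr", fake_stdout, fake_stderr)
disabled = pick_stream("none", fake_stdout, fake_stderr)
```

Compared with the earlier boolean flag, a single parameter expresses both whether to track and where to look, at the cost of one more value to validate.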

@Tarrasch Tarrasch merged commit 3701c68 into spotify:master Mar 5, 2019
Contributor
Tarrasch commented Mar 5, 2019

Feel free to ignore CodeCov.

I would maybe have preferred

tracking_url_pattern = luigi.Parameter(...)

But it's not worth blocking this PR more for that. :)

Contributor Author
GoodDok commented Mar 6, 2019

@Tarrasch thank you very much for a thorough review of my changes! 👍
One more thing: after deploying my latest changes to the real pipeline, I found out that I forgot to apply the changes to spark.py itself. There should be
stream_for_searching_tracking_url = 'stderr'
instead of
track_url_in_stderr = True
according to the latest changes.
Here is the commit with the change: GoodDok@68eee5b
What is the most convenient way to apply it after the PR has been merged?

Contributor
Tarrasch commented Mar 8, 2019

Please send a fix, also with a test if it makes sense to avoid future breakages.

Contributor Author
GoodDok commented Mar 11, 2019

@Tarrasch here it is: ae065b3
The changes contain the correctly named parameter + two tests for cluster and client modes.
I tested the changes locally, but I also wanted to run the remote ones. Should I create a new PR to test these changes remotely?

@Tarrasch
Contributor

I can only merge something (conveniently) if you send it as a Pull Request. That way Travis will also run.
