Provide automatic URL tracking for Spark applications #2661
This would be really useful indeed!
Can you address the comments I've written?
luigi/contrib/external_program.py
@@ -60,6 +63,7 @@ class ExternalProgramTask(luigi.Task):
"""

capture_output = luigi.BoolParameter(default=True, significant=False, positional=False)
tracking_url_pattern = luigi.Parameter(default="", significant=False, positional=False)
Can you document this somewhere?
Added description + documentation below
luigi/contrib/external_program.py
@@ -132,6 +140,47 @@ def run(self):
tmp_stdout.close()

class TrackingUrlContext(object):
Can we make this private (`_TrackingUrlContext`)? Then you don't need to add documentation.
Thanks, done
luigi/contrib/external_program.py
self.set_tracking_url = set_tracking_url
self.track_proc = Process(target=self._track_url_by_pattern)

def __enter__(self):
Is it possible to use contextlib? This reminds me of #2652. :)
Thanks, done; it looks cleaner now.
luigi/contrib/external_program.py
return self.main_proc

def __exit__(self, exc_type, exc_val, exc_tb):
# need to wait a bit to let the subprocess read the last lines
Is this the right way to do it? Isn't there a way to signal the process or wait for it to terminate?
Replaced `sleep` with `track_proc.join(timeout)`. I'm not sure I have any other option; what do you think?
temp_dir = tempfile.mkdtemp()

# since set_tracking_url is called in a separate process
# we need to do something like creating a temp file
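The comment above rests on the fact that a separate process cannot modify the parent's objects, so whatever it finds has to travel back through a shared channel. A minimal sketch of the temp-file idea it mentions, using a child Python process started with `subprocess`; the helper `run_and_read_url` and the child program are hypothetical, and since the code was changed after review, this illustrates the idea only, not the merged code:

```python
import os
import shutil
import subprocess
import sys
import tempfile

# Hypothetical child program: writes a URL to the path given as its first argument.
CHILD_CODE = "import sys; open(sys.argv[1], 'w').write('http://localhost:4040')"


def run_and_read_url(child_code):
    """Run `child_code` in another Python process and read back the URL it wrote.

    The child cannot touch the parent's memory, so both sides agree on a file
    inside a fresh temporary directory instead. Returns None if nothing was written.
    """
    temp_dir = tempfile.mkdtemp()
    url_file = os.path.join(temp_dir, "tracking_url")
    try:
        # With `python -c CODE ARG`, the child sees ARG as sys.argv[1].
        subprocess.run([sys.executable, "-c", child_code, url_file], check=True)
        if not os.path.exists(url_file):
            return None
        with open(url_file) as f:
            return f.read().strip() or None
    finally:
        shutil.rmtree(temp_dir)  # always remove the temporary directory


print(run_and_read_url(CHILD_CODE))
```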
Thank you, changed; it looks much better now.
I'm so happy that most of my comments were useful and led to better code. And thank you so much for the fast turnaround time!!
Anyway, I'm fine with this being merged once my last comments are addressed. I really like this PR. Thanks!
for the job in the web UI. Example: 'Job UI is here: (https?://.*)'.

Default value is an empty string, so URL tracking is not performed.
"""
Do these docs render nicely?
Hmm, how can I check it?
I recently wrote about it here: #2548 (comment)
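A minimal sketch of how a pattern like the docstring's example, 'Job UI is here: (https?://.*)', could be applied to one line of output. It assumes the URL is the pattern's first capture group and that an empty pattern disables tracking, as the docstring describes; `find_tracking_url` is a hypothetical helper, not part of Luigi:

```python
import re
from typing import Optional


def find_tracking_url(line: str, pattern: str) -> Optional[str]:
    """Return the first capture group of `pattern` found in `line`, or None.

    An empty pattern means "do not track", mirroring the parameter's default.
    The pattern is assumed to contain at least one capture group.
    """
    if not pattern:
        return None
    match = re.search(pattern, line)
    return match.group(1) if match else None


pattern = r"Job UI is here: (https?://.*)"
# '.' does not match '\n', so the trailing newline is not part of the URL.
print(find_tracking_url("INFO Job UI is here: http://localhost:4040\n", pattern))
```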
luigi/contrib/external_program.py
@@ -131,6 +148,43 @@ def run(self):
tmp_stderr.close()
tmp_stdout.close()

@contextmanager
def proc_with_tracking_url_context(self, proc_args, proc_kwargs):
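This hunk turns the tracking logic into a `@contextmanager` generator. A sketch of that shape under stated assumptions: `proc_with_tracking_url` is hypothetical, only stdout is scanned, and the reader runs in a daemon `threading.Thread` instead of the `multiprocessing.Process` used in the PR. A thread shares memory with the caller, so the callback can append to a list directly, but unlike a process it cannot be terminated, which is why the final join is bounded:

```python
import re
import subprocess
import sys
import threading
from contextlib import contextmanager


@contextmanager
def proc_with_tracking_url(args, pattern, set_tracking_url, join_timeout=1.0):
    """Start `args`, scan its stdout for `pattern`, report matches via a callback.

    Yields the running Popen object; the caller decides when to wait on it.
    """
    proc = subprocess.Popen(args, stdout=subprocess.PIPE, text=True)
    regex = re.compile(pattern)

    def scan():
        # readline() returns '' only at EOF, i.e. after the child closes stdout.
        for line in iter(proc.stdout.readline, ""):
            match = regex.search(line)
            if match:
                set_tracking_url(match.group(1))

    # Daemon thread: a reader blocked on a hung child cannot keep the interpreter alive.
    reader = threading.Thread(target=scan, daemon=True)
    reader.start()
    try:
        yield proc
    finally:
        # Bounded wait so the reader can drain the last lines of output.
        reader.join(join_timeout)
        if not reader.is_alive():
            proc.stdout.close()  # close our end of the pipe once reading is done


found = []
child = [sys.executable, "-c", "print('Job UI is here: http://localhost:4040')"]
with proc_with_tracking_url(child, r"Job UI is here: (https?://.*)", found.append) as proc:
    proc.wait()
print(found)
```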
You made it public again -.-
My bad, fixed
track_proc.join(time_to_sleep * 2)
if track_proc.is_alive():
track_proc.terminate()
pipe_to_read.close()
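This hunk waits for the tracking process with a timeout and terminates it only if it is still alive; `multiprocessing.Process.join(timeout)` returns `None` either way, hence the `is_alive()` check. A sketch of the same bounded-wait-then-terminate idea for a plain `subprocess.Popen` child, where `wait(timeout=...)` raises `TimeoutExpired` instead; `stop_with_grace` is a hypothetical name:

```python
import subprocess
import sys


def stop_with_grace(proc, grace_seconds):
    """Wait up to `grace_seconds` for `proc` to exit, terminating it otherwise.

    Returns True if the process had to be terminated.
    """
    try:
        proc.wait(timeout=grace_seconds)
        return False
    except subprocess.TimeoutExpired:
        proc.terminate()  # SIGTERM on POSIX, TerminateProcess() on Windows
        proc.wait()       # reap the child so it does not linger as a zombie
        return True


slow = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
print(stop_with_grace(slow, grace_seconds=0.5))
```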
Thanks for updating this part of the code. Since it's always easy to make a mistake, would you mind running this in production for a week?
Yup, did it with the latest changes, I'll let you know how it goes
@Tarrasch thank you for being so responsive!
Thanks. Let me know when it's been used in prod reliably!
luigi/contrib/external_program.py

Default value is an empty string, so URL tracking is not performed.
"""
track_url_in_stderr = luigi.BoolParameter(
Maybe you want `track_url_in_stderr = luigi.ChoiceParameter`?
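A boolean like `track_url_in_stderr` can only pick between two streams, while a choice parameter (Luigi provides `luigi.ChoiceParameter`, which validates a value against a fixed set of `choices`) can also state "scan nothing" explicitly. A stdlib-only sketch of the validation and mapping such a choice implies; the choice names and the helper `popen_kwargs_for_tracking` are assumptions, not Luigi code:

```python
import subprocess

# Hypothetical choice values for which stream to scan for the tracking URL.
STREAM_CHOICES = ("none", "stdout", "stderr")


def popen_kwargs_for_tracking(stream_choice):
    """Map a three-way stream choice to subprocess.Popen keyword arguments.

    'none' leaves both streams alone; 'stdout' or 'stderr' pipes that stream
    so a reader can scan it line by line.
    """
    if stream_choice not in STREAM_CHOICES:
        raise ValueError("expected one of %s, got %r" % (STREAM_CHOICES, stream_choice))
    if stream_choice == "none":
        return {}
    return {stream_choice: subprocess.PIPE}


print(popen_kwargs_for_tracking("stderr"))
```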
Hi @Tarrasch, just wanted to let you know that the changes were applied to the real pipeline and no issues came up during the testing period.
How it is now:
What do you think, is it a better option?
Feel free to ignore CodeCov. I would maybe have preferred
But it's not worth blocking this PR any longer for that. :)
@Tarrasch thank you very much for a thorough review of my changes! 👍
Please send a fix, with a test too if it makes sense, to avoid future breakages.
I can only merge something (conveniently) if you send it as a Pull Request. That way Travis will also run.
Description
A new string parameter `tracking_url_pattern` was added to `ExternalProgramTask` to provide a mechanism for parsing the task output and updating the tracking URL in the Luigi web UI while the task runs. The new parameter is also set for `SparkSubmitTask`, according to the log message structure: https://github.com/apache/spark/blob/0a4c03f7d084f1d2aa48673b99f3b9496893ce8d/core/src/main/scala/org/apache/spark/ui/WebUI.scala#L133
Motivation and Context
One of the things I miss most in Luigi when working with Spark applications is automatic detection of the Spark Web UI link. It is very inconvenient to look for Spark jobs, e.g. in the YARN UI, among thousands of other jobs when all you have is the name of the Luigi task plus its parameters.
Such a feature in Luigi would greatly simplify tracking the Spark jobs that are currently running.
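A sketch of the detection itself: scan the driver's log lines and return the first Spark UI URL. It assumes the log message has the shape "Bound <name> to <host>, and started at <url>"; the exact wording comes from the WebUI.scala line linked in the description and may vary between Spark versions. Both the pattern and the helper `first_spark_ui_url` are hypothetical, not the pattern `SparkSubmitTask` actually sets:

```python
import re
from typing import Iterable, Optional

# Hypothetical pattern for a log line shaped like
# "INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.0.5:4040".
SPARK_UI_PATTERN = re.compile(r"Bound \S+ to \S+, and started at (https?://\S+)")


def first_spark_ui_url(lines: Iterable[str]) -> Optional[str]:
    """Return the first Spark UI URL found in `lines`, or None if none appears."""
    for line in lines:
        match = SPARK_UI_PATTERN.search(line)
        if match:
            return match.group(1)
    return None


# Made-up sample lines; only the middle one matches the assumed shape.
log = [
    "INFO unrelated log line",
    "INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.0.5:4040",
    "INFO another unrelated log line",
]
print(first_spark_ui_url(log))
```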
Have you tested this? If so, how?
I ran my jobs with this code and it works for me.