In-sync with master by GoodDok · Pull Request #2 · GoodDok/luigi · GitHub

In-sync with master #2

Merged · 2 commits · Mar 12, 2019
6 changes: 2 additions & 4 deletions doc/configuration.rst
@@ -300,10 +300,8 @@ timeout

Number of seconds after which to kill a task which has been running
for too long. This provides a default value for all tasks, which can
be overridden by setting the worker-timeout property in any task. This
only works when using multiple workers, as the timeout is implemented
by killing worker subprocesses. Default value is 0, meaning no
timeout.
be overridden by setting the worker-timeout property in any task.
Default value is 0, meaning no timeout.
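
As an illustration (the task name and value here are hypothetical, not part of this change), the
per-task override described above would look like:

.. code:: python

    import luigi

    class SlowTask(luigi.Task):
        # Hypothetical example: time this task out after 300 seconds,
        # overriding the configured default timeout.
        worker_timeout = 300

        def run(self):
            ...  # long-running work would go here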

wait_interval
Number of seconds for the worker to wait before asking the scheduler
89 changes: 65 additions & 24 deletions doc/example_top_artists.rst
@@ -2,11 +2,13 @@ Example – Top Artists
---------------------

This is a very simplified case of something we do at Spotify a lot.
All user actions are logged to HDFS where
we run a bunch of Hadoop jobs to transform the data.
All user actions are logged to Google Cloud Storage (previously HDFS) where
we run a bunch of processing jobs to transform the data. The processing code itself is implemented
in a scalable data processing framework, such as Scio, Scalding, or Spark, but the jobs
are orchestrated with Luigi.
At some point we might end up with
a smaller data set that we can bulk ingest into Cassandra, Postgres, or
some other format.
other storage suitable for serving or exploration.

For the purpose of this exercise, we want to aggregate all streams,
find the top 10 artists and then put the results into Postgres.
@@ -83,7 +85,7 @@ Note that *top_artists* needs to be in your PYTHONPATH, or else this can produc

$ PYTHONPATH='.' luigi --module top_artists AggregateArtists --local-scheduler --date-interval 2012-06

You can also try to view the manual using `--help` which will give you an
You can also try to view the manual using ``--help`` which will give you an
overview of the options.

Running the command again will do nothing because the output file is
@@ -95,39 +97,78 @@ the input files is modified.
You need to delete the output file
manually.

The `--local-scheduler` flag tells Luigi not to connect to a scheduler
The ``--local-scheduler`` flag tells Luigi not to connect to a scheduler
server. This is not recommended for other purposes than just testing
things.

Step 1b - Running this in Hadoop
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Step 1b - Aggregate artists with Spark
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Luigi comes with native Python Hadoop mapreduce support built in, and
here is how this could look like, instead of the class above.
While Luigi can process data inline, it is normally used to orchestrate external programs that
perform the actual processing. In this example, we will demonstrate how the top artists can instead be
read from HDFS and calculated with Spark, orchestrated by Luigi.

.. code:: python

class AggregateArtistsHadoop(luigi.contrib.hadoop.JobTask):
class AggregateArtistsSpark(luigi.contrib.spark.SparkSubmitTask):
    date_interval = luigi.DateIntervalParameter()

    app = 'top_artists_spark.py'
    master = 'local[*]'

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget("data/artist_streams_%s.tsv" % self.date_interval)

    def requires(self):
        return [StreamsHdfs(date) for date in self.date_interval]

    def mapper(self, line):
        timestamp, artist, track = line.strip().split()
        yield artist, 1
    def app_options(self):
        # :func:`~luigi.task.Task.input` returns the targets produced by the tasks in
        # `~luigi.task.Task.requires`.
        return [','.join([p.path for p in self.input()]),
                self.output().path]


:class:`luigi.contrib.spark.SparkSubmitTask` doesn't require you to implement a
:func:`~luigi.task.Task.run` method. Instead, you specify the command-line parameters to send
to ``spark-submit``, as well as any other configuration specific to Spark.
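
As with the earlier examples, the task can then be launched from the command line (the module
path and date below are illustrative):

$ PYTHONPATH='.' luigi --module top_artists AggregateArtistsSpark --local-scheduler --date-interval 2012-06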

Python code for the Spark job is found below.

.. code:: python

import operator
import sys
from pyspark.sql import SparkSession


def main(argv):
    input_paths = argv[1].split(',')
    output_path = argv[2]

    spark = SparkSession.builder.getOrCreate()

    streams = spark.read.option('sep', '\t').csv(input_paths[0])
    for stream_path in input_paths[1:]:
        streams = streams.union(spark.read.option('sep', '\t').csv(stream_path))

    # The second field is the artist; count streams per artist via the RDD API
    counts = streams.rdd \
        .map(lambda row: (row[1], 1)) \
        .reduceByKey(operator.add)

    spark.createDataFrame(counts).write.option('sep', '\t').csv(output_path)


if __name__ == '__main__':
    sys.exit(main(sys.argv))


    def reducer(self, key, values):
        yield key, sum(values)
In a typical deployment scenario, the Luigi orchestration definition above, as well as the
PySpark processing code, would be packaged into a deployment artifact, such as a container image. The
processing code does not have to be implemented in Python; any program can be packaged in the
image and run from Luigi.

Note that :class:`luigi.contrib.hadoop.JobTask` doesn't require you to implement a
:func:`~luigi.task.Task.run` method. Instead, you typically implement a
:func:`~luigi.contrib.hadoop.JobTask.mapper` and
:func:`~luigi.contrib.hadoop.JobTask.reducer` method. *mapper* and *combiner* require
yielding tuple of only two elements: key and value. Both key and value also may be a tuple.

Step 2 – Find the Top Artists
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -148,7 +189,7 @@ we choose to do this not as a Hadoop job, but just as a plain old for-loop in Py

    def requires(self):
        if self.use_hadoop:
            return AggregateArtistsHadoop(self.date_interval)
            return AggregateArtistsSpark(self.date_interval)
        else:
            return AggregateArtists(self.date_interval)

@@ -213,15 +254,15 @@ building all its upstream dependencies.
Using the Central Planner
~~~~~~~~~~~~~~~~~~~~~~~~~

The `--local-scheduler` flag tells Luigi not to connect to a central scheduler.
The ``--local-scheduler`` flag tells Luigi not to connect to a central scheduler.
This is recommended in order to get started and/or for development purposes.
At the point where you start putting things in production
we strongly recommend running the central scheduler server.
In addition to providing locking
so that the same task is not run by multiple processes at the same time,
this server also provides a pretty nice visualization of your current work flow.

If you drop the `--local-scheduler` flag,
If you drop the ``--local-scheduler`` flag,
your script will try to connect to the central planner,
by default at localhost port 8082.
If you run
@@ -234,7 +275,7 @@ in the background and then run your task without the ``--local-scheduler`` flag,
then your script will now schedule through a centralized server.
You need `Tornado <http://www.tornadoweb.org/>`__ for this to work.
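
As a quick illustration of that flow (module and date are the same hypothetical values used
earlier), start the scheduler daemon and then run the task without the flag:

$ luigid
$ PYTHONPATH='.' luigi --module top_artists AggregateArtists --date-interval 2012-06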

Launching `http://localhost:8082` should show something like this:
Launching http://localhost:8082 should show something like this:

.. figure:: web_server.png
:alt: Web server screenshot
66 changes: 30 additions & 36 deletions examples/top_artists.py
@@ -22,9 +22,9 @@
from luigi import six

import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs
import luigi.contrib.postgres
import luigi.contrib.spark


class ExternalStreams(luigi.ExternalTask):
@@ -136,17 +136,28 @@ def run(self):
                out_file.write('{}\t{}\n'.format(artist, count))


class AggregateArtistsHadoop(luigi.contrib.hadoop.JobTask):
class AggregateArtistsSpark(luigi.contrib.spark.SparkSubmitTask):
    """
    This task runs a :py:class:`luigi.contrib.hadoop.JobTask` task
    This task runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task
    over each target data returned by :py:meth:`~/.StreamsHdfs.output` and
    writes the result into its :py:meth:`~.AggregateArtistsHadoop.output` target (a file in HDFS).

    This class uses :py:meth:`luigi.contrib.spark.SparkJob.run`.
    writes the result into its :py:meth:`~.AggregateArtistsSpark.output` target (a file in HDFS).
    """

    date_interval = luigi.DateIntervalParameter()

    """
    The PySpark script to run.

    For Spark applications written in Java or Scala, the name of a jar file should be supplied instead.
    """
    app = 'top_artists_spark.py'

    """
    Address of the Spark cluster master. In this case, we are not using a cluster, but running
    Spark in local mode.
    """
    master = 'local[*]'

    def output(self):
        """
        Returns the target output for this task.
@@ -155,10 +166,7 @@ def output(self):
        :return: the target output for this task.
        :rtype: object (:py:class:`luigi.target.Target`)
        """
        return luigi.contrib.hdfs.HdfsTarget(
            "data/artist_streams_%s.tsv" % self.date_interval,
            format=luigi.contrib.hdfs.PlainDir
        )
        return luigi.contrib.hdfs.HdfsTarget("data/artist_streams_%s.tsv" % self.date_interval)

    def requires(self):
        """
@@ -170,48 +178,34 @@ def requires(self):
        """
        return [StreamsHdfs(date) for date in self.date_interval]

    def mapper(self, line):
        """
        The implementation of the map phase of the Hadoop job.

        :param line: the input.
        :return: tuple ((key, value) or, in this case, (artist, 1 stream count))
        """
        _, artist, _ = line.strip().split()
        yield artist, 1

    def reducer(self, key, values):
        """
        The implementation of the reducer phase of the Hadoop job.

        :param key: the artist.
        :param values: the stream count.
        :return: tuple (artist, count of streams)
        """
        yield key, sum(values)
    def app_options(self):
        # :func:`~luigi.task.Task.input` returns the targets produced by the tasks in
        # `~luigi.task.Task.requires`.
        return [','.join([p.path for p in self.input()]),
                self.output().path]


class Top10Artists(luigi.Task):
    """
    This task runs over the target data returned by :py:meth:`~/.AggregateArtists.output` or
    :py:meth:`~/.AggregateArtistsHadoop.output` in case :py:attr:`~/.Top10Artists.use_hadoop` is set and
    :py:meth:`~/.AggregateArtistsSpark.output` in case :py:attr:`~/.Top10Artists.use_spark` is set and
    writes the result into its :py:meth:`~.Top10Artists.output` target (a file in local filesystem).
    """

    date_interval = luigi.DateIntervalParameter()
    use_hadoop = luigi.BoolParameter()
    use_spark = luigi.BoolParameter()

    def requires(self):
        """
        This task's dependencies:

        * :py:class:`~.AggregateArtists` or
        * :py:class:`~.AggregateArtistsHadoop` if :py:attr:`~/.Top10Artists.use_hadoop` is set.
        * :py:class:`~.AggregateArtistsSpark` if :py:attr:`~/.Top10Artists.use_spark` is set.

        :return: object (:py:class:`luigi.task.Task`)
        """
        if self.use_hadoop:
            return AggregateArtistsHadoop(self.date_interval)
        if self.use_spark:
            return AggregateArtistsSpark(self.date_interval)
        else:
            return AggregateArtists(self.date_interval)

@@ -256,7 +250,7 @@ class ArtistToplistToDatabase(luigi.contrib.postgres.CopyToTable):
    """

    date_interval = luigi.DateIntervalParameter()
    use_hadoop = luigi.BoolParameter()
    use_spark = luigi.BoolParameter()

    host = "localhost"
    database = "toplists"
@@ -277,7 +271,7 @@ def requires(self):

        :return: list of object (:py:class:`luigi.task.Task`)
        """
        return Top10Artists(self.date_interval, self.use_hadoop)
        return Top10Artists(self.date_interval, self.use_spark)


if __name__ == "__main__":
28 changes: 28 additions & 0 deletions examples/top_artists_spark.py
@@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-

import operator
import sys

from pyspark.sql import SparkSession


def main(argv):
    input_paths = argv[1].split(',')
    output_path = argv[2]

    spark = SparkSession.builder.getOrCreate()

    streams = spark.read.option('sep', '\t').csv(input_paths[0])
    for stream_path in input_paths[1:]:
        streams = streams.union(spark.read.option('sep', '\t').csv(stream_path))

    # The second field is the artist; count streams per artist via the RDD API
    counts = streams.rdd \
        .map(lambda row: (row[1], 1)) \
        .reduceByKey(operator.add)

    spark.createDataFrame(counts).write.option('sep', '\t').csv(output_path)


if __name__ == '__main__':
    sys.exit(main(sys.argv))
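
For reference, :py:class:`luigi.contrib.spark.SparkSubmitTask` runs this script by invoking
``spark-submit`` with the options configured on the task; roughly (input and output paths below
are made up for illustration):

$ spark-submit --master 'local[*]' top_artists_spark.py data/streams_2012-06-01.tsv,data/streams_2012-06-02.tsv data/artist_streams_2012-06.tsv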
3 changes: 1 addition & 2 deletions luigi/task.py
@@ -188,8 +188,7 @@ class MyTask(luigi.Task):

    #: Number of seconds after which to time out the run function.
    #: No timeout if set to 0.
    #: Defaults to 0 or worker-timeout value in config file
    #: Only works when using multiple workers.
    #: Defaults to 0 or worker-timeout value in config
    worker_timeout = None

    #: Maximum number of tasks to run together as a batch. Infinite by default
7 changes: 3 additions & 4 deletions luigi/worker.py
@@ -130,9 +130,8 @@ def __init__(self, task, worker_id, result_queue, status_reporter,
        self.worker_id = worker_id
        self.result_queue = result_queue
        self.status_reporter = status_reporter
        if task.worker_timeout is not None:
            worker_timeout = task.worker_timeout
        self.timeout_time = time.time() + worker_timeout if worker_timeout else None
        self.worker_timeout = task.worker_timeout or worker_timeout
        self.timeout_time = time.time() + self.worker_timeout if self.worker_timeout else None
        self.use_multiprocessing = use_multiprocessing or self.timeout_time is not None
        self.check_unfulfilled_deps = check_unfulfilled_deps
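
A minimal sketch of the precedence this change introduces (values below are hypothetical): a
non-zero task-level ``worker_timeout`` wins, otherwise the worker-wide default applies, and the
resolved value is what the timeout error message reports further down.

    def effective_timeout(task_worker_timeout, worker_default):
        # Mirrors `task.worker_timeout or worker_timeout` above.
        return task_worker_timeout or worker_default

    assert effective_timeout(300, 600) == 300   # per-task override
    assert effective_timeout(None, 600) == 600  # unset -> worker default
    assert effective_timeout(0, 600) == 600     # 0 means "no timeout set", so fall back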

@@ -1026,7 +1025,7 @@ def _purge_children(self):
                p.task.trigger_event(Event.PROCESS_FAILURE, p.task, error_msg)
            elif p.timeout_time is not None and time.time() > float(p.timeout_time) and p.is_alive():
                p.terminate()
                error_msg = 'Task {} timed out after {} seconds and was terminated.'.format(task_id, p.task.worker_timeout)
                error_msg = 'Task {} timed out after {} seconds and was terminated.'.format(task_id, p.worker_timeout)
                p.task.trigger_event(Event.TIMEOUT, p.task, error_msg)
            else:
                continue
20 changes: 20 additions & 0 deletions test/worker_test.py
@@ -1863,6 +1863,26 @@ def store_task(t, error_msg):

        self.assertEqual(result, [task])

    @mock.patch('luigi.worker.time')
    def test_timeout_handler_single_worker(self, mock_time):
        result = []

        @HangTheWorkerTask.event_handler(Event.TIMEOUT)
        def store_task(t, error_msg):
            self.assertTrue(error_msg)
            result.append(t)

        w = Worker(wait_interval=0.01, timeout=5)
        mock_time.time.return_value = 0
        task = HangTheWorkerTask(worker_timeout=1)
        w.add(task)
        w._run_task(task.task_id)

        mock_time.time.return_value = 3
        w._handle_next_task()

        self.assertEqual(result, [task])


class PerTaskRetryPolicyBehaviorTest(LuigiTestCase):
    def setUp(self):