diff --git a/doc/configuration.rst b/doc/configuration.rst
index 5efbd22268..2fdb3b6e4a 100644
--- a/doc/configuration.rst
+++ b/doc/configuration.rst
@@ -300,10 +300,8 @@ timeout
     Number of seconds after which to kill a task which has been
     running for too long. This provides a default value for all tasks,
     which can
-    be overridden by setting the worker-timeout property in any task. This
-    only works when using multiple workers, as the timeout is implemented
-    by killing worker subprocesses. Default value is 0, meaning no
-    timeout.
+    be overridden by setting the worker-timeout property in any task.
+    Default value is 0, meaning no timeout.
 
 wait_interval
     Number of seconds for the worker to wait before asking the scheduler
diff --git a/doc/example_top_artists.rst b/doc/example_top_artists.rst
index b772202636..a34bcb3775 100644
--- a/doc/example_top_artists.rst
+++ b/doc/example_top_artists.rst
@@ -2,11 +2,13 @@ Example – Top Artists
 ---------------------
 
 This is a very simplified case of something we do at Spotify a lot.
-All user actions are logged to HDFS where
-we run a bunch of Hadoop jobs to transform the data.
+All user actions are logged to Google Cloud Storage (previously HDFS) where
+we run a bunch of processing jobs to transform the data. The processing code itself is implemented
+in a scalable data processing framework, such as Scio, Scalding, or Spark, but the jobs
+are orchestrated with Luigi.
 At some point we might end up with
 a smaller data set that we can bulk ingest into Cassandra, Postgres, or
-some other format.
+other storage suitable for serving or exploration.
 
 For the purpose of this exercise, we want to aggregate all streams,
 find the top 10 artists and then put the results into Postgres.
@@ -83,7 +85,7 @@ Note that *top_artists* needs to be in your PYTHONPATH, or else this can produc
 
     $ PYTHONPATH='.' luigi --module top_artists AggregateArtists --local-scheduler --date-interval 2012-06
 
-You can also try to view the manual using `--help` which will give you an
+You can also try to view the manual using ``--help`` which will give you an
 overview of the options.
 
 Running the command again will do nothing because the output file is
@@ -95,39 +97,78 @@ the input files is modified. You need to delete the output file
 manually.
 
-The `--local-scheduler` flag tells Luigi not to connect to a scheduler
+The ``--local-scheduler`` flag tells Luigi not to connect to a scheduler
 server. This is not recommended for other purpose than just testing things.
 
-Step 1b - Running this in Hadoop
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Step 1b - Aggregate artists with Spark
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-Luigi comes with native Python Hadoop mapreduce support built in, and
-here is how this could look like, instead of the class above.
+While Luigi can process data inline, it is normally used to orchestrate external programs that
+perform the actual processing. In this example, we will demonstrate how the top artists can
+instead be read from HDFS and calculated with Spark, orchestrated by Luigi.
 
 .. code:: python
 
-    class AggregateArtistsHadoop(luigi.contrib.hadoop.JobTask):
+    class AggregateArtistsSpark(luigi.contrib.spark.SparkSubmitTask):
         date_interval = luigi.DateIntervalParameter()
 
+        app = 'top_artists_spark.py'
+        master = 'local[*]'
+
         def output(self):
             return luigi.contrib.hdfs.HdfsTarget("data/artist_streams_%s.tsv" % self.date_interval)
 
         def requires(self):
             return [StreamsHdfs(date) for date in self.date_interval]
 
-        def mapper(self, line):
-            timestamp, artist, track = line.strip().split()
-            yield artist, 1
+        def app_options(self):
+            # :func:`~luigi.task.Task.input` returns the targets produced by the tasks in
+            # :func:`~luigi.task.Task.requires`.
+            return [','.join([p.path for p in self.input()]),
+                    self.output().path]
+
+
+:class:`luigi.contrib.spark.SparkSubmitTask` doesn't require you to implement a
+:func:`~luigi.task.Task.run` method. Instead, you specify the command line parameters to send
+to ``spark-submit``, as well as any other configuration specific to Spark.
+
+The Python code for the Spark job is shown below.
+
+.. code:: python
+
+    import operator
+    import sys
+    from pyspark.sql import SparkSession
+
+
+    def main(argv):
+        input_paths = argv[1].split(',')
+        output_path = argv[2]
+
+        spark = SparkSession.builder.getOrCreate()
+
+        streams = spark.read.option('sep', '\t').csv(input_paths[0])
+        for stream_path in input_paths[1:]:
+            streams = streams.union(spark.read.option('sep', '\t').csv(stream_path))
+
+        # The second field is the artist
+        counts = streams.rdd \
+            .map(lambda row: (row[1], 1)) \
+            .reduceByKey(operator.add)
+
+        spark.createDataFrame(counts).write.option('sep', '\t').csv(output_path)
+
+
+    if __name__ == '__main__':
+        sys.exit(main(sys.argv))
+
-        def reducer(self, key, values):
-            yield key, sum(values)
+In a typical deployment scenario, the Luigi orchestration definition above as well as the
+PySpark processing code would be packaged into a deployment artifact, such as a container image. The
+processing code does not have to be implemented in Python; any program can be packaged into the
+image and run from Luigi.
 
-Note that :class:`luigi.contrib.hadoop.JobTask` doesn't require you to implement a
-:func:`~luigi.task.Task.run` method. Instead, you typically implement a
-:func:`~luigi.contrib.hadoop.JobTask.mapper` and
-:func:`~luigi.contrib.hadoop.JobTask.reducer` method. *mapper* and *combiner* require
-yielding tuple of only two elements: key and value. Both key and value also may be a tuple.
 
 Step 2 – Find the Top Artists
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -148,7 +189,7 @@ we choose to do this not as a Hadoop job, but just as a plain old for-loop in Py
 
         def requires(self):
             if self.use_hadoop:
-                return AggregateArtistsHadoop(self.date_interval)
+                return AggregateArtistsSpark(self.date_interval)
             else:
                 return AggregateArtists(self.date_interval)
 
@@ -213,7 +254,7 @@ building all its upstream dependencies.
 Using the Central Planner
 ~~~~~~~~~~~~~~~~~~~~~~~~~
 
-The `--local-scheduler` flag tells Luigi not to connect to a central scheduler.
+The ``--local-scheduler`` flag tells Luigi not to connect to a central scheduler.
 This is recommended in order to get started and or for development purposes.
 At the point where you start putting things in production we strongly
 recommend running the central scheduler server.
@@ -221,7 +262,7 @@ In addition to providing locking so that the same task is not run by multiple
 processes at the same time, this server also provides a pretty nice
 visualization of your current work flow.
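
To tie the pieces above together: from ``app``, ``master`` and ``app_options()``,
:class:`luigi.contrib.spark.SparkSubmitTask` assembles roughly
``spark-submit --master local[*] top_artists_spark.py <comma-separated input paths> <output path>``,
so the two values returned by ``app_options()`` arrive in the script as ``sys.argv[1]`` and
``sys.argv[2]``. The snippet below is a minimal sketch, not part of the patch, of triggering that
task programmatically; it assumes the example module is importable as ``top_artists`` and that
``spark-submit`` is available locally. Dropping ``local_scheduler=True`` has the same effect as
dropping the ``--local-scheduler`` flag discussed here.

.. code:: python

    # Illustrative only: run the Spark-backed aggregation for June 2012 with the
    # local scheduler, the programmatic equivalent of
    #   luigi --module top_artists AggregateArtistsSpark --local-scheduler --date-interval 2012-06
    import luigi
    from luigi.date_interval import Month

    from top_artists import AggregateArtistsSpark

    if __name__ == '__main__':
        luigi.build(
            [AggregateArtistsSpark(date_interval=Month(2012, 6))],
            local_scheduler=True,
        )
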
-If you drop the `--local-scheduler` flag,
+If you drop the ``--local-scheduler`` flag,
 your script will try to connect to the central planner,
 by default at localhost port 8082.
 
 If you run
 
@@ -234,7 +275,7 @@ in the background and then run your task without the ``--local-scheduler``
 flag, then your script will now schedule through a centralized server.
 You need `Tornado <https://www.tornadoweb.org/>`__ for this to work.
 
-Launching `http://localhost:8082` should show something like this:
+Launching http://localhost:8082 should show something like this:
 
 .. figure:: web_server.png
    :alt: Web server screenshot
diff --git a/examples/top_artists.py b/examples/top_artists.py
index a80e683494..29ad4035e4 100755
--- a/examples/top_artists.py
+++ b/examples/top_artists.py
@@ -22,9 +22,9 @@ from luigi import six
 
 import luigi
-import luigi.contrib.hadoop
 import luigi.contrib.hdfs
 import luigi.contrib.postgres
+import luigi.contrib.spark
 
 
 class ExternalStreams(luigi.ExternalTask):
@@ -136,17 +136,28 @@ def run(self):
             out_file.write('{}\t{}\n'.format(artist, count))
 
 
-class AggregateArtistsHadoop(luigi.contrib.hadoop.JobTask):
+class AggregateArtistsSpark(luigi.contrib.spark.SparkSubmitTask):
     """
-    This task runs a :py:class:`luigi.contrib.hadoop.JobTask` task
+    This task runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task
     over each target data returned by :py:meth:`~/.StreamsHdfs.output` and
-    writes the result into its :py:meth:`~.AggregateArtistsHadoop.output` target (a file in HDFS).
-
-    This class uses :py:meth:`luigi.contrib.spark.SparkJob.run`.
+    writes the result into its :py:meth:`~.AggregateArtistsSpark.output` target (a file in HDFS).
     """
 
     date_interval = luigi.DateIntervalParameter()
 
+    """
+    The PySpark script to run.
+
+    For Spark applications written in Java or Scala, the name of a jar file should be supplied instead.
+    """
+    app = 'top_artists_spark.py'
+
+    """
+    Address of the Spark cluster master. In this case, we are not using a cluster, but running
+    Spark in local mode.
+    """
+    master = 'local[*]'
+
     def output(self):
         """
         Returns the target output for this task.
@@ -155,10 +166,7 @@ def output(self):
         :return: the target output for this task.
         :rtype: object (:py:class:`luigi.target.Target`)
         """
-        return luigi.contrib.hdfs.HdfsTarget(
-            "data/artist_streams_%s.tsv" % self.date_interval,
-            format=luigi.contrib.hdfs.PlainDir
-        )
+        return luigi.contrib.hdfs.HdfsTarget("data/artist_streams_%s.tsv" % self.date_interval)
 
     def requires(self):
         """
@@ -170,48 +178,34 @@ def requires(self):
         """
         return [StreamsHdfs(date) for date in self.date_interval]
 
-    def mapper(self, line):
-        """
-        The implementation of the map phase of the Hadoop job.
-
-        :param line: the input.
-        :return: tuple ((key, value) or, in this case, (artist, 1 stream count))
-        """
-        _, artist, _ = line.strip().split()
-        yield artist, 1
-
-    def reducer(self, key, values):
-        """
-        The implementation of the reducer phase of the Hadoop job.
-
-        :param key: the artist.
-        :param values: the stream count.
-        :return: tuple (artist, count of streams)
-        """
-        yield key, sum(values)
+    def app_options(self):
+        # :func:`~luigi.task.Task.input` returns the targets produced by the tasks in
+        # :func:`~luigi.task.Task.requires`.
+        return [','.join([p.path for p in self.input()]),
+                self.output().path]
 
 
 class Top10Artists(luigi.Task):
     """
     This task runs over the target data returned by :py:meth:`~/.AggregateArtists.output` or
-    :py:meth:`~/.AggregateArtistsHadoop.output` in case :py:attr:`~/.Top10Artists.use_hadoop` is set and
+    :py:meth:`~/.AggregateArtistsSpark.output` in case :py:attr:`~/.Top10Artists.use_spark` is set and
     writes the result into its :py:meth:`~.Top10Artists.output` target (a file in local filesystem).
     """
 
     date_interval = luigi.DateIntervalParameter()
-    use_hadoop = luigi.BoolParameter()
+    use_spark = luigi.BoolParameter()
 
     def requires(self):
         """
         This task's dependencies:
 
         * :py:class:`~.AggregateArtists` or
-        * :py:class:`~.AggregateArtistsHadoop` if :py:attr:`~/.Top10Artists.use_hadoop` is set.
+        * :py:class:`~.AggregateArtistsSpark` if :py:attr:`~/.Top10Artists.use_spark` is set.
 
         :return: object (:py:class:`luigi.task.Task`)
         """
-        if self.use_hadoop:
-            return AggregateArtistsHadoop(self.date_interval)
+        if self.use_spark:
+            return AggregateArtistsSpark(self.date_interval)
         else:
             return AggregateArtists(self.date_interval)
 
@@ -256,7 +250,7 @@ class ArtistToplistToDatabase(luigi.contrib.postgres.CopyToTable):
     """
 
     date_interval = luigi.DateIntervalParameter()
-    use_hadoop = luigi.BoolParameter()
+    use_spark = luigi.BoolParameter()
 
     host = "localhost"
     database = "toplists"
@@ -277,7 +271,7 @@ def requires(self):
 
         :return: list of object (:py:class:`luigi.task.Task`)
         """
-        return Top10Artists(self.date_interval, self.use_hadoop)
+        return Top10Artists(self.date_interval, self.use_spark)
 
 
 if __name__ == "__main__":
diff --git a/examples/top_artists_spark.py b/examples/top_artists_spark.py
new file mode 100755
index 0000000000..ce0480d3af
--- /dev/null
+++ b/examples/top_artists_spark.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+
+import operator
+import sys
+
+from pyspark.sql import SparkSession
+
+
+def main(argv):
+    input_paths = argv[1].split(',')
+    output_path = argv[2]
+
+    spark = SparkSession.builder.getOrCreate()
+
+    streams = spark.read.option('sep', '\t').csv(input_paths[0])
+    for stream_path in input_paths[1:]:
+        streams = streams.union(spark.read.option('sep', '\t').csv(stream_path))
+
+    # The second field is the artist
+    counts = streams.rdd \
+        .map(lambda row: (row[1], 1)) \
+        .reduceByKey(operator.add)
+
+    spark.createDataFrame(counts).write.option('sep', '\t').csv(output_path)
+
+
+if __name__ == '__main__':
+    sys.exit(main(sys.argv))
diff --git a/luigi/task.py b/luigi/task.py
index ccf6f0f63c..d1e589bdff 100644
--- a/luigi/task.py
+++ b/luigi/task.py
@@ -188,8 +188,7 @@ class MyTask(luigi.Task):
 
     #: Number of seconds after which to time out the run function.
     #: No timeout if set to 0.
-    #: Defaults to 0 or worker-timeout value in config file
-    #: Only works when using multiple workers.
+    #: Defaults to 0 or the worker-timeout value in the config file.
     worker_timeout = None
 
     #: Maximum number of tasks to run together as a batch. Infinite by default
diff --git a/luigi/worker.py b/luigi/worker.py
index adac5a03b2..00039b03f6 100644
--- a/luigi/worker.py
+++ b/luigi/worker.py
@@ -130,9 +130,8 @@ def __init__(self, task, worker_id, result_queue, status_reporter,
         self.worker_id = worker_id
         self.result_queue = result_queue
         self.status_reporter = status_reporter
-        if task.worker_timeout is not None:
-            worker_timeout = task.worker_timeout
-        self.timeout_time = time.time() + worker_timeout if worker_timeout else None
+        self.worker_timeout = task.worker_timeout or worker_timeout
+        self.timeout_time = time.time() + self.worker_timeout if self.worker_timeout else None
         self.use_multiprocessing = use_multiprocessing or self.timeout_time is not None
         self.check_unfulfilled_deps = check_unfulfilled_deps
@@ -1026,7 +1025,7 @@ def _purge_children(self):
                 p.task.trigger_event(Event.PROCESS_FAILURE, p.task, error_msg)
             elif p.timeout_time is not None and time.time() > float(p.timeout_time) and p.is_alive():
                 p.terminate()
-                error_msg = 'Task {} timed out after {} seconds and was terminated.'.format(task_id, p.task.worker_timeout)
+                error_msg = 'Task {} timed out after {} seconds and was terminated.'.format(task_id, p.worker_timeout)
                 p.task.trigger_event(Event.TIMEOUT, p.task, error_msg)
             else:
                 continue
diff --git a/test/worker_test.py b/test/worker_test.py
index cf8d37a77a..67d45b6a1a 100644
--- a/test/worker_test.py
+++ b/test/worker_test.py
@@ -1863,6 +1863,26 @@ def store_task(t, error_msg):
         self.assertEqual(result, [task])
 
+    @mock.patch('luigi.worker.time')
+    def test_timeout_handler_single_worker(self, mock_time):
+        result = []
+
+        @HangTheWorkerTask.event_handler(Event.TIMEOUT)
+        def store_task(t, error_msg):
+            self.assertTrue(error_msg)
+            result.append(t)
+
+        w = Worker(wait_interval=0.01, timeout=5)
+        mock_time.time.return_value = 0
+        task = HangTheWorkerTask(worker_timeout=1)
+        w.add(task)
+        w._run_task(task.task_id)
+
+        mock_time.time.return_value = 3
+        w._handle_next_task()
+
+        self.assertEqual(result, [task])
+
 
 class PerTaskRetryPolicyBehaviorTest(LuigiTestCase):
 
     def setUp(self):
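
For context on the behaviour the new test exercises, here is a minimal, illustrative sketch of a
task that opts into a per-task timeout; the class name and output path are assumptions for the
example, not part of the patch. With this change the timeout also takes effect with a single
worker, because any task that has a timeout is run in a subprocess
(``self.use_multiprocessing = use_multiprocessing or self.timeout_time is not None``).

.. code:: python

    # Illustrative only: `SlowTask` sleeps longer than its own `worker_timeout`,
    # so the worker terminates the subprocess and triggers the TIMEOUT event.
    import time

    import luigi


    class SlowTask(luigi.Task):
        # Per-task override of the global worker-timeout setting (in seconds).
        worker_timeout = 10

        def output(self):
            return luigi.LocalTarget('/tmp/slow_task_done')

        def run(self):
            time.sleep(60)  # exceeds worker_timeout, so run() never finishes
            with self.output().open('w') as f:
                f.write('done\n')


    if __name__ == '__main__':
        luigi.build([SlowTask()], local_scheduler=True, workers=1)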