From cb71b10231ae7c1aa0ee3efa56fe4e29bfcf5963 Mon Sep 17 00:00:00 2001 From: Steve Pletcher Date: Thu, 16 May 2019 11:33:01 -0400 Subject: [PATCH 1/8] Mark tasks as failed if complete() is false when run finishes --- luigi/worker.py | 22 +++++++++++++++------- test/helpers.py | 7 +++++++ test/worker_task_test.py | 22 +++++++++++++++++++++- 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/luigi/worker.py b/luigi/worker.py index 8a00fdb246..3425def5ce 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -186,8 +186,6 @@ def run(self): if _is_external(self.task): # External task - # TODO(erikbern): We should check for task completeness after non-external tasks too! - # This will resolve #814 and make things a lot more consistent if self.task.complete(): status = DONE else: @@ -197,7 +195,15 @@ def run(self): else: with self._forward_attributes(): new_deps = self._run_get_new_deps() - status = DONE if not new_deps else PENDING + if not new_deps: + if self.complete(): + status = DONE + else: + status = FAILED + ex = TaskException("Task finished running, but complete() is still returning false.") + expl = self._handle_run_exception(ex) + else: + status = PENDING if new_deps: logger.info( @@ -215,15 +221,17 @@ def run(self): raise except BaseException as ex: status = FAILED - logger.exception("[pid %s] Worker %s failed %s", os.getpid(), self.worker_id, self.task) - self.task.trigger_event(Event.FAILURE, self.task, ex) - raw_error_message = self.task.on_failure(ex) - expl = raw_error_message + expl = self._handle_run_exception(ex) finally: self.result_queue.put( (self.task.task_id, status, expl, missing, new_deps)) + def _handle_run_exception(self, ex): + logger.exception("[pid %s] Worker %s failed %s", os.getpid(), self.worker_id, self.task) + self.task.trigger_event(Event.FAILURE, self.task, ex) + return self.task.on_failure(ex) + def _recursive_terminate(self): import psutil diff --git a/test/helpers.py b/test/helpers.py index c407835e30..64911a06e4 
100644 --- a/test/helpers.py +++ b/test/helpers.py @@ -138,6 +138,13 @@ def run(self): self.comp = True +# string subclass that matches arguments containing the specified substring +# for use in mock 'called_with' assertions +class StringContaining(str): + def __eq__(self, other_str): + return self in other_str + + class LuigiTestCase(unittest.TestCase): """ Tasks registred within a test case will get unregistered in a finalizer diff --git a/test/worker_task_test.py b/test/worker_task_test.py index c0355fb9e9..ce4d17d387 100644 --- a/test/worker_task_test.py +++ b/test/worker_task_test.py @@ -18,7 +18,7 @@ from subprocess import check_call import sys -from helpers import LuigiTestCase +from helpers import LuigiTestCase, StringContaining import mock from psutil import Process from time import sleep @@ -87,6 +87,26 @@ def on_failure(self, exception): task_process.run() mock_put.assert_called_once_with((task.task_id, FAILED, "test failure expl", [], [])) + def test_fail_on_false_complete(self): + class NeverCompleteTask(luigi.Task): + def complete(self): + return False + + task = NeverCompleteTask() + result_queue = multiprocessing.Queue() + task_process = TaskProcess(task, 1, result_queue, mock.Mock()) + + with mock.patch.object(result_queue, 'put') as mock_put: + task_process.run() + mock_put.assert_called_once_with(( + task.task_id, + FAILED, + StringContaining("finished running, but complete() is still returning false"), + [], + [] + )) + + def test_cleanup_children_on_terminate(self): """ Subprocesses spawned by tasks should be terminated on terminate From 34ab31956ea8889d7a01de72efa20ba564d81992 Mon Sep 17 00:00:00 2001 From: Steve Pletcher Date: Thu, 16 May 2019 11:39:33 -0400 Subject: [PATCH 2/8] Remove extra blank line before test_cleanup_children_on_terminate (lint) --- test/worker_task_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test/worker_task_test.py b/test/worker_task_test.py index ce4d17d387..02af4c616c 100644 --- a/test/worker_task_test.py +++ b/test/worker_task_test.py @@ -106,7 +106,6 @@ def
complete(self): [] )) - def test_cleanup_children_on_terminate(self): """ Subprocesses spawned by tasks should be terminated on terminate From 2b57fef8360d78bd21e833cc584ec9d9e78000b9 Mon Sep 17 00:00:00 2001 From: Steve Pletcher Date: Thu, 16 May 2019 11:54:51 -0400 Subject: [PATCH 3/8] Call complete() on the wrapped task (self.task), not on the TaskProcess --- luigi/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/worker.py b/luigi/worker.py index 3425def5ce..1c17d78d51 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -196,7 +196,7 @@ def run(self): with self._forward_attributes(): new_deps = self._run_get_new_deps() if not new_deps: - if self.complete(): + if self.task.complete(): status = DONE else: status = FAILED From 6ef563e3fc6ec219c18554270e4cdca5b72f44d4 Mon Sep 17 00:00:00 2001 From: Steve Pletcher Date: Mon, 20 May 2019 10:47:56 -0400 Subject: [PATCH 4/8] Add check_complete_on_run setting to gate new feature --- luigi/worker.py | 12 ++++++++++-- test/worker_task_test.py | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/luigi/worker.py b/luigi/worker.py index 1c17d78d51..0d11da43f9 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -124,7 +124,8 @@ class TaskProcess(multiprocessing.Process): } def __init__(self, task, worker_id, result_queue, status_reporter, - use_multiprocessing=False, worker_timeout=0, check_unfulfilled_deps=True): + use_multiprocessing=False, worker_timeout=0, check_unfulfilled_deps=True, + check_complete_on_run=False): super(TaskProcess, self).__init__() self.task = task self.worker_id = worker_id @@ -134,6 +135,7 @@ def __init__(self, task, worker_id, result_queue, status_reporter, self.timeout_time = time.time() + self.worker_timeout if self.worker_timeout else None self.use_multiprocessing = use_multiprocessing or self.timeout_time is not None self.check_unfulfilled_deps = check_unfulfilled_deps + self.check_complete_on_run = check_complete_on_run def _run_get_new_deps(self): task_gen = self.task.run() @@
-196,7 +198,7 @@ def run(self): with self._forward_attributes(): new_deps = self._run_get_new_deps() if not new_deps: - if self.task.complete(): + if not self.check_complete_on_run or self.task.complete(): status = DONE else: status = FAILED @@ -455,6 +457,11 @@ class worker(Config): check_unfulfilled_deps = BoolParameter(default=True, description='If true, check for completeness of ' 'dependencies before running a task') + check_complete_on_run = BoolParameter(default=False, + config_path=dict(section='core', name='check-complete-on-run'), + description='If true, only mark tasks as done after running if they are complete. ' + 'Regardless of this setting, the worker will always check if external ' + 'tasks are complete before marking them as done.') force_multiprocessing = BoolParameter(default=False, description='If true, use multiprocessing also when ' 'running with 1 worker') @@ -1024,6 +1031,7 @@ def _create_task_process(self, task): use_multiprocessing=use_multiprocessing, worker_timeout=self._config.timeout, check_unfulfilled_deps=self._config.check_unfulfilled_deps, + check_complete_on_run=self._config.check_complete_on_run, ) def _purge_children(self): diff --git a/test/worker_task_test.py b/test/worker_task_test.py index 02af4c616c..ebccb345b6 100644 --- a/test/worker_task_test.py +++ b/test/worker_task_test.py @@ -94,7 +94,7 @@ def complete(self): task = NeverCompleteTask() result_queue = multiprocessing.Queue() - task_process = TaskProcess(task, 1, result_queue, mock.Mock()) + task_process = TaskProcess(task, 1, result_queue, mock.Mock(), check_complete_on_run=True) with mock.patch.object(result_queue, 'put') as mock_put: task_process.run() From 4370cd75040d73299876de4a6a5742bf02a1a7a0 Mon Sep 17 00:00:00 2001 From: Steve Pletcher Date: Mon, 20 May 2019 10:50:37 -0400 Subject: [PATCH 5/8] Fix lint: re-indent check_complete_on_run parameter continuation lines --- luigi/worker.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/luigi/worker.py b/luigi/worker.py index 0d11da43f9..a8b223b7a3
100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -458,10 +458,10 @@ class worker(Config): description='If true, check for completeness of ' 'dependencies before running a task') check_complete_on_run = BoolParameter(default=False, - config_path=dict(section='core', name='check-complete-on-run'), - description='If true, only mark tasks as done after running if they are complete. ' - 'Regardless of this setting, the worker will always check if external ' - 'tasks are complete before marking them as done.') + config_path=dict(section='core', name='check-complete-on-run'), + description='If true, only mark tasks as done after running if they are complete. ' + 'Regardless of this setting, the worker will always check if external ' + 'tasks are complete before marking them as done.') force_multiprocessing = BoolParameter(default=False, description='If true, use multiprocessing also when ' 'running with 1 worker') From ee5e9c5d6efa83a1b89a733e3e2dc8bbbfc6ea8a Mon Sep 17 00:00:00 2001 From: Steve Pletcher Date: Mon, 20 May 2019 11:47:15 -0400 Subject: [PATCH 6/8] Raise TaskException for an incomplete finished task so the except block handles it --- luigi/worker.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/luigi/worker.py b/luigi/worker.py index a8b223b7a3..a7ae96c332 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -201,9 +201,7 @@ def run(self): if not self.check_complete_on_run or self.task.complete(): status = DONE else: - status = FAILED - ex = TaskException("Task finished running, but complete() is still returning false.") - expl = self._handle_run_exception(ex) + raise TaskException("Task finished running, but complete() is still returning false.") else: status = PENDING From d326c6fe2ce3e8acb11dc0a3d171f2021531f1c2 Mon Sep 17 00:00:00 2001 From: Steve Pletcher Date: Mon, 20 May 2019 12:03:24 -0400 Subject: [PATCH 7/8] Expect new_deps to be None in test_fail_on_false_complete --- test/worker_task_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/worker_task_test.py
b/test/worker_task_test.py index ebccb345b6..5196210296 100644 --- a/test/worker_task_test.py +++ b/test/worker_task_test.py @@ -103,7 +103,7 @@ def complete(self): FAILED, StringContaining("finished running, but complete() is still returning false"), [], - [] + None )) def test_cleanup_children_on_terminate(self): From a64cd526e2ca3efa2f046c3f222087ae57f3ee4e Mon Sep 17 00:00:00 2001 From: Steve Pletcher Date: Tue, 21 May 2019 15:28:20 -0400 Subject: [PATCH 8/8] remove config_path param --- luigi/worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/luigi/worker.py b/luigi/worker.py index a7ae96c332..b75595368e 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -456,7 +456,6 @@ class worker(Config): description='If true, check for completeness of ' 'dependencies before running a task') check_complete_on_run = BoolParameter(default=False, - config_path=dict(section='core', name='check-complete-on-run'), description='If true, only mark tasks as done after running if they are complete. ' 'Regardless of this setting, the worker will always check if external ' 'tasks are complete before marking them as done.')