From ce3e7b596a35ea6740fafcd5ff0d0c8ff9568eba Mon Sep 17 00:00:00 2001 From: Steve Pletcher Date: Mon, 20 Jul 2020 13:23:59 -0400 Subject: [PATCH 1/2] Close our link to the task result queue when a worker exits --- luigi/worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/luigi/worker.py b/luigi/worker.py index 6c6091cbfb..ba575b7fdf 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -606,6 +606,7 @@ def __exit__(self, type, value, traceback): for task in self._running_tasks.values(): if task.is_alive(): task.terminate() + self._task_result_queue.close() return False # Don't suppress exception def _generate_worker_info(self): From daee4767c212539dece8b0ea4768c644e8b04ae3 Mon Sep 17 00:00:00 2001 From: Steve Pletcher Date: Mon, 10 Aug 2020 11:26:22 -0400 Subject: [PATCH 2/2] Don't exit worker context until worker has run in test --- test/worker_external_task_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/worker_external_task_test.py b/test/worker_external_task_test.py index d2030c63fe..ed8a3581af 100644 --- a/test/worker_external_task_test.py +++ b/test/worker_external_task_test.py @@ -185,9 +185,9 @@ def test_external_task_complete_but_missing_dep_at_runtime(self): # split up scheduling task and running to simulate runtime scenario with self._make_worker() as w: w.add(test_task) - # touch output so test_task should be considered complete at runtime - open(test_task.output_path, 'a').close() - success = w.run() + # touch output so test_task should be considered complete at runtime + open(test_task.output_path, 'a').close() + success = w.run() self.assertTrue(success) # upstream dependency output didn't exist at runtime