Fix error message in case of unfulfilled dependencies with single output by GianlucaFicarelli · Pull Request #3281 · spotify/luigi · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Fix error message in case of unfulfilled dependencies with single output #3281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, Scheduler, RetryPolicy
from luigi.scheduler import WORKER_STATE_ACTIVE, WORKER_STATE_DISABLED
from luigi.target import Target
from luigi.task import Task, Config, DynamicRequirements
from luigi.task import Task, Config, DynamicRequirements, flatten
from luigi.task_register import TaskClassException
from luigi.task_status import RUNNING
from luigi.parameter import BoolParameter, FloatParameter, IntParameter, OptionalParameter, Parameter, TimeDeltaParameter
Expand Down Expand Up @@ -185,7 +185,7 @@ def run(self):
missing = []
for dep in self.task.deps():
if not self.check_complete(dep):
nonexistent_outputs = [output for output in dep.output() if not output.exists()]
nonexistent_outputs = [output for output in flatten(dep.output()) if not output.exists()]
if nonexistent_outputs:
missing.append(f'{dep.task_id} ({", ".join(map(str, nonexistent_outputs))})')
else:
Expand Down
37 changes: 37 additions & 0 deletions test/worker_task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import luigi
import luigi.date_interval
import luigi.notifications
from luigi.mock import MockTarget
from luigi.worker import TaskException, TaskProcess
from luigi.scheduler import DONE, FAILED

Expand Down Expand Up @@ -106,6 +107,42 @@ def complete(self):
None
))

def test_fail_on_unfulfilled_dependencies(self):
class NeverCompleteTask(luigi.Task):
def complete(self):
return False

class A(NeverCompleteTask):
def output(self):
return []

class B(NeverCompleteTask):
def output(self):
return MockTarget("foo-B")

class C(NeverCompleteTask):
def output(self):
return [MockTarget("foo-C1"), MockTarget("foo-C2")]

class Main(NeverCompleteTask):
def requires(self):
return [A(), B(), C()]

task = Main()
result_queue = multiprocessing.Queue()
task_process = TaskProcess(task, 1, result_queue, mock.Mock())

with mock.patch.object(result_queue, 'put') as mock_put:
task_process.run()
expected_missing = [A().task_id, f"{B().task_id} (foo-B)", f"{C().task_id} (foo-C1, foo-C2)"]
mock_put.assert_called_once_with((
task.task_id,
FAILED,
StringContaining(f"Unfulfilled dependencies at run time: {', '.join(expected_missing)}"),
expected_missing,
[],
))

def test_cleanup_children_on_terminate(self):
"""
Subprocesses spawned by tasks should be terminated on terminate
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ deps =
pytest<7.0
pytest-cov>=2.0,<3.0
mock<2.0
moto>=1.3.10
moto>=1.3.10,<5.0
HTTPretty==0.8.10
docker>=2.1.0
boto>=2.42,<3.0
Expand Down
0