From 6943f025cf57b95106e202f267ca3939c7267d02 Mon Sep 17 00:00:00 2001 From: Mateusz Adamczyk Date: Fri, 14 Feb 2025 16:44:20 +0100 Subject: [PATCH 1/2] Make reporting more metrics for tasks possible --- luigi/metrics.py | 3 +++ luigi/scheduler.py | 6 ++++++ luigi/worker.py | 3 +++ 3 files changed, 12 insertions(+) diff --git a/luigi/metrics.py b/luigi/metrics.py index cd3364e251..30d45164f4 100644 --- a/luigi/metrics.py +++ b/luigi/metrics.py @@ -66,6 +66,9 @@ def handle_task_disabled(self, task, config): def handle_task_done(self, task): pass + def handle_task_statistics(self, task, statistics): + pass + def generate_latest(self): return diff --git a/luigi/scheduler.py b/luigi/scheduler.py index 81532d3c96..9aca3c90cd 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -1646,3 +1646,9 @@ def task_history(self): @rpc_method() def update_metrics_task_started(self, task): self._state._metrics_collector.handle_task_started(task) + + @rpc_method() + def report_task_statistics(self, task_id, statistics): + if self._state.has_task(task_id): + task = self._state.get_task(task_id) + self._state._metrics_collector.handle_task_statistics(task, statistics) diff --git a/luigi/worker.py b/luigi/worker.py index a11f808ae5..43e5dc0c0f 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -339,6 +339,9 @@ def update_progress_percentage(self, percentage): def decrease_running_resources(self, decrease_resources): self._scheduler.decrease_running_task_resources(self._task_id, decrease_resources) + def report_task_statistics(self, statistics): + self._scheduler.report_task_statistics(self._task_id, statistics) + class SchedulerMessage: """ From df582477ee364c7760b2f96230a0ddb68a272c04 Mon Sep 17 00:00:00 2001 From: Mateusz Adamczyk Date: Sat, 26 Apr 2025 17:56:26 +0200 Subject: [PATCH 2/2] Added documentation and test for collecting custom tasks' metrics --- doc/configuration.rst | 4 ++ doc/luigi_patterns.rst | 78 ++++++++++++++++++++++++-- test/custom_metrics_test.py | 106 
++++++++++++++++++++++++++++++++++++ 3 files changed, 183 insertions(+), 5 deletions(-) create mode 100644 test/custom_metrics_test.py diff --git a/doc/configuration.rst b/doc/configuration.rst index ad085a3439..6edfaa8f48 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -271,6 +271,10 @@ task_limit thus far. Prevents incidents due to spamming of the scheduler, usually accidental. Default: no limit. +task_process_context + An optional setting allowing Luigi to import a custom context manager + used to wrap the execution of tasks' run methods. Default: no context manager. + timeout .. versionadded:: 1.0.20 diff --git a/doc/luigi_patterns.rst b/doc/luigi_patterns.rst index e59d1a4b47..7bdb560c7e 100644 --- a/doc/luigi_patterns.rst +++ b/doc/luigi_patterns.rst @@ -206,12 +206,12 @@ available while others are running. Avoiding concurrent writes to a single file ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Updating a single file from several tasks is almost always a bad idea, and you +Updating a single file from several tasks is almost always a bad idea, and you need to be very confident that no other good solution exists before doing this. If, however, you have no other option, then you will probably at least need to ensure that -no two tasks try to write to the file _simultaneously_. +no two tasks try to write to the file _simultaneously_. -By turning 'resources' into a Python property, it can return a value dependent on +By turning 'resources' into a Python property, it can return a value dependent on the task parameters or other dynamic attributes: .. code-block:: python @@ -223,7 +223,7 @@ the task parameters or other dynamic attributes: def resources(self): return { self.important_file_name: 1 } -Since, by default, resources have a usage limit of 1, no two instances of Task A +Since, by default, resources have a usage limit of 1, no two instances of Task A will now run if they have the same `important_file_name` property. 
Decreasing resources of running tasks @@ -321,7 +321,7 @@ atomic. Sending messages to tasks ~~~~~~~~~~~~~~~~~~~~~~~~~ -The central scheduler is able to send messages to particular tasks. When a running task accepts +The central scheduler is able to send messages to particular tasks. When a running task accepts messages, it can access a `multiprocessing.Queue `__ object storing incoming messages. You can implement custom behavior to react and respond to messages: @@ -353,3 +353,71 @@ messages: Messages can be sent right from the scheduler UI which also displays responses (if any). Note that this feature is only available when the scheduler is configured to send messages (see the :ref:`scheduler-config` config), and the task is configured to accept them. + +Gathering custom metrics from tasks' executions +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The central scheduler is able to gather custom metrics from tasks' executions with the help of +a custom metrics collector (see the :ref:`scheduler-config` config). To obtain custom metrics, +you need to implement: + +#. Custom metrics collector class inheriting from + :class:`~luigi.metrics.MetricsCollector` (or derived) and implementing the + :meth:`~luigi.metrics.MetricsCollector.handle_task_statistics` + method (the default implementation does nothing). This method will be called for each task + that has been executed, every time + :meth:`~luigi.worker.TaskStatusReporter.report_task_statistics` is called. + For instance, the following metrics collector adds monitoring of tasks' execution + time and memory usage: + + ..
code-block:: python + + class MetricsCollector(PrometheusMetricsCollector): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.task_run_execution_time = Gauge( + 'luigi_task_run_execution_time_seconds', + 'luigi task run method execution time in seconds', + self.labels, + registry=self.registry + ) + self.task_execution_memory = Gauge( + 'luigi_task_max_memory_megabytes', + 'luigi task run method max memory usage in megabytes', + self.labels, + registry=self.registry + ) + + def handle_task_statistics(self, task, statistics): + if "elapsed" in statistics: + self.task_run_execution_time.labels(**self._generate_task_labels(task)).set(statistics["elapsed"]) + if "memory" in statistics: + self.task_execution_memory.labels(**self._generate_task_labels(task)).set(statistics["memory"]) + +#. Custom task context manager (see the :ref:`worker-config` config), + whose ``__exit__`` method calls + :meth:`~luigi.worker.TaskStatusReporter.report_task_statistics` with + the statistics dictionary. For instance, the following task context manager collects + task execution time and memory usage: + + ..
code-block:: python + + class TaskContext: + def __init__(self, task_process): + self._task_process = task_process + self._start = None + + def __enter__(self): + self._start = time.perf_counter() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + assert self._start is not None + elapsed = time.perf_counter() - self._start + used_memory = max( + resource.getrusage(resource.RUSAGE_SELF).ru_maxrss, resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss + ) + logging.getLogger("luigi-interface").info( + f'Task {self._task_process.task}: time: {elapsed:.2f}s, memory: {used_memory / 1024:.2f}MB ' + ) + self._task_process.status_reporter.report_task_statistics({"memory": used_memory / 1024, "elapsed": elapsed}) diff --git a/test/custom_metrics_test.py b/test/custom_metrics_test.py new file mode 100644 index 0000000000..3d5783788e --- /dev/null +++ b/test/custom_metrics_test.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2012-2017 Spotify AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import os +import tempfile +import time + +from helpers import LuigiTestCase, temporary_unloaded_module + +import luigi +from luigi.metrics import MetricsCollectors +from luigi.scheduler import Scheduler +from luigi.worker import Worker + + +class CustomMetricsTestMyTask(luigi.Task): + root_path = luigi.PathParameter() + + n = luigi.IntParameter() + + def output(self): + basename = "%s_%s.txt" % (self.__class__.__name__, self.n) + return luigi.LocalTarget(os.path.join(self.root_path, basename)) + + def run(self): + time.sleep(self.n) + with self.output().open('w') as f: + f.write("content\n") + + +class CustomMetricsTestWrapper(CustomMetricsTestMyTask): + def requires(self): + return [self.clone(CustomMetricsTestMyTask, n=n) for n in range(self.n)] + + +METRICS_COLLECTOR_MODULE = b''' +from luigi.metrics import NoMetricsCollector + +class CustomMetricsCollector(NoMetricsCollector): + def __init__(self, *args, **kwargs): + super(CustomMetricsCollector, self).__init__(*args, **kwargs) + self.elapsed = {} + + def handle_task_statistics(self, task, statistics): + if "elapsed" in statistics: + self.elapsed[(task.family, task.params.get("n"))] = statistics["elapsed"] +''' + + +TASK_CONTEXT_MODULE = b''' +import time + +class CustomTaskContext: + def __init__(self, task_process): + self._task_process = task_process + self._start = None + + def __enter__(self): + self._start = time.perf_counter() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + assert self._start is not None + elapsed = time.perf_counter() - self._start + self._task_process.status_reporter.report_task_statistics({"elapsed": elapsed}) +''' + + +class CustomMetricsTest(LuigiTestCase): + """ + Test showcasing collection of custom metrics + """ + + def _run_task_on_worker(self, worker): + with tempfile.TemporaryDirectory() as tmpdir: + task = CustomMetricsTestWrapper(n=3, root_path=tmpdir) + self.assertTrue(worker.add(task)) + worker.run() + self.assertTrue(task.complete()) + + def
_create_worker_and_run_task(self, scheduler): + with temporary_unloaded_module(TASK_CONTEXT_MODULE) as task_context_module: + with Worker(scheduler=scheduler, worker_id='X', task_process_context=task_context_module + '.CustomTaskContext') as worker: + self._run_task_on_worker(worker) + + def test_custom_metrics(self): + with temporary_unloaded_module(METRICS_COLLECTOR_MODULE) as metrics_collector_module: + scheduler = Scheduler(metrics_collector=MetricsCollectors.custom, metrics_custom_import=metrics_collector_module + '.CustomMetricsCollector') + self._create_worker_and_run_task(scheduler) + reported = scheduler._state._metrics_collector.elapsed + self.assertEqual({family for family, _ in reported}, {'CustomMetricsTestMyTask', 'CustomMetricsTestWrapper'}) + self.assertTrue(all(elapsed >= float(n) for (_, n), elapsed in reported.items()), reported)