bugfix/initial-status-issues by bgunnar5 · Pull Request #471 · LLNL/merlin · GitHub

bugfix/initial-status-issues #471


Merged
merged 24 commits on Apr 15, 2024
Changes from all commits
24 commits
7888493
resolve CHANGELOG conflict
bgunnar5 Aug 4, 2023
ec13acf
Merge branch 'develop' of https://github.com/LLNL/merlin into develop
bgunnar5 Aug 4, 2023
7c59fc2
merge latest changes from develop
bgunnar5 Nov 20, 2023
29573d4
remove a merge conflict statement that was missed
bgunnar5 Nov 20, 2023
0ef09bc
Merge remote-tracking branch 'origin/develop' into develop
bgunnar5 Dec 13, 2023
fb6058c
Merge remote-tracking branch 'upstream-main/develop' into develop
bgunnar5 Jan 24, 2024
2a6ec3a
Merge remote-tracking branch 'upstream-main/develop' into develop
bgunnar5 Jan 25, 2024
b22e3a1
Merge remote-tracking branch 'upstream-main/develop' into develop
bgunnar5 Feb 14, 2024
4b8fab5
add a 'pip freeze' call in github workflow to view reqs versions
bgunnar5 Feb 14, 2024
714dafe
Merge remote-tracking branch 'upstream-main/develop' into develop
bgunnar5 Feb 15, 2024
d00aff0
Merge remote-tracking branch 'upstream-main/develop' into develop
bgunnar5 Mar 12, 2024
b286c38
fix bug with dry run status
bgunnar5 Mar 20, 2024
e14e209
set MANPAGER for detailed-status
bgunnar5 Mar 20, 2024
29a15bd
fix bug with 1 sample removing the status file
bgunnar5 Mar 20, 2024
05b193a
add support for multiple workers on one step in status files
bgunnar5 Mar 28, 2024
e6cd6f6
update test suite to accommodate changes to workers in status files
bgunnar5 Mar 28, 2024
10ba398
add catch and potential fix for JSONDecodeError
bgunnar5 Apr 8, 2024
d9c975b
fix docstring of a test
bgunnar5 Apr 8, 2024
8ec9900
update CHANGELOG.md
bgunnar5 Apr 8, 2024
c1f040c
run fix style and add Luc's suggestions
bgunnar5 Apr 9, 2024
b37d998
run fix-style with python 3.12
bgunnar5 Apr 9, 2024
19bad1a
added additional check for status file while condensing
bgunnar5 Apr 15, 2024
752bf27
merge changes from develop
bgunnar5 Apr 15, 2024
d687930
add try/except to catch an error for dumping statuses
bgunnar5 Apr 15, 2024
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -7,8 +7,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
## [Unreleased]
### Added
- New Priority.RETRY value for the Celery task priorities. This will be the new highest priority.
- Support for the status command to handle multiple workers on the same step
- Documentation on how to run cross-node workflows with a containerized server (`merlin server`)

### Changed
- Modified some tests in `test_status.py` and `test_detailed_status.py` to accommodate bugfixes for the status commands

### Fixed
- Bugfixes for the status commands:
- Fixed "DRY RUN" naming convention so that it outputs in the progress bar properly
- Fixed issue where a step that was run with one sample would delete the status file upon condensing
- Fixed issue where multiple workers processing the same step would break the status file and cause the workflow to crash
- Added a catch for the JSONDecodeError that would potentially crash a run
- Added a FileLock to the status write in `_update_status_file()` of `MerlinStepRecord` to avoid potential race conditions (potentially related to JSONDecodeError above)
- Added in `export MANPAGER="less -r"` call behind the scenes for `detailed-status` to fix ASCII error
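The FileLock addition described in the Fixed list above can be sketched roughly as follows. This is a minimal illustration using the `filelock` package (which the diff below imports as `FileLock`/`Timeout`); the function name `write_status` and the exact structure inside `MerlinStepRecord._update_status_file()` are assumptions, not the actual merlin implementation:

```python
import json

from filelock import FileLock, Timeout


def write_status(status_filepath: str, lock_filepath: str, status: dict, timeout: int = 10):
    """Write a status dict to disk under a file lock.

    Holding the lock while writing prevents a concurrent reader (or another
    worker updating the same step) from seeing a half-written JSON file,
    which is one plausible source of the JSONDecodeError mentioned above.
    """
    lock = FileLock(lock_filepath)
    try:
        with lock.acquire(timeout=timeout):
            with open(status_filepath, "w") as status_file:
                json.dump(status, status_file)
    except Timeout:
        # Another worker held the lock too long; let the caller decide how to retry
        raise
```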

## [1.12.0]
### Added
- A new command `merlin queue-info` that will print the status of your celery queues
69 changes: 39 additions & 30 deletions merlin/common/tasks.py
@@ -49,6 +49,7 @@
from merlin.exceptions import HardFailException, InvalidChainException, RestartException, RetryException
from merlin.router import stop_workers
from merlin.spec.expansion import parameter_substitutions_for_cmd, parameter_substitutions_for_sample
from merlin.study.status import read_status
from merlin.utils import dict_deep_merge


@@ -312,13 +313,17 @@ def add_merlin_expanded_chain_to_chord(  # pylint: disable=R0913,R0914

all_chains.append(new_chain)

condense_sig = condense_status_files.s(
sample_index=sample_index,
workspace=top_lvl_workspace,
condensed_workspace=chain_[0].mstep.condensed_workspace,
).set(
queue=chain_[0].get_task_queue(),
)
# Only need to condense status files if there's more than 1 sample
if num_samples > 1:
condense_sig = condense_status_files.s(
sample_index=sample_index,
workspace=top_lvl_workspace,
condensed_workspace=chain_[0].mstep.condensed_workspace,
).set(
queue=chain_[0].get_task_queue(),
)
else:
condense_sig = None

LOG.debug("adding chain to chord")
chain_1d = get_1d_chain(all_chains)
@@ -467,29 +472,33 @@ def gather_statuses(
# Read in the status data
sample_workspace = f"{workspace}/{path}"
status_filepath = f"{sample_workspace}/MERLIN_STATUS.json"
lock = FileLock(f"{sample_workspace}/status.lock") # pylint: disable=E0110
try:
# The status files will need locks when reading to avoid race conditions
with lock.acquire(timeout=10):
with open(status_filepath, "r") as status_file:
status = json.load(status_file)

# This for loop is just to get the step name that we don't have; it's really not even looping
for step_name in status:
try:
# Make sure the status for this sample workspace is in a finished state (not initialized or running)
if status[step_name][f"{condensed_workspace}/{path}"]["status"] not in ("INITIALIZED", "RUNNING"):
# Add the status data to the statuses we'll write to the condensed file and remove this status file
dict_deep_merge(condensed_statuses, status)
files_to_remove.append(status_filepath)
except KeyError:
LOG.warning(f"Key error when reading from {sample_workspace}")
except Timeout:
# Raising this celery timeout instead will trigger a restart for this task
raise TimeoutError # pylint: disable=W0707
except FileNotFoundError:
LOG.warning(f"Could not find {status_filepath} while trying to condense. Restarting this task...")
raise FileNotFoundError # pylint: disable=W0707
lock_filepath = f"{sample_workspace}/status.lock"
if os.path.exists(status_filepath):
try:
# NOTE: instead of leaving statuses as dicts read in by JSON, maybe they should each be their own object
status = read_status(status_filepath, lock_filepath, raise_errors=True)

# This for loop is just to get the step name that we don't have; it's really not even looping
for step_name in status:
try:
# Make sure the status for this sample workspace is in a finished state (not initialized or running)
if status[step_name][f"{condensed_workspace}/{path}"]["status"] not in ("INITIALIZED", "RUNNING"):
# Add the status data to the statuses we'll write to the condensed file and remove this status file
dict_deep_merge(condensed_statuses, status)
files_to_remove.append(status_filepath)
files_to_remove.append(lock_filepath) # Remove the lock files as well as the status files
except KeyError:
LOG.warning(f"Key error when reading from {sample_workspace}")
except Timeout:
# Raising this celery timeout instead will trigger a restart for this task
raise TimeoutError # pylint: disable=W0707
except FileNotFoundError:
LOG.warning(f"Could not find {status_filepath} while trying to condense. Restarting this task...")
raise FileNotFoundError # pylint: disable=W0707
else:
# Might be missing a status file in the output if we hit this but we don't want that
# to fully crash the workflow
LOG.debug(f"Could not find {status_filepath}, skipping this status file.")

return condensed_statuses

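The `read_status` helper imported at the top of this tasks.py diff is not itself shown in the PR. A plausible sketch of its locked-read behavior, inferred only from how `gather_statuses` calls it (`read_status(status_filepath, lock_filepath, raise_errors=True)`) — an assumption, not the actual merlin implementation:

```python
import json
import logging

from filelock import FileLock

LOG = logging.getLogger(__name__)


def read_status(status_filepath: str, lock_filepath: str, timeout: int = 10, raise_errors: bool = False) -> dict:
    """Read a MERLIN_STATUS.json file under a file lock.

    Locking on read mirrors the lock taken on write, so a reader never sees
    a status file mid-update when multiple workers share a step.
    """
    lock = FileLock(lock_filepath)
    try:
        with lock.acquire(timeout=timeout):
            with open(status_filepath, "r") as status_file:
                return json.load(status_file)
    except json.decoder.JSONDecodeError as exc:
        # Truncated or corrupt file; surface it only if the caller asked us to
        LOG.warning(f"JSONDecodeError while reading {status_filepath}: {exc}")
        if raise_errors:
            raise
        return {}
```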
11 changes: 6 additions & 5 deletions merlin/display.py
@@ -349,7 +349,7 @@ def display_status_summary(  # pylint: disable=R0912

:param `status_obj`: A Status object
:param `non_workspace_keys`: A set of keys in requested_statuses that are not workspace keys.
This will be set("parameters", "task_queue", "worker_name)
This will be set("parameters", "task_queue", "workers")
:param `test_mode`: If True, don't print anything and just return a dict of all the state info for each step
:returns: A dict that's empty usually. If ran in test_mode it will be a dict of state_info for every step.
"""
@@ -369,7 +369,7 @@
"UNKNOWN": {"count": 0, "color": ANSI_COLORS["GREY"], "fill": "?"},
"INITIALIZED": {"count": 0, "color": ANSI_COLORS["LIGHT_BLUE"]},
"RUNNING": {"count": 0, "color": ANSI_COLORS["BLUE"]},
"DRY RUN": {"count": 0, "color": ANSI_COLORS["ORANGE"], "fill": "\\"},
"DRY_RUN": {"count": 0, "color": ANSI_COLORS["ORANGE"], "fill": "\\"},
"TOTAL TASKS": {"total": status_obj.tasks_per_step[sstep]},
"AVG RUN TIME": status_obj.run_time_info[sstep]["avg_run_time"],
"RUN TIME STD DEV": status_obj.run_time_info[sstep]["run_time_std_dev"],
@@ -385,8 +385,9 @@
# If this was a non-local run we should have a task queue and worker name to add to state_info
if "task_queue" in overall_step_info:
state_info["TASK QUEUE"] = {"name": overall_step_info["task_queue"]}
if "worker_name" in overall_step_info:
state_info["WORKER NAME"] = {"name": overall_step_info["worker_name"]}
if "workers" in overall_step_info:
worker_str = ", ".join(overall_step_info["workers"])
state_info["WORKER(S)"] = {"name": worker_str}

# Loop through all workspaces for this step (if there's no samples for this step it'll just be one path)
for sub_step_workspace, task_status_info in overall_step_info.items():
@@ -474,7 +475,7 @@ def display_progress_bar(  # pylint: disable=R0913,R0914
"INITIALIZED",
"RUNNING",
"TASK QUEUE",
"WORKER NAME",
"WORKER(S)",
"TOTAL TASKS",
"AVG RUN TIME",
"RUN TIME STD DEV",
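With multiple workers now tracked per step, the display.py change above collapses the `workers` list into a single "WORKER(S)" field. A tiny self-contained illustration of that join (the queue and worker names here are made up for the example):

```python
# Example step info as it would appear after the status-file change:
# "workers" is now a list rather than a single "worker_name" string.
overall_step_info = {"task_queue": "hello_queue", "workers": ["worker1", "worker2"]}

state_info = {}
if "task_queue" in overall_step_info:
    state_info["TASK QUEUE"] = {"name": overall_step_info["task_queue"]}
if "workers" in overall_step_info:
    # Join however many workers processed this step into one display string
    state_info["WORKER(S)"] = {"name": ", ".join(overall_step_info["workers"])}

print(state_info["WORKER(S)"]["name"])  # worker1, worker2
```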