From 5183a3dbf875ad5bc3dbc51796a6e62b22d21f9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 14 Feb 2026 12:17:27 +0100 Subject: [PATCH 1/4] [Fix] Only add processes which exist in process_dict --- src/executorlib/task_scheduler/file/shared.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 7c3183b9..152cd4a6 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -5,7 +5,7 @@ from typing import Any, Callable, Optional from executorlib.standalone.command import get_cache_execute_command -from executorlib.standalone.hdf import get_cache_files, get_output +from executorlib.standalone.hdf import get_cache_files, get_output, get_queue_id from executorlib.standalone.serialize import serialize_funct from executorlib.task_scheduler.file.spawner_subprocess import terminate_subprocess @@ -152,7 +152,7 @@ def execute_tasks_h5( file_name = os.path.join(cache_directory, task_key + "_i.h5") if not disable_dependencies: task_dependent_lst = [ - process_dict[k] for k in future_wait_key_lst + process_dict[k] for k in future_wait_key_lst if k in process_dict ] else: if len(future_wait_key_lst) > 0: @@ -181,9 +181,13 @@ def execute_tasks_h5( backend=backend, cache_directory=cache_directory, ) - file_name_dict[task_key] = os.path.join( + file_name = os.path.join( cache_directory, task_key + "_o.h5" ) + file_name_dict[task_key] = file_name + queue_id = get_queue_id(file_name=file_name) + if queue_id is not None: + process_dict[task_key] = queue_id memory_dict[task_key] = task_dict["future"] cache_dir_dict[task_key] = cache_directory future_queue.task_done() From 5d142e5cd55507c6eead5e0947d2db35f3f1c023 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 14 Feb 2026 11:18:16 +0000 Subject: [PATCH 2/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/executorlib/task_scheduler/file/shared.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 152cd4a6..683acd5a 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -152,7 +152,9 @@ def execute_tasks_h5( file_name = os.path.join(cache_directory, task_key + "_i.h5") if not disable_dependencies: task_dependent_lst = [ - process_dict[k] for k in future_wait_key_lst if k in process_dict + process_dict[k] + for k in future_wait_key_lst + if k in process_dict ] else: if len(future_wait_key_lst) > 0: @@ -181,9 +183,7 @@ def execute_tasks_h5( backend=backend, cache_directory=cache_directory, ) - file_name = os.path.join( - cache_directory, task_key + "_o.h5" - ) + file_name = os.path.join(cache_directory, task_key + "_o.h5") file_name_dict[task_key] = file_name queue_id = get_queue_id(file_name=file_name) if queue_id is not None: From 0bc8c96a05cc7df353c9b9c5cf3a3a7c584fdf83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 14 Feb 2026 12:27:10 +0100 Subject: [PATCH 3/4] reduce number of statements --- src/executorlib/task_scheduler/file/shared.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 683acd5a..45f3f7f9 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -130,13 +130,10 @@ def execute_tasks_h5( memory_dict=memory_dict, file_name_dict=file_name_dict, ) - task_resource_dict = task_dict["resource_dict"].copy() - task_resource_dict.update( - {k: v for k, v in resource_dict.items() if k not in task_resource_dict} + task_resource_dict, cache_key, cache_directory, error_log_file = _get_task_input( + task_resource_dict=task_dict["resource_dict"].copy(), + resource_dict=resource_dict ) - cache_key = task_resource_dict.pop("cache_key", None) - cache_directory = os.path.abspath(task_resource_dict.pop("cache_directory")) - error_log_file = task_resource_dict.pop("error_log_file", None) task_key, data_dict = serialize_funct( fn=task_dict["fn"], fn_args=task_args, @@ -358,3 +355,13 @@ def _cancel_processes( config_directory=pysqa_config_directory, backend=backend, ) + + +def _get_task_input(task_resource_dict: dict, resource_dict: dict) -> tuple[dict, Optional[str], str, Optional[str]]: + task_resource_dict.update( + {k: v for k, v in resource_dict.items() if k not in task_resource_dict} + ) + cache_key = task_resource_dict.pop("cache_key", None) + cache_directory = os.path.abspath(task_resource_dict.pop("cache_directory")) + error_log_file = task_resource_dict.pop("error_log_file", None) + return task_resource_dict, cache_key, cache_directory, error_log_file From 1c1e0583f74f170e7750ede9b60a147318df91ed Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 14 Feb 2026 11:27:20 +0000 Subject: [PATCH 4/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/executorlib/task_scheduler/file/shared.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 45f3f7f9..491f34c9 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -130,9 +130,11 @@ def execute_tasks_h5( memory_dict=memory_dict, file_name_dict=file_name_dict, ) - task_resource_dict, cache_key, cache_directory, error_log_file = _get_task_input( - task_resource_dict=task_dict["resource_dict"].copy(), - resource_dict=resource_dict + task_resource_dict, cache_key, cache_directory, error_log_file = ( + _get_task_input( + task_resource_dict=task_dict["resource_dict"].copy(), + resource_dict=resource_dict, + ) ) task_key, data_dict = serialize_funct( fn=task_dict["fn"], @@ -357,7 +359,9 @@ def _cancel_processes( ) -def _get_task_input(task_resource_dict: dict, resource_dict: dict) -> tuple[dict, Optional[str], str, Optional[str]]: +def _get_task_input( + task_resource_dict: dict, resource_dict: dict +) -> tuple[dict, Optional[str], str, Optional[str]]: task_resource_dict.update( {k: v for k, v in resource_dict.items() if k not in task_resource_dict} )