Pipeline

class cdiutils.pipeline.Pipeline(params=None, param_file_path=None)

Bases: ABC

Abstract base class for CDI data processing pipelines.

Provides infrastructure for parameter management, logging, job submission (SLURM), and subprocess execution. Not intended for direct instantiation; subclass it for specific applications.

Parameters:
  • params (dict, optional) – parameter dictionary. Defaults to None.

  • param_file_path (str, optional) – path to YAML parameter file. Defaults to None.

Raises:

ValueError – if neither params nor param_file_path is provided.

__init__(params=None, param_file_path=None)

Initialise Pipeline with parameters from dict or file.

Parameters:
  • params (dict, optional) – parameter dictionary. Defaults to None.

  • param_file_path (str, optional) – path to YAML parameter file. Defaults to None.

Raises:

ValueError – if neither params nor param_file_path is provided.
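
The either/or contract above can be illustrated with a stand-in class. This is a hypothetical sketch, not cdiutils code: SketchPipeline, DemoPipeline and the run() hook are invented for illustration, the parameter keys are made up, and the YAML loading step is deliberately left out.

```python
from abc import ABC, abstractmethod


class SketchPipeline(ABC):
    """Hypothetical stand-in mirroring the documented constructor contract."""

    def __init__(self, params=None, param_file_path=None):
        # Documented behaviour: at least one parameter source is required.
        if params is None and param_file_path is None:
            raise ValueError("Provide either params or param_file_path.")
        self.params = params
        self.param_file_path = param_file_path

    @abstractmethod
    def run(self):
        """Hypothetical hook; the real abstract interface may differ."""


class DemoPipeline(SketchPipeline):
    def run(self):
        # Return the parameter names, just to show the subclass is usable.
        return sorted(self.params)


pipeline = DemoPipeline(params={"dump_dir": "results", "scan": 42})
print(pipeline.run())

try:
    DemoPipeline()  # neither argument given
except ValueError as err:
    print(f"rejected: {err}")
```

With the real library, the same pattern would apply to BcdiPipeline or to a custom subclass of Pipeline.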

make_dump_dir()

Create output directory specified in params['dump_dir'].

Raises:

ValueError – if dump_dir parameter is None.
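
A minimal sketch of the behaviour described above, written as a free function. The name make_dump_dir_sketch is hypothetical, and the use of exist_ok=True is an assumption; the actual method may treat an existing directory differently.

```python
import os
import tempfile


def make_dump_dir_sketch(params):
    """Create params['dump_dir'], refusing a missing value (hypothetical helper)."""
    dump_dir = params.get("dump_dir")
    if dump_dir is None:
        raise ValueError("dump_dir parameter is None.")
    # Assumption: an already existing directory is not treated as an error.
    os.makedirs(dump_dir, exist_ok=True)
    return dump_dir


with tempfile.TemporaryDirectory() as tmp:
    created = make_dump_dir_sketch({"dump_dir": os.path.join(tmp, "results")})
    created_exists = os.path.isdir(created)
print(created_exists)
```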

static process(func)

Decorate pipeline methods to add logging and error handling.

Wraps process methods with file logging, stdout redirection, and structured error reporting. Creates process-specific log files in dump_dir with format {func_name}_output.log.

Parameters:

func (Callable) – pipeline method to decorate.

Returns:

wrapped function with logging infrastructure.

Return type:

Callable

Raises:

Exception – re-raises any exception from decorated function after logging.

Notes

Temporarily redirects sys.stdout to logger during execution to capture print statements. Original stdout is always restored in finally block.

submit_job(job_file, working_dir)

Submit SLURM job and return job ID with output file path.

Executes sbatch command in bash login shell to ensure proper environment loading. Sets up keyboard interrupt handler for job cancellation.

Parameters:
  • job_file (str) – path to SLURM batch script.

  • working_dir (str) – directory to execute sbatch from.

Returns:

job ID and absolute path to output file (slurm-{job_id}.out).

Return type:

tuple[str, str]

Raises:
  • subprocess.CalledProcessError – if sbatch command fails.

  • ValueError – if job ID cannot be extracted from sbatch output.

Notes

Registers SIGINT handler that calls _handle_interrupt with job_id when Ctrl+C is pressed.
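
The submission steps can be sketched without a cluster by separating command construction from output parsing. The helper names below are hypothetical, and the exact command line used by the library is an assumption; the sketch relies only on the standard sbatch confirmation message "Submitted batch job <id>" and the default slurm-<id>.out naming.

```python
import os
import re
import shlex


def build_submit_command(job_file):
    """Wrap sbatch in a bash login shell so the user's environment is loaded."""
    return ["bash", "-l", "-c", f"sbatch {shlex.quote(job_file)}"]


def extract_job_id(sbatch_stdout):
    """Return the job ID from sbatch's confirmation message."""
    match = re.search(r"Submitted batch job (\d+)", sbatch_stdout)
    if match is None:
        raise ValueError(f"No job ID found in sbatch output: {sbatch_stdout!r}")
    return match.group(1)


def output_file_for(job_id, working_dir):
    """Absolute path of the default SLURM output file for this job."""
    return os.path.join(os.path.abspath(working_dir), f"slurm-{job_id}.out")


command = build_submit_command("job.slurm")
job_id = extract_job_id("Submitted batch job 4821337\n")
output_file = output_file_for(job_id, "demo_run")
print(command)
print(job_id, output_file)
```

In a real run the command list would be passed to subprocess.run with cwd=working_dir and check=True, which is what produces the documented CalledProcessError when sbatch fails.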

is_job_running(job_id)

Check if SLURM job is currently running.

Queries squeue for job presence. Job is considered running if its ID appears in squeue output.

Parameters:

job_id (str) – SLURM job ID to check.

Returns:

True if job is in queue, False otherwise.

Return type:

bool

Raises:

subprocess.CalledProcessError – if squeue command fails.
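
A sketch of the presence check, operating on captured squeue text rather than calling the scheduler. It assumes output shaped like that of `squeue --noheader --format=%i` (one job ID per line); the options the library actually passes are not documented here, and job_listed is a hypothetical name.

```python
def job_listed(job_id, squeue_stdout):
    """True if job_id is the first token of any line in the squeue output."""
    return any(
        line.split()[:1] == [job_id]
        for line in squeue_stdout.splitlines()
    )


sample = "4821336\n4821337\n48213370\n"
print(job_listed("4821337", sample))
print(job_listed("482133", sample))
```

Comparing whole tokens instead of using a substring test avoids reporting job 482133 as running merely because job 4821337 is in the queue.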

stream_job_output(job_id, output_file)

Stream SLURM job output in real-time.

Waits for output file creation, then continuously reads and logs new lines until job stops running or interrupted flag is set. Logs at JOB level (custom level between INFO and WARNING).

Parameters:
  • job_id (str) – SLURM job ID being monitored.

  • output_file (str) – path to slurm-{job_id}.out file.

Raises:

FileNotFoundError – if output file cannot be accessed after creation.

Notes

Checks file existence every 0.5s until found. Polls running status and reads new lines with 0.5s interval. Respects self.interrupted flag for early termination.
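
The wait-then-follow loop described above can be sketched with injected callbacks, so that it runs without SLURM. follow_file and its parameters are hypothetical; is_running stands in for is_job_running, emit for the JOB-level logger call, and interrupted for the self.interrupted flag.

```python
import os
import tempfile
import time


def follow_file(path, is_running, emit, poll=0.5, interrupted=lambda: False):
    """Wait for path to appear, then emit new lines until the job stops."""
    # Phase 1: the scheduler may create the output file with some delay.
    while not os.path.exists(path):
        if interrupted():
            return
        time.sleep(poll)

    # Phase 2: read whatever is new, then decide whether to keep polling.
    with open(path, "r") as handle:
        while True:
            # Sample the job state before reading, so lines written just
            # before the job exits are still picked up on this pass.
            still_running = is_running()
            line = handle.readline()
            while line:
                emit(line.rstrip("\n"))
                line = handle.readline()
            if not still_running or interrupted():
                break
            time.sleep(poll)


with tempfile.TemporaryDirectory() as tmp:
    out_path = os.path.join(tmp, "slurm-0000.out")
    with open(out_path, "w") as f:
        f.write("step 1\nstep 2\n")
    captured = []
    # The job is reported as already finished, so one pass drains the file.
    follow_file(out_path, is_running=lambda: False, emit=captured.append, poll=0.01)

print(captured)
```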

monitor_job(job_id, output_file, retries=10, delay=1)

Monitor SLURM job and verify final completion status.

Streams job output in real-time and validates final state via sacct after job leaves queue. Retries state check if job shows RUNNING but is not in squeue (handles race conditions).

Parameters:
  • job_id (str) – SLURM job ID to monitor.

  • output_file (str) – path to slurm-{job_id}.out file.

  • retries (int) – number of sacct retries for lingering RUNNING state. Defaults to 10.

  • delay (int) – seconds between retries. Defaults to 1.

Raises:

JobFailedError – if job terminates with FAILED state or non-zero exit code.

Notes

Successfully completed jobs have state='COMPLETED' and exit_code='0:0'. Other terminal states log a warning but do not raise exceptions.
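
The retry and outcome rules can be sketched as two small functions. This is one possible reading of the documented behaviour, not the library's code: JobFailedSketch is a hypothetical stand-in for JobFailedError, and get_state stands in for a call to get_job_state.

```python
import time


class JobFailedSketch(RuntimeError):
    """Hypothetical stand-in for the library's JobFailedError."""


def final_state(get_state, retries=10, delay=1):
    """Re-query the state while accounting still reports RUNNING."""
    state, exit_code = get_state()
    for _ in range(retries):
        if state != "RUNNING":
            break
        time.sleep(delay)  # accounting can lag behind the queue
        state, exit_code = get_state()
    return state, exit_code


def check_outcome(state, exit_code):
    """Classify the final state following the documented rules."""
    if state == "COMPLETED" and exit_code == "0:0":
        return "ok"
    if state == "FAILED" or exit_code != "0:0":
        raise JobFailedSketch(f"state={state}, exit_code={exit_code}")
    return "warning"  # other terminal state: warn, do not raise


# Simulated sacct replies: two stale RUNNING answers, then the real result.
replies = iter([("RUNNING", "0:0"), ("RUNNING", "0:0"), ("COMPLETED", "0:0")])
state, exit_code = final_state(lambda: next(replies), retries=5, delay=0)
print(state, exit_code, check_outcome(state, exit_code))
```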

get_job_state(job_id)

Retrieve SLURM job state and exit code via sacct.

Queries sacct for job status information and parses output to extract state (e.g., COMPLETED, FAILED, RUNNING) and exit code (format: exit_status:signal, as reported in sacct's ExitCode field).

Parameters:

job_id (str) – SLURM job ID to query.

Returns:

job state and exit code (e.g., ('COMPLETED', '0:0')).

Return type:

tuple[str, str]

Raises:
  • ValueError – if job ID not found in sacct output.

  • subprocess.CalledProcessError – if sacct command fails.
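
A sketch of the parsing step, applied to captured text. It assumes pipe-delimited output such as that produced by `sacct -j <job_id> --format=JobID,State,ExitCode --noheader --parsable2`; the options and parsing used by the library may differ, and parse_sacct is a hypothetical name.

```python
def parse_sacct(job_id, sacct_stdout):
    """Return (state, exit_code) for the line whose JobID equals job_id."""
    for line in sacct_stdout.splitlines():
        fields = line.strip().split("|")
        # Job steps such as "<id>.batch" are skipped by the exact match.
        if len(fields) >= 3 and fields[0] == job_id:
            return fields[1], fields[2]
    raise ValueError(f"Job {job_id} not found in sacct output.")


sample_output = "4821337|COMPLETED|0:0\n4821337.batch|COMPLETED|0:0\n"
print(parse_sacct("4821337", sample_output))
```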

cancel_job(job_id)

Cancel running SLURM job via scancel.

Parameters:

job_id (str) – SLURM job ID to cancel.

Raises:

subprocess.CalledProcessError – if scancel command fails.

load_parameters(file_path=None)

Load pipeline parameters from YAML configuration file.

Uses yaml.full_load() to support Python-specific types like tuples that are serialised by yaml.dump().

Parameters:

file_path (str, optional) – path to YAML parameter file. Defaults to None (uses self.param_file_path).

Returns:

loaded parameter dictionary.

Return type:

dict

Raises:
  • FileNotFoundError – if parameter file does not exist.

  • yaml.YAMLError – if file contains invalid YAML.
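
The reason for preferring yaml.full_load() can be seen in a short round trip with PyYAML. The parameter names below are made up for illustration and are not taken from the library.

```python
import yaml

# Hypothetical parameters; the tuple is the interesting part.
params = {"voxel_size": (10, 10, 10), "dump_dir": "results"}

# yaml.dump() tags the tuple as !!python/tuple in the emitted text.
text = yaml.dump(params)

# full_load() understands that tag and rebuilds a tuple.
loaded = yaml.full_load(text)
print(type(loaded["voxel_size"]).__name__, loaded["voxel_size"])

# safe_load() rejects Python-specific tags with a YAMLError subclass.
try:
    yaml.safe_load(text)
    safe_load_failed = False
except yaml.YAMLError:
    safe_load_failed = True
print(safe_load_failed)
```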

static pretty_print(text, max_char_per_line=79, do_print=True, return_text=False)

Format text with decorative star border.

Creates a framed message with star borders and centred text wrapped to specified line width. Useful for logging section headers or important messages.

Parameters:
  • text (str) – text to format.

  • max_char_per_line (int) – maximum line width including border. Defaults to 79.

  • do_print (bool) – whether to print formatted text. Defaults to True.

  • return_text (bool) – whether to return formatted string. Defaults to False.

Returns:

formatted text if return_text=True, else None.

Return type:

None | str

Examples

>>> pretty_print("Hello World", max_char_per_line=30)
******************************
*        Hello World        *
******************************

Notes

This is an abstract base class. Use BcdiPipeline for BCDI workflows or create your own subclass for custom pipelines.

See Also

BcdiPipeline : Concrete implementation for BCDI processing