Pipeline Module#
Automated BCDI processing workflows.
Main Pipeline Classes#
- class cdiutils.pipeline.base.LoggerWriter(logger, level, wrap=True)[source]#
Bases: object
Custom stream redirecting stdout to logger in real-time.
Captures print statements and routes them through the logging system with optional line wrapping at 79 characters.
- Parameters:
logger (logging.Logger) – target logger instance.
level (int) – logging level (e.g., logging.INFO).
wrap (bool) – enable line wrapping at 79 chars. Defaults to True.
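The redirection described above can be illustrated with a small stand-alone stream class. This is only a sketch of the idea, not the library's implementation: the class name `StreamToLogger` and its `width` argument are hypothetical, and wrapping is delegated to the standard `textwrap` module.

```python
import io
import logging
import textwrap


class StreamToLogger(io.TextIOBase):
    """Hypothetical stand-in for LoggerWriter: forwards writes to a logger."""

    def __init__(self, logger, level=logging.INFO, wrap=True, width=79):
        self.logger = logger
        self.level = level
        self.wrap = wrap
        self.width = width

    def write(self, message):
        # print() calls write() once for the text and once for the newline,
        # so blank fragments are skipped.
        for line in message.splitlines():
            if not line.strip():
                continue
            chunks = textwrap.wrap(line, self.width) if self.wrap else [line]
            for chunk in chunks:
                self.logger.log(self.level, chunk)
        return len(message)

    def flush(self):
        # Nothing is buffered, so there is nothing to flush.
        pass
```

Such an object can be passed to `contextlib.redirect_stdout` so that every `print` inside the `with` block ends up in the logger.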
- exception cdiutils.pipeline.base.JobCancelledError[source]#
Bases: Exception
Exception raised when user cancels a SLURM job.
Triggered by keyboard interrupts (Ctrl+C) during job monitoring.
- exception cdiutils.pipeline.base.JobFailedError[source]#
Bases: Exception
Exception raised when a SLURM job fails.
Indicates non-zero exit codes or failed job states detected via sacct.
- class cdiutils.pipeline.base.Pipeline(params=None, param_file_path=None)[source]#
Bases: ABC
Abstract base class for CDI data processing pipelines.
Provides infrastructure for parameter management, logging, job submission (SLURM), and subprocess execution. Not intended for direct instantiation—subclass for specific applications.
- Parameters:
params (dict, optional) – parameter dictionary. Defaults to None.
param_file_path (str, optional) – path to YAML parameter file. Defaults to None.
- Raises:
ValueError – if neither params nor param_file_path is provided.
- __init__(params=None, param_file_path=None)[source]#
Initialise Pipeline with parameters from dict or file.
- Parameters:
params (dict, optional) – parameter dictionary. Defaults to None.
param_file_path (str, optional) – path to YAML parameter file. Defaults to None.
- Raises:
ValueError – if neither params nor param_file_path is provided.
- make_dump_dir()[source]#
Create output directory specified in params[‘dump_dir’].
- Raises:
ValueError – if dump_dir parameter is None.
- static process(func)[source]#
Decorate pipeline methods to add logging and error handling.
Wraps process methods with file logging, stdout redirection, and structured error reporting. Creates process-specific log files in dump_dir with format {func_name}_output.log.
- Parameters:
func (Callable) – pipeline method to decorate.
- Returns:
wrapped function with logging infrastructure.
- Return type:
Callable
- Raises:
Exception – re-raises any exception from decorated function after logging.
Notes
Temporarily redirects sys.stdout to logger during execution to capture print statements. Original stdout is always restored in finally block.
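The redirect-and-restore pattern in the note above might look like the following sketch. It is an assumption-laden illustration rather than the actual decorator: `logged_process` and `_LogStream` are hypothetical names, and the per-process log file in dump_dir is left out for brevity.

```python
import functools
import logging
import sys


class _LogStream:
    """Minimal file-like object that sends each printed line to a logger."""

    def __init__(self, logger):
        self.logger = logger

    def write(self, message):
        for line in message.splitlines():
            if line.strip():
                self.logger.info(line)

    def flush(self):
        pass


def logged_process(func):
    """Hypothetical decorator: capture prints, log errors, restore stdout."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger = logging.getLogger(func.__name__)
        original_stdout = sys.stdout
        sys.stdout = _LogStream(logger)
        try:
            return func(*args, **kwargs)
        except Exception:
            # Record the traceback, then let the caller see the error.
            logger.exception("%s failed", func.__name__)
            raise
        finally:
            # Runs on success and on failure alike.
            sys.stdout = original_stdout

    return wrapper
```

Putting the restore step in `finally` is what guarantees that a failing process method cannot leave `sys.stdout` pointing at the logger.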
- submit_job(job_file, working_dir)[source]#
Submit SLURM job and return job ID with output file path.
Executes sbatch command in bash login shell to ensure proper environment loading. Sets up keyboard interrupt handler for job cancellation.
- Parameters:
job_file (str) – path to SLURM batch script.
working_dir (str) – directory to execute sbatch from.
- Returns:
job ID and absolute path to output file (slurm-{job_id}.out).
- Return type:
tuple[str, str]
- Raises:
subprocess.CalledProcessError – if sbatch command fails.
ValueError – if job ID cannot be extracted from sbatch output.
Notes
Registers SIGINT handler that calls _handle_interrupt with job_id when Ctrl+C is pressed.
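Extracting the job ID from the sbatch output could be done as in the sketch below. It assumes the usual `Submitted batch job <id>` message printed by sbatch; the helper names `parse_sbatch_job_id` and `slurm_output_path` are hypothetical and not part of cdiutils.

```python
import os
import re


def parse_sbatch_job_id(sbatch_stdout):
    """Return the job ID found in sbatch's confirmation message."""
    match = re.search(r"Submitted batch job (\d+)", sbatch_stdout)
    if match is None:
        raise ValueError(
            f"No job ID found in sbatch output: {sbatch_stdout!r}"
        )
    return match.group(1)


def slurm_output_path(working_dir, job_id):
    """Build the absolute path of the default slurm-{job_id}.out file."""
    return os.path.abspath(os.path.join(working_dir, f"slurm-{job_id}.out"))
```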
- is_job_running(job_id)[source]#
Check if SLURM job is currently running.
Queries squeue for job presence. Job is considered running if its ID appears in squeue output.
- Parameters:
job_id (str) – SLURM job ID to check.
- Returns:
True if job is in queue, False otherwise.
- Return type:
bool
- Raises:
subprocess.CalledProcessError – if squeue command fails.
- stream_job_output(job_id, output_file)[source]#
Stream SLURM job output in real-time.
Waits for output file creation, then continuously reads and logs new lines until job stops running or interrupted flag is set. Logs at JOB level (custom level between INFO and WARNING).
- Parameters:
job_id (str) – SLURM job ID being monitored.
output_file (str) – path to slurm-{job_id}.out file.
- Raises:
FileNotFoundError – if output file cannot be accessed after creation.
Notes
Checks file existence every 0.5s until found. Polls running status and reads new lines with 0.5s interval. Respects self.interrupted flag for early termination.
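The polling loop described in the notes can be sketched with plain file I/O. The function below is a hypothetical illustration: `follow_file`, `is_running` (a zero-argument callable standing in for the queue check) and `emit` (a callable receiving each line) are assumed names, not the library's API.

```python
import os
import time


def follow_file(path, is_running, emit, poll=0.5):
    """Pass each new line of `path` to `emit` until the job stops."""
    # Wait for the scheduler to create the file.
    while not os.path.exists(path):
        if not is_running():
            raise FileNotFoundError(path)
        time.sleep(poll)

    with open(path, "r") as handle:
        while True:
            line = handle.readline()
            if line:
                emit(line.rstrip("\n"))
            elif is_running():
                # No new data yet: wait before polling again.
                time.sleep(poll)
            else:
                # Job has left the queue: read whatever arrived last.
                for rest in handle.readlines():
                    emit(rest.rstrip("\n"))
                break
```

Reading once more after the job has stopped avoids losing lines written between the last poll and job exit.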
- monitor_job(job_id, output_file, retries=10, delay=1)[source]#
Monitor SLURM job and verify final completion status.
Streams job output in real-time and validates final state via sacct after job leaves queue. Retries state check if job shows RUNNING but is not in squeue (handles race conditions).
- Parameters:
job_id (str) – SLURM job ID to monitor.
output_file (str) – path to slurm-{job_id}.out file.
retries (int) – number of sacct retries for lingering RUNNING state. Defaults to 10.
delay (int) – seconds between retries. Defaults to 1.
- Raises:
JobFailedError – if job terminates with FAILED state or non-zero exit code.
Notes
Successfully completed jobs have state=’COMPLETED’ and exit_code=’0:0’. Other terminal states log a warning but do not raise exceptions.
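The retry logic for a lingering RUNNING state can be pictured as follows. This is a sketch of one reading of the behaviour described above, not the actual method: `wait_for_final_state` and `JobFailed` are hypothetical names, and `get_state` is an assumed callable returning a `(state, exit_code)` tuple.

```python
import time


class JobFailed(Exception):
    """Hypothetical stand-in for JobFailedError."""


def wait_for_final_state(get_state, retries=10, delay=1.0):
    """Poll `get_state` until the job no longer reports RUNNING."""
    state, exit_code = get_state()
    attempts = 0
    while state == "RUNNING" and attempts < retries:
        # Accounting can lag behind the queue, so ask again after a pause.
        time.sleep(delay)
        state, exit_code = get_state()
        attempts += 1
    if state == "FAILED" or exit_code != "0:0":
        raise JobFailed(f"state={state}, exit_code={exit_code}")
    return state, exit_code
```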
- get_job_state(job_id)[source]#
Retrieve SLURM job state and exit code via sacct.
Queries sacct for job status information and parses output to extract state (e.g., COMPLETED, FAILED, RUNNING) and exit code (sacct format: exit_status:signal).
- Parameters:
job_id (str) – SLURM job ID to query.
- Returns:
job state and exit code (e.g., (‘COMPLETED’, ‘0:0’)).
- Return type:
tuple[str, str]
- Raises:
ValueError – if job ID not found in sacct output.
subprocess.CalledProcessError – if sacct command fails.
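Parsing the sacct output could be sketched as below. The example assumes sacct was called with `--parsable2 --noheader --format=JobID,State,ExitCode`, which yields pipe-separated lines; whether the library uses these exact options is not stated here, and `parse_sacct` is a hypothetical helper name.

```python
def parse_sacct(sacct_stdout, job_id):
    """Return (state, exit_code) for `job_id` from pipe-separated sacct text."""
    for line in sacct_stdout.splitlines():
        fields = line.strip().split("|")
        # Job steps such as "4242.batch" are skipped: only the exact ID counts.
        if len(fields) >= 3 and fields[0] == job_id and fields[1].strip():
            # States such as "CANCELLED by 1000" keep only the first word.
            return fields[1].split()[0], fields[2]
    raise ValueError(f"Job {job_id} not found in sacct output")
```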
- cancel_job(job_id)[source]#
Cancel running SLURM job via scancel.
- Parameters:
job_id (str) – SLURM job ID to cancel.
- Raises:
subprocess.CalledProcessError – if scancel command fails.
- load_parameters(file_path=None)[source]#
Load pipeline parameters from YAML configuration file.
Uses yaml.full_load() to support Python-specific types like tuples that are serialised by yaml.dump().
- Parameters:
file_path (str, optional) – path to YAML parameter file. Defaults to None (uses self.param_file_path).
- Returns:
loaded parameter dictionary.
- Return type:
dict
- Raises:
FileNotFoundError – if parameter file does not exist.
yaml.YAMLError – if file contains invalid YAML.
- static pretty_print(text, max_char_per_line=79, do_print=True, return_text=False)[source]#
Format text with decorative star border.
Creates a framed message with star borders and centred text wrapped to specified line width. Useful for logging section headers or important messages.
- Parameters:
text (str) – text to format.
max_char_per_line (int) – maximum line width including border. Defaults to 79.
do_print (bool) – whether to print formatted text. Defaults to True.
return_text (bool) – whether to return formatted string. Defaults to False.
- Returns:
formatted text if return_text=True, else None.
- Return type:
None | str
Examples
>>> pretty_print("Hello World", max_char_per_line=30)
******************************
*        Hello World         *
******************************
Definition of the BcdiPipeline class.
- Authors:
Clément Atlan, clement.atlan@esrf.fr - 09/2024
- exception cdiutils.pipeline.bcdi.PyNXScriptError(msg=None)[source]#
Bases: Exception
Custom exception to handle pynx script failure.
- __init__(msg=None)[source]#
Initialise PyNXScriptError with an informative message.
The msg argument may be a string, an exception, or a file-like object (for example an open stderr pipe). This constructor will coerce non-string inputs into a readable string representation and avoid TypeError when concatenating.
- Parameters:
msg (object, optional) – additional error message. Can be a str, Exception, file-like object or None.
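The coercion of heterogeneous `msg` inputs might be handled as in this sketch. It is a hypothetical analogue, not the actual PyNXScriptError: the class name `ScriptError` and the base message text are assumptions.

```python
class ScriptError(Exception):
    """Hypothetical analogue of PyNXScriptError with input coercion."""

    def __init__(self, msg=None):
        base = "Script failed"
        if msg is None:
            detail = ""
        elif hasattr(msg, "read"):
            # File-like object, e.g. a stderr pipe: read its content.
            detail = msg.read()
            if isinstance(detail, bytes):
                detail = detail.decode(errors="replace")
        else:
            # Strings stay as they are; exceptions use their message.
            detail = str(msg)
        super().__init__(f"{base}: {detail}" if detail else base)
```

Converting everything to `str` before building the message is what avoids the `TypeError` that plain string concatenation would raise for a pipe or an exception object.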
- class cdiutils.pipeline.bcdi.BcdiPipeline(params=None, param_file_path=None)[source]#
Bases: Pipeline
A class to handle the BCDI workflow, from pre-processing to post-processing, including phase retrieval (using the PyNX package). Provide either a path to a parameter file or the parameter dictionary directly.
- Parameters:
params (dict, optional) – the parameter dictionary. Defaults to None.
param_file_path (str, optional) – the path to the parameter file. Defaults to None.
- voxel_pos = ('ref', 'max', 'com')#
- class_isosurface = 0.1#
- __init__(params=None, param_file_path=None)[source]#
Initialisation method.
- Parameters:
params (dict, optional) – the parameter dictionary. Defaults to None.
param_file_path (str, optional) – the path to the parameter file. Defaults to None.
- detector_data: ndarray#
- cropped_detector_data: ndarray#
- orthogonalised_intensity: ndarray#
- mask: ndarray#
- angles: dict#
- converter: SpaceConverter#
- result_analyser: PhasingResultAnalyser#
- reconstruction: ndarray#
- structural_props: dict#
- extra_info: dict#
- update_from_file(path)[source]#
Update pipeline instance with parameters from CXI file.
Loads parameters and SpaceConverter from a CXI file and updates the current instance. Useful for resuming analysis from saved reconstruction results.
- Parameters:
path (str) – path to CXI file.
- Raises:
ValueError – if file format is unsupported or q_lab_ref is missing in parameters.
- classmethod load_from_cxi(path)[source]#
Load pipeline parameters and SpaceConverter from CXI file.
Extracts stored parameters and reconstruction metadata from a CXI file, rebuilding the SpaceConverter for coordinate transformations.
- Parameters:
path (str) – path to CXI file.
- Returns:
extracted parameters and configured SpaceConverter instance.
- Return type:
tuple[dict, SpaceConverter]
- Raises:
ValueError – if path does not end with ‘.cxi’.
- preprocess(**params)[source]#
Preprocess BCDI detector data for phase retrieval.
Handles complete preprocessing workflow: data loading, Bragg peak centring, cropping, filtering (hot pixels, flat-field, background subtraction), and Q-space coordinate system initialisation.
- Parameters:
**params – optional parameters to override instance params. Common overrides include ‘preprocess_shape’, ‘hot_pixel_filter’, ‘flat_field’, ‘background_level’.
- Side effects:
Updates instance attributes: detector_data, cropped_detector_data, mask, voi (voxels of interest), converter, q_lab_pos, atomic_params.
- Raises:
ValueError – if the requested shape and the voxel reference are not compatible.
- phase_retrieval_gui()[source]#
Launch the interactive phase retrieval GUI.
This method lazily imports and launches the PhaseRetrievalGUI from cdiutils.interactive.phase_retrieval. The lazy import avoids requiring the optional GUI-related dependencies (pynx and ipywidgets) unless this method is invoked.
The GUI is initialized with the pipeline instance and the pipeline’s pynx_phasing_dir as the working directory, and it searches for CXI files matching the pattern “*Run*.cxi”. After initialization, the GUI’s show() method is called to display the interface.
- Raises:
ImportError – If the PhaseRetrievalGUI cannot be imported because the required dependencies (‘pynx’ and ‘ipywidgets’) are not installed. The raised error suggests installing them via: pip install pynx ipywidgets
- Returns:
None
- Return type:
None
Example
pipeline.phase_retrieval_gui()
- phase_retrieval(jump_to_cluster=False, pynx_slurm_file_template=None, clear_former_results=False, cmd=None, search_pattern='*Run*.cxi', **pynx_params)[source]#
Execute phase retrieval using PyNX.
Runs PyNX either locally (direct subprocess) or on a SLURM cluster. Generates PyNX input file from parameters and manages job submission/monitoring if cluster execution is requested.
- Parameters:
jump_to_cluster (bool, optional) – submit job to SLURM cluster. Defaults to False (local execution).
pynx_slurm_file_template (str, optional) – path to SLURM script template. Defaults to None (uses built-in template).
clear_former_results (bool, optional) – delete previous reconstruction CXI files. Defaults to False.
cmd (str, optional) – command for local PyNX execution. Defaults to None (uses “pynx-cdi-id01 pynx-cdi-inputs.txt”).
search_pattern (str, optional) – glob pattern for finding result CXI files. Defaults to “*Run*.cxi”.
**pynx_params – PyNX parameters (e.g., nb_run, nb_raar, support_threshold). Override defaults.
- Raises:
PyNXScriptError – if PyNX execution fails.
subprocess.CalledProcessError – if subprocess command fails.
- analyse_phasing_results(sorting_criterion='mean_to_max', search_pattern='*Run*.cxi', plot=True, plot_phasing_results=True, plot_phase=False, init_analyser=True)[source]#
Analyse and sort phase retrieval results by quality metrics.
Wrapper for PhasingResultAnalyser that evaluates reconstruction quality using various criteria. Sorts results and generates comparison plots.
- Parameters:
sorting_criterion (str, optional) – quality metric for sorting. Defaults to “mean_to_max”. Options:
- ‘mean_to_max’: amplitude homogeneity (Gaussian mean vs. max)
- ‘sharpness’: sum of amplitude^4 within support
- ‘std’: amplitude standard deviation
- ‘llk’: log-likelihood
- ‘llkf’: free log-likelihood
search_pattern (str, optional) – glob pattern for CXI files. Defaults to “*Run*.cxi”.
plot (bool, optional) – enable/disable all plots. Defaults to True.
plot_phasing_results (bool, optional) – plot result comparisons. Defaults to True.
plot_phase (bool, optional) – plot phase (with amplitude as opacity) instead of amplitude. Defaults to False.
init_analyser (bool, optional) – force reinitialisation of PhasingResultAnalyser. Defaults to True.
- Raises:
ValueError – if sorting_criterion is unknown.
- generate_support_from(run='best', output_path=None, fill=False, verbose=True, search_pattern='*Run*.cxi')[source]#
Extract and save support from a specific reconstruction run.
Generates a support mask from a phase retrieval result and saves it as a CXI file. Can be used directly in subsequent phasing by setting: support = <output_path> in PyNX params.
- Parameters:
run (int | str, optional) – run selection. Use “best” for top-ranked result or integer for specific run number. Defaults to “best”.
output_path (str, optional) – save path for support CXI file. If None, saves to pynx_phasing_dir/support.cxi. Defaults to None.
fill (bool, optional) – fill holes in support using morphological operations. Defaults to False.
verbose (bool, optional) – print info and plot support. Defaults to True.
search_pattern (str, optional) – glob pattern for CXI files. Defaults to “*Run*.cxi”.
- select_best_candidates(nb_of_best_sorted_runs=None, best_runs=None, search_pattern='*Run*.cxi')[source]#
Select best phase retrieval candidates for mode decomposition.
Wrapper for PhasingResultAnalyser.select_best_candidates. Choose candidates either by count (top N sorted) or explicit run numbers.
- Parameters:
nb_of_best_sorted_runs (int, optional) – number of top-sorted runs to select. Requires prior call to analyse_phasing_results(). Defaults to None.
best_runs (list[int], optional) – explicit list of run numbers (e.g., [2, 5, 7]). Defaults to None.
search_pattern (str, optional) – glob pattern for CXI files. Defaults to “*Run*.cxi”.
- Raises:
ValueError – if result_analyser is not initialised, i.e. the results have not been analysed yet (call analyse_phasing_results() first).
- mode_decomposition(cmd=None, search_pattern='*Run*.cxi')[source]#
Perform mode decomposition on selected reconstruction candidates.
Extracts principal modes from multiple phase retrieval results using PyNX’s pynx-cdi-analysis (similar to PCA). Modes represent consistent features across reconstructions.
- Parameters:
cmd (str, optional) – command for mode decomposition if PyNX unavailable locally. Defaults to None (uses “pynx-cdi-analysis candidate_*.cxi --modes 1 --modes_output mode.h5”).
search_pattern (str, optional) – glob pattern for candidate CXI files. Defaults to “*Run*.cxi”.
- Side effects:
Saves modes to S{scan}_pynx_reconstruction_mode.cxi in dump_dir.
- postprocess(**params)[source]#
Postprocess phase retrieval results to extract physical properties.
Comprehensive workflow: loads reconstruction, orthogonalises to lab frame, optionally flips/apodizes, estimates support isosurface, and computes structural properties (phase, displacement, strain, d-spacing, lattice parameter).
- Parameters:
**params – optional parameters to override instance params. Common overrides:
- ‘voxel_size’: target voxel size (nm)
- ‘isosurface’: support threshold (0-1)
- ‘apodize’: window function (‘blackman’, ‘hann’, etc.)
- ‘flip’: flip reconstruction (complex conjugate)
- ‘convention’: ‘xu’ or ‘cxi’
- ‘handle_defects’: enable defect-aware processing
- Side effects:
Updates instance attributes: reconstruction, structural_props, extra_info. Generates amplitude distribution plot.
- Raises:
ValueError – if unrecognised parameter provided.
Visualisation Support#
- class cdiutils.pipeline.pipeline_plotter.PipelinePlotter[source]#
Bases:
objectPlotting utilities for BCDI pipeline results visualisation.
Provides class methods for detector data, orthogonalised data, summary plots, FFT visualisation, and strain statistics. All methods return matplotlib figure and axes for customisation.
- classmethod detector_data(det_data, voxels=None, full_det_data=None, integrate=False, title='', save=None)[source]#
Plot detector data slices with voxel position markers.
Displays three orthogonal slices through detector volume. If full_det_data provided, shows both cropped and raw data for comparison. Marks reference, maximum, and centre-of-mass positions when provided in voxels dict.
- Parameters:
det_data (ndarray) – cropped 3D detector data array.
voxels (dict) – dictionary with ‘cropped’/’full’ keys containing ‘ref’, ‘max’, ‘com’ positions. Optional.
full_det_data (ndarray) – uncropped detector data for comparison. Optional.
integrate (bool) – if True, integrate perpendicular slices. Default False.
title (str) – plot title. Default empty string.
save (str) – filepath to save figure. Optional.
- Returns:
matplotlib Figure and Axes objects.
- Return type:
(fig, axes)
- static ortho_detector_data(det_data, ortho_data, q_grid, title='', save=None)[source]#
Compare raw and orthogonalised detector data side by side.
Displays detector frame (top row) and reciprocal lab frame (bottom row) with three orthogonal slice views each.
- Parameters:
det_data (ndarray) – raw 3D detector data.
ortho_data (ndarray) – orthogonalised 3D reciprocal space data.
q_grid (ndarray) – tuple of 3 Q-space coordinate arrays.
title (str) – plot title. Default empty string.
save (str) – filepath to save figure. Optional.
- Returns:
matplotlib Figure and 2x3 Axes grid.
- Return type:
(fig, axes)
- static summary_plot(title=None, support=None, table_info=None, voxel_size=None, save=None, unique_vmin=None, unique_vmax=None, cmap=None, figsize=(6, 4), convention='cxi', **to_plot)[source]#
Create multi-panel summary plot of reconstruction results.
Displays three orthogonal slices for each quantity (amplitude, displacement, strain, etc.). Applies support mask to strain fields. Includes optional table with analysis metrics.
- Parameters:
title (str) – figure title. Optional.
support (ndarray) – 3D binary support mask. Optional.
table_info (dict) – dict of scalar metrics for table display. Optional.
voxel_size (tuple) – (z,y,x) voxel sizes in nm. Optional.
save (str) – filepath to save figure. Optional.
unique_vmin (float) – override colorbar minimum. Optional.
unique_vmax (float) – override colorbar maximum. Optional.
cmap (str) – colormap name. Optional.
figsize (tuple) – (width, height) in inches. Default (6,4).
convention (str) – ‘cxi’ or ‘xu’ coordinate system. Default ‘cxi’.
**to_plot – keyword arguments with array name and 3D data.
- Returns:
matplotlib Figure and 3xN Axes grid.
- Return type:
(fig, axes)
- static plot_final_object_fft(obj, voxel_size, q_space_shift, exp_ortho_data, exp_data_q_grid, title=None, save=None)[source]#
Compare FFT of final object with experimental orthogonalised data.
Pads object to match experimental data shape before FFT computation. Displays both in reciprocal lab frame.
- Parameters:
obj (ndarray) – final reconstructed object (direct space).
voxel_size (tuple) – (z,y,x) voxel sizes in nm.
q_space_shift (tuple) – Q-space origin shift (3-tuple).
exp_ortho_data (ndarray) – experimental orthogonalised detector data.
exp_data_q_grid (ndarray) – experimental Q-space coordinate arrays.
title (str) – figure title. Optional.
save (str) – filepath to save figure. Optional.
- Returns:
matplotlib Figure and 2x3 Axes grid.
- Return type:
(fig, axes)
- static strain_statistics(strain, support, bins=50, colors=None, title='', save=None)[source]#
Plot a strain statistics graph displaying distribution of strain for the overall object, the bulk or the surface of the object.
- Parameters:
strain (np.ndarray) – the strain data.
support (np.ndarray) – the associated support.
bins (np.ndarray | int, optional) – the bins as accepted in numpy.histogram function. Defaults to 50.
colors (dict, optional) – the dictionary of colours. Defaults to None.
title (str, optional) – the title of the figure.
save (str, optional) – the path where to save the figure.
- Returns:
the figure and axes.
- Return type:
tuple[plt.Figure, plt.Axes]
Parameter Management#
Parameter validation and management for BCDI pipeline.
This module provides utilities for managing pipeline parameters, including validation, default filling, and type conversion. It defines the DEFAULT_PIPELINE_PARAMS structure that serves as the schema for all pipeline configurations.
- cdiutils.pipeline.parameters.validate_and_fill_params(user_params, defaults={'alien_mask': None, 'apodize': 'blackman', 'background_level': None, 'beamline_setup': 'REQUIRED', 'convention': 'cxi', 'debug': True, 'det_calib_params': None, 'det_reference_voxel': None, 'detector_data_path': None, 'detector_name': None, 'dump_dir': 'REQUIRED', 'edf_file_template': None, 'energy': None, 'experiment_data_dir_path': None, 'experiment_file_path': None, 'facets': {'amplitude_threshold': None, 'authorised_index': 1, 'derivative_threshold': None, 'display_f_e_c': 'facet', 'index_to_display': None, 'nb_facets': None, 'nb_nghbs_min': 0, 'order_of_derivative': None, 'remove_edges': True, 'size': 10, 'top_facet_reference_index': [1, 1, 1]}, 'flat_field': None, 'flip': False, 'handle_defects': False, 'hkl': [1, 1, 1], 'hot_pixel_filter': False, 'isosurface': None, 'light_loading': False, 'orthogonalise_before_phasing': False, 'preprocess_shape': (150, 150), 'pynx': {'algorithm': None, 'auto_center_resize': False, 'beta': 0.9, 'crop_output': 0, 'data': None, 'data2cxi': False, 'detwin': True, 'live_plot': False, 'mask': None, 'mpi': 'run', 'nb_er': 200, 'nb_hio': 300, 'nb_ml': 0, 'nb_raar': 500, 'nb_run': 20, 'nb_run_keep': 10, 'output_format': 'cxi', 'positivity': False, 'psf': 'pseudo-voigt,1,0.05,20', 'rebin': '1, 1, 1', 'roi': 'full', 'save_plot': True, 'support': 'auto', 'support_only_shrink': False, 'support_post_expand': None, 'support_size': None, 'support_smooth_width_begin': 2, 'support_smooth_width_end': 0.5, 'support_threshold': '0.15, 0.40', 'support_threshold_method': 'rms', 'support_update_border_n': 0, 'support_update_period': 20, 'verbose': 100, 'zero_mask': False}, 'q_lab_ref': None, 'rocking_angle_binning': None, 'sample_name': None, 'sample_orientation': None, 'sample_surface_normal': None, 'scan': 'REQUIRED', 'show': False, 'support': {'raw_process': True, 'support_method': None, 'support_path': None}, 'verbose': True, 'voxel_reference_methods': ['max', 
'com', 'com'], 'voxel_size': None})[source]#
Validate user parameters and fill missing values with defaults.
Recursively validates a user-provided parameter dictionary against a schema of defaults. Ensures all required parameters (marked as ‘REQUIRED’) are present, fills in missing optional parameters with their default values, and warns about unknown parameters.
This function handles nested dictionaries (e.g., ‘pynx’, ‘facets’) by recursing into them and validating each level independently.
- Parameters:
user_params (dict[str, Any]) – dictionary of user-provided pipeline parameters. Can be nested (e.g., {‘pynx’: {‘nb_run’: 10}}).
defaults (dict[str, Any]) – schema dictionary defining allowed parameters and their default values. Parameters with value ‘REQUIRED’ must be provided by the user. Defaults to DEFAULT_PIPELINE_PARAMS.
- Returns:
A new dictionary containing all parameters from the schema, with user values where provided and defaults elsewhere.
- Raises:
ValueError – if a required parameter (value=’REQUIRED’ in defaults) is missing from user_params.
- Return type:
dict[str, Any]
Examples
>>> user = {'scan': 42, 'dump_dir': '/tmp',
...         'beamline_setup': 'ID01'}
>>> params = validate_and_fill_params(user)
>>> params['scan']
42
>>> params['energy']  # filled with default None
>>> params['pynx']['nb_run']  # nested default
20
>>> # missing required parameter raises error
>>> validate_and_fill_params({'scan': 42})
ValueError: Missing required parameter: 'beamline_setup'
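The recursive fill-and-validate behaviour described above can be sketched on a toy schema. This is a simplified illustration under stated assumptions, not the library's code: `fill_params` is a hypothetical name, and unknown keys are reported with `warnings.warn`, which may differ from how cdiutils reports them.

```python
import copy
import warnings

REQUIRED = "REQUIRED"


def fill_params(user_params, defaults):
    """Return a new dict following `defaults`, with user values on top."""
    filled = {}
    for key, default in defaults.items():
        if isinstance(default, dict):
            # Nested section: validate it against its own sub-schema.
            filled[key] = fill_params(user_params.get(key) or {}, default)
        elif key in user_params:
            filled[key] = user_params[key]
        elif default == REQUIRED:
            raise ValueError(f"Missing required parameter: {key!r}")
        else:
            # Copy so that callers cannot mutate the schema defaults.
            filled[key] = copy.deepcopy(default)
    for key in user_params:
        if key not in defaults:
            warnings.warn(f"Unknown parameter ignored: {key!r}")
    return filled
```

Returning a fresh dictionary leaves both the user input and the schema untouched, so the same schema can validate several configurations.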
- cdiutils.pipeline.parameters.collect_keys(d)[source]#
Recursively collect all keys from a nested dictionary.
Traverses a potentially nested dictionary structure and extracts all keys at all levels, returning them as a flat set. This is useful for building a complete list of valid parameter names from the hierarchical DEFAULT_PIPELINE_PARAMS structure.
- Parameters:
d (dict[str, Any]) – dictionary to extract keys from. Can contain nested dicts.
- Returns:
Set containing all keys found at all nesting levels.
- Return type:
set[str]
Examples
>>> params = {'a': 1, 'b': {'c': 2, 'd': {'e': 3}}}
>>> collect_keys(params)
{'a', 'b', 'c', 'd', 'e'}
- cdiutils.pipeline.parameters.isparameter(string)[source]#
Check if a string is a valid pipeline parameter name.
Determines whether the given string corresponds to any key in the DEFAULT_PIPELINE_PARAMS schema, at any nesting level. Uses lazy caching to avoid recomputing the full key set on every call.
The valid keys are computed once and stored in a global cache for subsequent calls, making this function efficient for repeated validation checks.
- Parameters:
string (str) – candidate parameter name to check.
- Returns:
True if string is a valid parameter name in the schema, False otherwise.
- Return type:
bool
- cdiutils.pipeline.parameters.get_params_from_variables(dir_list, globals_dict)[source]#
Extract pipeline parameters from global variables.
Filters global variables by matching names against DEFAULT_PIPELINE_PARAMS keys. Organises parameters into top-level and nested sub-dicts (‘pynx’, ‘facets’, ‘support’).
- Parameters:
dir_list (list) – list of variable names (e.g., from dir()).
globals_dict (dict) – global namespace dict (e.g., globals()).
- Returns:
filtered parameter dictionary with nested structure.
- Return type:
dict
Example
>>> scan = 42
>>> nb_raar = 500
>>> params = get_params_from_variables(
...     dir(), globals()
... )
- cdiutils.pipeline.parameters.convert_np_arrays(**data)[source]#
Convert numpy types and arrays to Python types for YAML serialisation.
Handles numpy arrays, scalars (int, float, bool, str), and nested structures (lists, tuples, dicts), converting them to YAML-compatible types.
- Parameters:
**data (Any) – arbitrary keyword arguments representing a dictionary with potential numpy types.
- Returns:
A dictionary with all numpy types converted to standard Python types.
- Return type:
dict