API Reference¶
Library entry points for programmatic use of tokamunch.
Context¶
- class tokamunch.MappingContext(mapper: Any, tokamap: TokamapInterface, device: str, shot: int | None, cli_config: CLIConfig | None = None, concurrency: ConcurrencyConfig = <factory>)¶
Holds all runtime state needed to execute a mapping run.
Can be constructed directly for programmatic (library) use, or via from_config to load settings from a TOML config file for CLI use. cli_config is required for process-based concurrency: each worker process receives the full CLIConfig and reconstructs its own mapper without needing any config file to be present on disk.
- concurrency: ConcurrencyConfig¶
- device: str¶
- classmethod from_config(config: str, *, device: str | None = None, shot: int | None = None) MappingContext¶
Build a MappingContext from a munchi TOML config file.
device and shot override the values in the config when provided.
- mapper: Any¶
- shot: int | None¶
- tokamap: TokamapInterface¶
Selection¶
- class tokamunch.IdsSelection(ids: str, match: str | None = None, leaves_only: bool = False, mapping_keys: frozenset[str] | None = None)¶
Expand all concrete paths for an IDS, with optional filtering.
- class tokamunch.SinglePathSelection(path: str, mapping_keys: frozenset[str] | None = None)¶
Map a single known concrete path.
- tokamunch.selection.generate_selected_paths(selection: IdsSelection | SinglePathSelection | MultiPathSelection, ctx: MappingContext) Iterator[str]¶
IDS schema helpers¶
- class tokamunch.IDSHelper(ids_paths: Iterable[str])¶
- reset_expansion_cache() None¶
Discard all cached array-length results so the next expansion re-queries.
- tokamunch.generate_ids_paths(ids_name: str) Iterator[str]¶
Mapper interface¶
- class tokamunch.TokamapInterface(mapper: MapperProtocol, device: str, *, shot: int | None = None, extra_args: dict[str, Any] | None = None)¶
- class tokamunch.MapperProtocol(*args, **kwargs)¶
Protocol for the underlying mapper object (e.g. libtokamap.Mapper).
Typing TokamapInterface.mapper against this protocol allows tests and library users to supply a lightweight fake without installing libtokamap.
- class tokamunch.DataSource(*args, **kwargs)¶
Protocol for data source objects registered with the mapper.
The return type of map() is intentionally untyped (Any) to avoid introducing a numpy dependency solely for the type hint — callers should expect numpy arrays, scalars, or None.
- class tokamunch.DataSourceFactory(*args, **kwargs)¶
Protocol for entry-point data-source factories.
A factory receives plugin-specific configuration args and returns an object suitable for mapper.register_python_data_source(…).
Configuration¶
- class tokamunch.CLIConfig(mapper: 'MapperConfig', run: 'RunConfig', data_sources: 'list[DataSourceConfig]')¶
- class tokamunch.MapperConfig(device: 'str', config: 'str | None' = None, config_params: 'dict[str, Any] | None' = None)¶
- class tokamunch.RunConfig(default_shot: 'int | None' = None, concurrency: 'ConcurrencyConfig' = <factory>, log_level: 'str' = 'WARNING', binary_arrays: 'bool' = False, on_imas_error: 'str' = 'fallback-json')¶
- class tokamunch.ConcurrencyConfig(mode: 'ConcurrencyMode' = <ConcurrencyMode.SERIAL: 'serial'>, workers: 'int' = 1)¶
- class tokamunch.DataSourceConfig(mapper_name: 'str', plugin: 'str', enabled: 'bool' = True, args: 'dict[str, Any]'=<factory>)¶
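Putting the dataclasses above together, a munchi TOML config might look roughly like the following. This is a hedged sketch: the section names mirror the CLIConfig fields (mapper, run, data_sources), but the exact table layout and the example values (device name, plugin name) are assumptions, not taken from the munchi docs.

```toml
# Hypothetical munchi config sketch; layout and values are illustrative.
[mapper]
device = "west"              # example device name (hypothetical)
config = "mappings/west.cfg"

[run]
default_shot = 54178
log_level = "WARNING"
binary_arrays = false
on_imas_error = "fallback-json"

[run.concurrency]
mode = "serial"
workers = 1

[[data_sources]]
mapper_name = "tsmat"        # hypothetical data-source entry
plugin = "my_tokamunch_plugin"
enabled = true
```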
- tokamunch.config.apply_config_overrides(cfg: CLIConfig, overrides: list[str]) CLIConfig¶
Apply dotted.key=value overrides to a CLIConfig, returning a new instance.
- Supported keys:
run.concurrency.mode → ConcurrencyMode
run.concurrency.workers → int
run.log_level → str (must be a valid log level)
run.binary_arrays → bool ("true"/"false"/"1"/"0")
run.on_imas_error → str ("fallback-json" or "raise")
run.default_shot → int
mapper.device → str
Since all dataclasses use slots=True, new instances are constructed rather than mutating the originals.
- Raises:
ValueError – For unknown keys or invalid values.
Path utilities¶
- tokamunch.concrete_path_to_schema_path(path: str) str¶
Convert a concrete runtime path to its schema form.
Replaces every concrete array index [N] with the schema array-struct marker (:).
Example:
"magnetics/flux_loop[0]/position[2]/r" -> "magnetics/flux_loop(:)/position(:)/r"
- tokamunch.concrete_path_to_template(path: str) str¶
Convert a concrete runtime path to its mapping-template form.
Replaces every concrete array index [N] with the template placeholder [#], enabling lookup against mapping-file keys.
Example:
"magnetics/flux_loop[0]/position[2]/r" -> "magnetics/flux_loop[#]/position[#]/r"
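The two documented conversions are simple substitutions on the index syntax; a pure-regex sketch (the real tokamunch implementations may differ internally):

```python
import re

# Every concrete index [N] becomes (:) in schema form, [#] in template form,
# matching the documented examples above.
_INDEX = re.compile(r"\[\d+\]")

def to_schema_path(path: str) -> str:
    return _INDEX.sub("(:)", path)

def to_template(path: str) -> str:
    return _INDEX.sub("[#]", path)

p = "magnetics/flux_loop[0]/position[2]/r"
print(to_schema_path(p))  # magnetics/flux_loop(:)/position(:)/r
print(to_template(p))     # magnetics/flux_loop[#]/position[#]/r
```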
- tokamunch.parse_concrete_path(path: str) Iterator[IDSNode]¶
- tokamunch.parse_schema_path(path: str, interner: SegmentInterner | None = None) Iterator[IDSNode]¶
- tokamunch.render_concrete_path(nodes: Iterable[IDSNode]) str¶
- tokamunch.render_schema_path(nodes: Iterable[IDSNode]) str¶
- tokamunch.render_array_length_query_path(nodes: Iterable[IDSNode]) str¶
- tokamunch.normalise_schema_segment(raw: str) str¶
Mapping execution¶
- tokamunch.mapping.collect_mapped_values(ctx: MappingContext, selection: Selection, *, verbose_errors: bool, progress_callback: _ProgressCallback | None = None, on_paths_ready: Callable[[int], None] | None = None, profile: ProfileData | None = None, dry_run: bool = False, limit: int | None = None) tuple[list[MappingRecord], MappingSummary]¶
- tokamunch.mapping.map_path(ctx: MappingContext, ids_path: str) Any¶
- class tokamunch.mapping.MappingRecord(ids_path: 'str', value: 'Any | None' = None, error: 'Exception | None' = None, suppressed: 'bool' = False)¶
- class tokamunch.mapping.MappingSummary(total_paths: 'int' = 0, mapped: 'int' = 0, returned_none: 'int' = 0, suppressed_errors: 'int' = 0, unexpected_errors: 'int' = 0, elapsed_s: 'float' = 0.0)¶
IDS writers¶
- tokamunch.set_ids_value(ids_obj: Any, segments: Iterable[IDSNode], value: Any, *, skip_root_segment: bool = True) None¶
- tokamunch.resize_and_set_ids_value(ids_obj: Any, segments: Iterable[IDSNode], value: Any, array_sizes: dict[str, int], *, write_context: WriteContext | None = None, skip_root_segment: bool = True) None¶
- tokamunch.ensure_ids_arrays_resized(ids_obj: Any, segments: Iterable[IDSNode], array_sizes: dict[str, int], *, write_context: WriteContext | None = None, skip_root_segment: bool = True) None¶
- tokamunch.resolve_ids_parent(ids_obj: Any, segments: Iterable[IDSNode], *, skip_root_segment: bool = True) tuple[Any, IDSNode]¶
- tokamunch.resolve_ids_segments(ids_obj: Any, segments: Iterable[IDSNode], *, skip_root_segment: bool = True) Any¶
- class tokamunch.write_ids.IdsWriteError(ids_name: str, records: list[MappingRecord], cause: Exception)¶
Captures a per-IDS write failure together with the records that couldn’t be written.
Format conversion¶
Functions for moving data between munchi JSON output, imas-python IMAS files
(.h5, .nc), and in-memory imas IDS objects.
- tokamunch.read_json_records(path: Path) list[MappingRecord]¶
Read a munchi JSON output file and return the entries as a list of MappingRecord objects.
The JSON must be a flat {concrete_path: value} object as produced by munchi map --output results.json. Binary-encoded ndarrays ({"__ndarray__": ..., "dtype": ..., "shape": ...}) are decoded back to numpy arrays automatically.
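A stdlib-only sketch of reading this flat JSON shape follows. The assumption that "__ndarray__" holds base64-encoded raw little-endian bytes is illustrative (the real encoding is not spelled out here), and the sketch decodes to plain lists rather than numpy arrays:

```python
import base64
import json
import struct

def decode_value(value):
    # Detect the documented {"__ndarray__": ..., "dtype": ..., "shape": ...}
    # marker; assume base64-encoded little-endian bytes (illustrative only).
    if isinstance(value, dict) and "__ndarray__" in value:
        raw = base64.b64decode(value["__ndarray__"])
        if value["dtype"] == "float64":
            return list(struct.unpack(f"<{len(raw) // 8}d", raw))
        return raw  # other dtypes left undecoded in this sketch
    return value

def read_flat_json(text: str) -> dict:
    # Flat {concrete_path: value} object, one entry per mapped path.
    return {path: decode_value(v) for path, v in json.loads(text).items()}

payload = base64.b64encode(struct.pack("<3d", 1.0, 2.0, 3.0)).decode()
doc = json.dumps({
    "magnetics/flux_loop[0]/position[0]/r": 1.5,
    "magnetics/flux_loop[0]/flux/data": {
        "__ndarray__": payload, "dtype": "float64", "shape": [3]},
})
print(read_flat_json(doc))
```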
- tokamunch.records_to_ids_objects(records: list[MappingRecord]) dict[str, Any]¶
Convert a list of MappingRecords into a dict of imas IDS objects.
Returns a {ids_name: ids_obj} mapping. Each IDS object is populated with the supplied records via setattr traversal, with array-struct nodes resized as needed.
This is the primary building block for the JSON → IMAS workflow:
records = read_json_records(Path("results.json"))
ids_objects = records_to_ids_objects(records)
# Augment the IDS with any missing required fields, then write:
ids_objects["equilibrium"].time_slice[0].time = 0.5
with imas.DBEntry("imas:hdf5?path=output", "w") as db:
    for ids_obj in ids_objects.values():
        db.put(ids_obj)
- tokamunch.read_ids_records(ids_obj: Any, ids_name: str) list[MappingRecord]¶
Extract all non-empty leaf values from a loaded imas IDS object.
Uses the IMAS data-dictionary schema to enumerate candidate paths and reads each field value directly from ids_obj — no mapper required. Empty/unset fields (zero-length arrays, None) are silently skipped.
- Parameters:
ids_obj – An imas IDS object as returned by imas.IDSFactory().<ids_name>() or imas.DBEntry.get(ids_name).
ids_name – The IDS name (e.g. "magnetics"), used to look up the schema.
- tokamunch.read_imas_records(path: Path, ids_names: list[str]) list[MappingRecord]¶
Read IDS records from an imas HDF5 or NetCDF file.
- Parameters:
path – Path to an .h5 or .nc file.
ids_names – Names of the IDS objects to read (e.g. ["magnetics", "equilibrium"]). The file format does not expose its contents without reading each IDS explicitly, so the names must be supplied by the caller.
- tokamunch.convert_file(input_path: Path, output_path: Path, *, ids_names: list[str] | None = None, force: bool = False, binary_arrays: bool = False, on_imas_error: str = 'fallback-json') list[IdsWriteError]¶
Convert data between supported file formats.
- Supported input formats:
.json — munchi JSON output
.h5 / .nc — imas-python IMAS files (requires ids_names)
- Supported output formats:
.json — munchi JSON output
.h5 / .nc — imas-python IMAS files
- Parameters:
input_path – Source file.
output_path – Destination file. If None, the records are returned but not written.
ids_names – IDS names to read when the input is an IMAS file. Ignored for JSON.
force – Overwrite output_path if it already exists.
binary_arrays – Encode numpy arrays as base64 binary objects in JSON output.
on_imas_error – "fallback-json" or "raise". Controls what happens when an individual IDS fails to write. See write_imas_output().
- Returns:
Per-IDS write errors (empty on full success). Only relevant when the output format is IMAS.
- Return type:
list[IdsWriteError]
Diff and comparison¶
- tokamunch.diff.diff_records(records_a: list[MappingRecord], records_b: list[MappingRecord]) list[DiffEntry]¶
Compare two sets of mapping records.
- Parameters:
records_a – MappingRecord list from the first mapping run or file.
records_b – MappingRecord list from the second mapping run or file.
- Returns:
One entry per unique path across both sets. The order is: paths present in A (in their original order), then paths only in B (alphabetically).
"unchanged" entries are included so that callers can compute statistics.
- Return type:
list[DiffEntry]
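The documented ordering can be sketched in a few lines. Records are modelled as plain {path: value} dicts rather than tokamunch's MappingRecord, and statuses are returned as strings:

```python
# Sketch of the documented diff ordering: one entry per unique path,
# A's paths first (in original order), then B-only paths alphabetically;
# "unchanged" entries are kept so callers can compute statistics.
def diff(a: dict, b: dict) -> list[tuple[str, str]]:
    entries = []
    for path, value in a.items():
        if path not in b:
            entries.append((path, "removed"))
        elif b[path] != value:
            entries.append((path, "changed"))
        else:
            entries.append((path, "unchanged"))
    for path in sorted(set(b) - set(a)):
        entries.append((path, "added"))
    return entries

print(diff({"x": 1, "y": 2}, {"y": 3, "z": 4}))
# [('x', 'removed'), ('y', 'changed'), ('z', 'added')]
```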
- tokamunch.diff.diff_files(path_a: Path, path_b: Path, *, ids_names: list[str] | None = None) list[DiffEntry]¶
Load two files and return diff entries.
Supports .json (munchi JSON output) and IMAS files (.h5, .nc). For IMAS files, ids_names must be supplied.
- Parameters:
path_a – First file to compare.
path_b – Second file to compare.
ids_names – IDS names to read from IMAS files. Ignored for JSON input.
- tokamunch.diff.render_diff(entries: list[DiffEntry], label_a: str, label_b: str, *, show_unchanged: bool = False) str¶
Render a human-readable diff.
Lines are prefixed with:
+ for added paths (in B but not A)
- for removed paths (in A but not B)
~ for changed values
(space) for unchanged paths (only shown when show_unchanged is True)
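A minimal sketch of this prefix scheme, operating on (path, status) pairs rather than tokamunch's DiffEntry objects:

```python
# Documented prefixes: '+' added, '-' removed, '~' changed, ' ' unchanged
# (the last emitted only when show_unchanged is set).
PREFIX = {"added": "+", "removed": "-", "changed": "~", "unchanged": " "}

def render(entries, *, show_unchanged: bool = False) -> str:
    lines = []
    for path, status in entries:
        if status == "unchanged" and not show_unchanged:
            continue
        lines.append(f"{PREFIX[status]} {path}")
    return "\n".join(lines)

print(render([("a/b", "added"), ("c/d", "unchanged"), ("e/f", "changed")]))
# + a/b
# ~ e/f
```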
- class tokamunch.diff.DiffEntry(path: str, value_a: Any, value_b: Any, status: str)¶
A single entry in a mapping diff.
- status: str¶
One of "added", "removed", "changed", "unchanged".
- value_a: Any¶
Value from the first file, or None if the path was absent.
- value_b: Any¶
Value from the second file, or None if the path was absent.
Checkpointing¶
- class tokamunch.checkpoint.Checkpoint(output_path: str, completed_paths: list[str] = <factory>, results: dict[str, Any] = <factory>)¶
Persistent state for a partially-completed mapping run.
- completed_paths: list[str]¶
Concrete paths whose mapping has already been stored.
- output_path: str¶
The target output file this checkpoint corresponds to.
- results: dict[str, Any]¶
path → JSON-safe value for each completed path.
- tokamunch.checkpoint.save_checkpoint(path: Path, cp: Checkpoint) None¶
Write cp to path atomically (write to a .tmp file then rename).
The parent directory is created if it does not already exist.
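The write-then-rename pattern described above can be sketched with the stdlib; the JSON field layout here is an assumption, not tokamunch's actual checkpoint format:

```python
import json
import os
import tempfile
from pathlib import Path

def save_checkpoint_sketch(path: Path, cp: dict) -> None:
    # Create the parent directory if needed, as the docs describe.
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(json.dumps(cp))
    # os.replace is atomic on POSIX within a single filesystem, so readers
    # never observe a half-written checkpoint.
    os.replace(tmp, path)

with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "run" / "checkpoint.json"
    save_checkpoint_sketch(target, {"output_path": "out.json", "completed_paths": []})
    print(json.loads(target.read_text())["output_path"])  # out.json
```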
- tokamunch.checkpoint.load_checkpoint(path: Path) Checkpoint | None¶
Load a checkpoint from path.
Returns None if the file does not exist. Raises ValueError if the file exists but cannot be parsed as a valid checkpoint.
- tokamunch.checkpoint.apply_checkpoint(paths: list[str], cp: Checkpoint) tuple[list[str], list[MappingRecord]]¶
Split paths into remaining work and already-completed records.
- Parameters:
paths – Full list of concrete paths to be mapped.
cp – Previously saved checkpoint.
- Returns:
remaining_paths – Paths not already in cp.completed_paths.
already_done_records – MappingRecord instances for every path that was already completed, with values restored from cp.results.
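The split is straightforward; a sketch where a completed record is modelled as a (path, value) tuple instead of a MappingRecord:

```python
# Split the full path list into remaining work and already-completed
# records, restoring completed values from the checkpoint's results dict.
def apply_checkpoint_sketch(paths, completed_paths, results):
    done = set(completed_paths)
    remaining = [p for p in paths if p not in done]
    already = [(p, results.get(p)) for p in paths if p in done]
    return remaining, already

remaining, already = apply_checkpoint_sketch(["a", "b", "c"], ["b"], {"b": 42})
print(remaining)  # ['a', 'c']
print(already)    # [('b', 42)]
```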
Shell completions¶
- tokamunch.completions.generate_bash_completion(ids_names: list[str]) str¶
Generate a bash completion script for munchi.
- tokamunch.completions.generate_zsh_completion(ids_names: list[str]) str¶
Generate a zsh completion script for munchi.
- tokamunch.completions.generate_fish_completion(ids_names: list[str]) str¶
Generate a fish shell completion script for munchi.
- tokamunch.completions.get_ids_names() list[str]¶
Return sorted list of known IDS names from imas_data_dictionary.
Returns an empty list if the data dictionary is not available.
Mapping annotations¶
- tokamunch.templates.is_comment_stub(value: Any) bool¶
Return True if value is a documented stub with no mapping expression.
A comment stub is a dict that has a "comment" key and whose remaining keys are limited to the allowed metadata set ("units", "source"). Any other key (e.g. an expression) means this entry carries real data and is therefore not a stub.
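The documented predicate amounts to a set check; a minimal sketch:

```python
# A stub must have a "comment" key, and its remaining keys must be a
# subset of the allowed metadata set, per the description above.
ALLOWED_METADATA = {"units", "source"}

def is_comment_stub_sketch(value) -> bool:
    if not isinstance(value, dict) or "comment" not in value:
        return False
    return set(value) - {"comment"} <= ALLOWED_METADATA

print(is_comment_stub_sketch({"comment": "TODO", "units": "T"}))      # True
print(is_comment_stub_sketch({"comment": "", "expression": "b0()"}))  # False
```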
- tokamunch.templates.merge_mapping_stubs(ids_name: str, existing_path: Path, *, leaves_only: bool = False) dict[str, Any]¶
Merge an existing mapping file with blank stubs for any missing IDS paths.
All existing entries are preserved with their original values. Any template path present in the IDS schema but absent from the file is added as
{"comment": ""}. Existing entries appear first (in their original file order), followed by new stubs sorted alphabetically.
- Parameters:
ids_name – IDS name, e.g. "magnetics".
existing_path – Path to the existing mapping JSON file.
leaves_only – If True, only leaf schema paths are considered when adding stubs.
- Returns:
Merged mapping dict.
- Return type:
dict[str, Any]
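The merge ordering can be sketched with a plain dict (insertion order preserved); this skips the schema enumeration and file I/O of the real function:

```python
# Existing entries keep their values and original order; any schema path
# missing from the file is appended as {"comment": ""}, alphabetically.
def merge_stubs_sketch(existing: dict, schema_paths: list[str]) -> dict:
    merged = dict(existing)  # dicts preserve insertion order
    for path in sorted(set(schema_paths) - set(existing)):
        merged[path] = {"comment": ""}
    return merged

existing = {"flux_loop[#]/flux/data": {"expression": "fl()"}}
schema = ["flux_loop[#]/flux/data", "flux_loop[#]/name", "ids_properties/comment"]
print(list(merge_stubs_sketch(existing, schema)))
# ['flux_loop[#]/flux/data', 'flux_loop[#]/name', 'ids_properties/comment']
```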
- tokamunch.templates.build_blank_mapping_template(ids_name: str, *, leaves_only: bool) dict[str, dict[str, Any]]¶
Profiling¶
See also: Profiling and performance for usage examples.
- class tokamunch.profiling.ProfileData(phases: 'PhaseTimings' = <factory>, mapper_map: 'CallStats' = <factory>, array_length: 'CallStats' = <factory>)¶
- class tokamunch.profiling.PhaseTimings(expansion_s: 'float' = 0.0, mapping_s: 'float' = 0.0, output_s: 'float' = 0.0)¶
- class tokamunch.profiling.CallStats(count: int = 0, total_s: float = 0.0, min_s: float = inf, max_s: float = 0.0)¶
Thread-safe accumulator for per-call timing statistics.
- tokamunch.profiling.render_profile_report(data: ProfileData, total_elapsed_s: float) str¶
Trie¶
- tokamunch.build_ids_path_trie(ids_paths: Iterable[str]) TrieNode¶
- tokamunch.generate_schema_paths_from_trie(root: TrieNode, *, leaves_only: bool = False) Iterator[str]¶
- tokamunch.expand_ids_path_trie(root: TrieNode, get_length_callback: Callable[[str], int], *, context: ExpansionContext | None = None, leaves_only: bool = False) Iterator[str]¶
- tokamunch.expand_ids_path_trie_segments(root: TrieNode, get_length_callback: Callable[[str], int], *, context: ExpansionContext | None = None, leaves_only: bool = False) Iterator[tuple[IDSNode, ...]]¶
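The expansion idea behind these functions can be sketched without the TrieNode machinery: each (:) array-struct marker in a schema path expands to concrete indices [0..N-1], with N obtained from a length callback. The assumption that the callback is queried with the concrete path prefix is illustrative; the real query-path format comes from render_array_length_query_path and is not spelled out here:

```python
# Recursive expansion of (:) markers into concrete indices, querying a
# length callback with the concrete prefix (assumed query format).
def expand(path: str, get_length) -> list[str]:
    head, sep, tail = path.partition("(:)")
    if not sep:
        return [path]  # no markers left: already concrete
    out = []
    for i in range(get_length(head)):
        out.extend(expand(f"{head}[{i}]{tail}", get_length))
    return out

lengths = {
    "magnetics/flux_loop": 2,
    "magnetics/flux_loop[0]/position": 1,
    "magnetics/flux_loop[1]/position": 1,
}
print(expand("magnetics/flux_loop(:)/position(:)/r", lengths.__getitem__))
# ['magnetics/flux_loop[0]/position[0]/r', 'magnetics/flux_loop[1]/position[0]/r']
```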