API Reference

Library entry points for programmatic use of tokamunch.

Context

class tokamunch.MappingContext(mapper: Any, tokamap: TokamapInterface, device: str, shot: int | None, cli_config: CLIConfig | None = None, concurrency: ConcurrencyConfig = <factory>)

Holds all runtime state needed to execute a mapping run.

Can be constructed directly for programmatic (library) use, or via from_config to load settings from a TOML config file for CLI use.

cli_config is required for process-based concurrency: each worker process receives the full CLIConfig and reconstructs its own mapper without needing any config file to be present on disk.

cli_config: CLIConfig | None = None
concurrency: ConcurrencyConfig
device: str
classmethod from_config(config: str, *, device: str | None = None, shot: int | None = None) MappingContext

Build a MappingContext from a munchi TOML config file.

device and shot override the values in the config when provided.

ids_helper(ids_name: str) IDSHelper
mapper: Any
shot: int | None
tokamap: TokamapInterface

Selection

class tokamunch.IdsSelection(ids: str, match: str | None = None, leaves_only: bool = False, mapping_keys: frozenset[str] | None = None)

Expand all concrete paths for an IDS, with optional filtering.

class tokamunch.SinglePathSelection(path: str, mapping_keys: frozenset[str] | None = None)

Map a single known concrete path.

tokamunch.selection.generate_selected_paths(selection: IdsSelection | SinglePathSelection | MultiPathSelection, ctx: MappingContext) Iterator[str]

IDS schema helpers

class tokamunch.IDSHelper(ids_paths: Iterable[str])
reset_expansion_cache() None

Discard all cached array-length results so the next expansion re-queries.

tokamunch.generate_ids_paths(ids_name: str) Iterator[str]

Mapper interface

class tokamunch.TokamapInterface(mapper: MapperProtocol, device: str, *, shot: int | None = None, extra_args: dict[str, Any] | None = None)
class tokamunch.MapperProtocol(*args, **kwargs)

Protocol for the underlying mapper object (e.g. libtokamap.Mapper).

Typing TokamapInterface.mapper against this protocol allows tests and library users to supply a lightweight fake without installing libtokamap.

class tokamunch.DataSource(*args, **kwargs)

Protocol for data source objects registered with the mapper.

The return type of map() is intentionally untyped (Any) to avoid introducing a numpy dependency solely for the type hint — callers should expect numpy arrays, scalars, or None.

class tokamunch.DataSourceFactory(*args, **kwargs)

Protocol for entry-point data-source factories.

A factory receives plugin-specific configuration args and returns an object suitable for mapper.register_python_data_source(…).

Configuration

class tokamunch.CLIConfig(mapper: 'MapperConfig', run: 'RunConfig', data_sources: 'list[DataSourceConfig]')
class tokamunch.MapperConfig(device: 'str', config: 'str | None' = None, config_params: 'dict[str, Any] | None' = None)
class tokamunch.RunConfig(default_shot: 'int | None' = None, concurrency: 'ConcurrencyConfig' = <factory>, log_level: 'str' = 'WARNING', binary_arrays: 'bool' = False, on_imas_error: 'str' = 'fallback-json')
class tokamunch.ConcurrencyConfig(mode: 'ConcurrencyMode' = <ConcurrencyMode.SERIAL: 'serial'>, workers: 'int' = 1)
class tokamunch.DataSourceConfig(mapper_name: 'str', plugin: 'str', enabled: 'bool' = True, args: 'dict[str, Any]' = <factory>)
tokamunch.load_cli_config(path: str | Path) CLIConfig
tokamunch.config.apply_config_overrides(cfg: CLIConfig, overrides: list[str]) CLIConfig

Apply dotted.key=value overrides to a CLIConfig, returning a new instance.

Supported keys:

  • run.concurrency.mode → ConcurrencyMode
  • run.concurrency.workers → int
  • run.log_level → str (must be a valid log level)
  • run.binary_arrays → bool ("true"/"false"/"1"/"0")
  • run.on_imas_error → str ("fallback-json" or "raise")
  • run.default_shot → int
  • mapper.device → str

Since all dataclasses use slots=True, new instances are constructed rather than mutating the originals.

Raises:

ValueError – For unknown keys or invalid values.
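The dotted-override parsing described above can be sketched as a small key/value parser. This is an illustrative re-implementation of the documented rules, not the actual tokamunch code; the coercion table follows the supported-keys list, and unknown keys are left as strings for brevity.

```python
# Sketch of dotted.key=value override parsing (illustrative only --
# not the actual tokamunch implementation).

def parse_override(raw: str) -> tuple[str, object]:
    """Split 'dotted.key=value' and coerce the value per the key table."""
    key, _, text = raw.partition("=")
    coercions = {
        "run.concurrency.workers": int,
        "run.default_shot": int,
        "run.binary_arrays": lambda s: {"true": True, "1": True,
                                        "false": False, "0": False}[s.lower()],
    }
    try:
        coerce = coercions.get(key, str)  # unknown keys stay strings here
        return key, coerce(text)
    except (KeyError, ValueError):
        raise ValueError(f"invalid value for {key}: {text!r}")
```

For example, `parse_override("run.concurrency.workers=4")` yields the key with an integer value, while a malformed boolean raises ValueError, mirroring the behaviour documented above.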

Path utilities

tokamunch.concrete_path_to_schema_path(path: str) str

Convert a concrete runtime path to its schema form.

Replaces every concrete array index [N] with the schema array-struct marker (:).

Example:

"magnetics/flux_loop[0]/position[2]/r"
-> "magnetics/flux_loop(:)/position(:)/r"

tokamunch.concrete_path_to_template(path: str) str

Convert a concrete runtime path to its mapping-template form.

Replaces every concrete array index [N] with the template placeholder [#], enabling lookup against mapping-file keys.

Example:

"magnetics/flux_loop[0]/position[2]/r"
-> "magnetics/flux_loop[#]/position[#]/r"
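Both conversions amount to a single regular-expression substitution over concrete array indices. The sketch below illustrates the documented transformations; it is not the library's internal implementation.

```python
import re

# Sketch of the two path conversions documented above (illustrative only).
_INDEX = re.compile(r"\[\d+\]")  # matches a concrete array index like [0]

def to_schema_path(path: str) -> str:
    """Replace every [N] with the schema array-struct marker (:)."""
    return _INDEX.sub("(:)", path)

def to_template(path: str) -> str:
    """Replace every [N] with the template placeholder [#]."""
    return _INDEX.sub("[#]", path)
```

Applied to "magnetics/flux_loop[0]/position[2]/r", these reproduce the schema and template forms shown in the examples above.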

tokamunch.parse_concrete_path(path: str) Iterator[IDSNode]
tokamunch.parse_schema_path(path: str, interner: SegmentInterner | None = None) Iterator[IDSNode]
tokamunch.render_concrete_path(nodes: Iterable[IDSNode]) str
tokamunch.render_schema_path(nodes: Iterable[IDSNode]) str
tokamunch.render_array_length_query_path(nodes: Iterable[IDSNode]) str
tokamunch.normalise_schema_segment(raw: str) str

Mapping execution

tokamunch.mapping.collect_mapped_values(ctx: MappingContext, selection: Selection, *, verbose_errors: bool, progress_callback: _ProgressCallback | None = None, on_paths_ready: Callable[[int], None] | None = None, profile: ProfileData | None = None, dry_run: bool = False, limit: int | None = None) tuple[list[MappingRecord], MappingSummary]
tokamunch.mapping.map_path(ctx: MappingContext, ids_path: str) Any
class tokamunch.mapping.MappingRecord(ids_path: 'str', value: 'Any | None' = None, error: 'Exception | None' = None, suppressed: 'bool' = False)
class tokamunch.mapping.MappingSummary(total_paths: 'int' = 0, mapped: 'int' = 0, returned_none: 'int' = 0, suppressed_errors: 'int' = 0, unexpected_errors: 'int' = 0, elapsed_s: 'float' = 0.0)

IDS writers

tokamunch.set_ids_value(ids_obj: Any, segments: Iterable[IDSNode], value: Any, *, skip_root_segment: bool = True) None
tokamunch.resize_and_set_ids_value(ids_obj: Any, segments: Iterable[IDSNode], value: Any, array_sizes: dict[str, int], *, write_context: WriteContext | None = None, skip_root_segment: bool = True) None
tokamunch.ensure_ids_arrays_resized(ids_obj: Any, segments: Iterable[IDSNode], array_sizes: dict[str, int], *, write_context: WriteContext | None = None, skip_root_segment: bool = True) None
tokamunch.resolve_ids_parent(ids_obj: Any, segments: Iterable[IDSNode], *, skip_root_segment: bool = True) tuple[Any, IDSNode]
tokamunch.resolve_ids_segments(ids_obj: Any, segments: Iterable[IDSNode], *, skip_root_segment: bool = True) Any
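The setattr-traversal idea behind these writers can be sketched with plain (name, index) tuples standing in for IDSNode segments. This is a simplified illustration of the traversal pattern, not the actual writer code; resizing and write contexts are omitted.

```python
from types import SimpleNamespace

# Sketch of setattr traversal over (name, index) segments, standing in
# for the IDSNode-based writers above (illustrative only).

def set_by_segments(obj, segments, value):
    *parents, (leaf, _) = segments
    for name, index in parents:      # walk down to the leaf's parent
        obj = getattr(obj, name)
        if index is not None:
            obj = obj[index]
    setattr(obj, leaf, value)        # write the leaf value

root = SimpleNamespace(
    flux_loop=[SimpleNamespace(position=[SimpleNamespace(r=None)])]
)
set_by_segments(root, [("flux_loop", 0), ("position", 0), ("r", None)], 1.5)
# root.flux_loop[0].position[0].r is now 1.5
```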
class tokamunch.write_ids.IdsWriteError(ids_name: str, records: list[MappingRecord], cause: Exception)

Captures a per-IDS write failure together with the records that couldn’t be written.

Format conversion

Functions for moving data between munchi JSON output, imas-python IMAS files (.h5, .nc), and in-memory imas IDS objects.

tokamunch.read_json_records(path: Path) list[MappingRecord]

Read a munchi JSON output file and return its entries as a list of MappingRecord objects.

The JSON must be a flat {concrete_path: value} object as produced by munchi map --output results.json. Binary-encoded ndarrays ({"__ndarray__": ..., "dtype": ..., "shape": ...}) are decoded back to numpy arrays automatically.
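The decode step can be sketched with the stdlib alone (the library decodes to numpy arrays). One assumption is labelled explicitly: the "__ndarray__" key is taken to hold base64-encoded raw array bytes, which matches the binary_arrays base64 encoding described under convert_file.

```python
import base64
import struct

# Sketch of decoding a binary-encoded ndarray entry as described above.
# Assumption: "__ndarray__" holds base64-encoded raw array bytes; float64
# is hard-coded here for illustration, whereas the real decoder honours
# the "dtype" and "shape" keys and returns a numpy array.

def decode_binary_array(obj: dict) -> list[float]:
    raw = base64.b64decode(obj["__ndarray__"])
    count = len(raw) // struct.calcsize("d")
    return list(struct.unpack(f"<{count}d", raw))

blob = {
    "__ndarray__": base64.b64encode(struct.pack("<3d", 1.0, 2.0, 3.0)).decode(),
    "dtype": "float64",
    "shape": [3],
}
# decode_binary_array(blob) round-trips the three packed values
```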

tokamunch.records_to_ids_objects(records: list[MappingRecord]) dict[str, Any]

Convert a list of MappingRecords into a dict of imas IDS objects.

Returns a {ids_name: ids_obj} mapping. Each IDS object is populated with the supplied records via setattr traversal, with array-struct nodes resized as needed.

This is the primary building block for the JSON → IMAS workflow:

records = read_json_records(Path("results.json"))
ids_objects = records_to_ids_objects(records)

# Augment the IDS with any missing required fields, then write:
ids_objects["equilibrium"].time_slice[0].time = 0.5
with imas.DBEntry("imas:hdf5?path=output", "w") as db:
    for ids_obj in ids_objects.values():
        db.put(ids_obj)

tokamunch.read_ids_records(ids_obj: Any, ids_name: str) list[MappingRecord]

Extract all non-empty leaf values from a loaded imas IDS object.

Uses the IMAS data-dictionary schema to enumerate candidate paths and reads each field value directly from ids_obj — no mapper required. Empty/unset fields (zero-length arrays, None) are silently skipped.

Parameters:
  • ids_obj – An imas IDS object as returned by imas.IDSFactory().<ids_name>() or imas.DBEntry.get(ids_name).

  • ids_name – The IDS name (e.g. "magnetics"), used to look up the schema.

tokamunch.read_imas_records(path: Path, ids_names: list[str]) list[MappingRecord]

Read IDS records from an imas HDF5 or NetCDF file.

Parameters:
  • path – Path to an .h5 or .nc file.

  • ids_names – Names of the IDS objects to read (e.g. ["magnetics", "equilibrium"]). The file format does not expose its contents without reading each IDS explicitly, so the names must be supplied by the caller.

tokamunch.convert_file(input_path: Path, output_path: Path, *, ids_names: list[str] | None = None, force: bool = False, binary_arrays: bool = False, on_imas_error: str = 'fallback-json') list[IdsWriteError]

Convert data between supported file formats.

Supported input formats:
  • .json — munchi JSON output

  • .h5 / .nc — imas-python IMAS files (requires ids_names)

Supported output formats:
  • .json — munchi JSON output

  • .h5 / .nc — imas-python IMAS files

Parameters:
  • input_path – Source file.

  • output_path – Destination file; the extension determines the output format.

  • ids_names – IDS names to read when the input is an IMAS file. Ignored for JSON.

  • force – Overwrite output_path if it already exists.

  • binary_arrays – Encode numpy arrays as base64 binary objects in JSON output.

  • on_imas_error – "fallback-json" or "raise". Controls what happens when an individual IDS fails to write. See write_imas_output().

Returns:

Per-IDS write errors (empty on full success). Only relevant when the output format is IMAS.

Return type:

list[IdsWriteError]

Diff and comparison

tokamunch.diff.diff_records(records_a: list[MappingRecord], records_b: list[MappingRecord]) list[DiffEntry]

Compare two sets of mapping records.

Parameters:
  • records_a – MappingRecord list from the first mapping run or file.

  • records_b – MappingRecord list from the second mapping run or file.

Returns:

One entry per unique path across both sets. The order is: paths present in A (in their original order), then paths only in B (alphabetically). "unchanged" entries are included so that callers can compute statistics.

Return type:

list[DiffEntry]
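The ordering contract above (A's paths in original order, then B-only paths alphabetically, with "unchanged" entries kept) can be sketched over plain {path: value} dicts rather than MappingRecord lists. Illustrative only; the real function compares record values and errors.

```python
# Sketch of the diff ordering documented above (illustrative only).

def diff_paths(a: dict, b: dict) -> list[tuple[str, str]]:
    entries = []
    for path, value in a.items():          # A's paths, in original order
        if path not in b:
            entries.append((path, "removed"))
        elif b[path] != value:
            entries.append((path, "changed"))
        else:
            entries.append((path, "unchanged"))
    for path in sorted(set(b) - set(a)):   # B-only paths, alphabetically
        entries.append((path, "added"))
    return entries
```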

tokamunch.diff.diff_files(path_a: Path, path_b: Path, *, ids_names: list[str] | None = None) list[DiffEntry]

Load two files and return diff entries.

Supports .json (munchi JSON output) and IMAS files (.h5, .nc). For IMAS files, ids_names must be supplied.

Parameters:
  • path_a – First input file to compare.

  • path_b – Second input file to compare.

  • ids_names – IDS names to read from IMAS files. Ignored for JSON input.

tokamunch.diff.render_diff(entries: list[DiffEntry], label_a: str, label_b: str, *, show_unchanged: bool = False) str

Render a human-readable diff.

Lines are prefixed with:
  • + for added paths (in B but not A)
  • - for removed paths (in A but not B)
  • ~ for changed values
  • a single space for unchanged paths (only shown when show_unchanged is True)

class tokamunch.diff.DiffEntry(path: str, value_a: Any, value_b: Any, status: str)

A single entry in a mapping diff.

status: str

One of "added", "removed", "changed", "unchanged".

value_a: Any

Value from the first file, or None if the path was absent.

value_b: Any

Value from the second file, or None if the path was absent.

Checkpointing

class tokamunch.checkpoint.Checkpoint(output_path: str, completed_paths: list[str] = <factory>, results: dict[str, Any] = <factory>)

Persistent state for a partially-completed mapping run.

completed_paths: list[str]

Concrete paths whose mapping has already been stored.

output_path: str

The target output file this checkpoint corresponds to.

results: dict[str, Any]

path → JSON-safe value for each completed path.

tokamunch.checkpoint.save_checkpoint(path: Path, cp: Checkpoint) None

Write cp to path atomically (write to a .tmp file then rename).

The parent directory is created if it does not already exist.
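The write-to-tmp-then-rename pattern described above can be sketched as follows. This is an illustrative version using JSON, not the actual checkpoint serialization; `save_atomically` is a hypothetical name.

```python
import json
import os
from pathlib import Path

# Sketch of the atomic-write pattern described above: write to a sibling
# .tmp file, then rename over the target, so readers never observe a
# half-written checkpoint. Illustrative only.

def save_atomically(path: Path, payload: dict) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)  # create parent dirs
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps(payload))
    os.replace(tmp, path)                            # atomic on POSIX
```

`os.replace` is atomic on POSIX filesystems when source and destination are on the same filesystem, which is why the temporary file is placed next to the target.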

tokamunch.checkpoint.load_checkpoint(path: Path) Checkpoint | None

Load a checkpoint from path.

Returns None if the file does not exist. Raises ValueError if the file exists but cannot be parsed as a valid checkpoint.

tokamunch.checkpoint.apply_checkpoint(paths: list[str], cp: Checkpoint) tuple[list[str], list[MappingRecord]]

Split paths into remaining work and already-completed records.

Parameters:
  • paths – Full list of concrete paths to be mapped.

  • cp – Previously saved checkpoint.

Returns:

  • remaining_paths – Paths NOT already in cp.completed_paths.

  • already_done_records – MappingRecord instances for every path that was already completed, with values restored from cp.results.
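The split performed here can be sketched with a plain {path: value} dict standing in for the Checkpoint. Illustrative only; the real function rebuilds MappingRecord instances.

```python
# Sketch of the remaining/already-done split described above
# (illustrative only).

def split_work(paths: list[str], completed: dict[str, object]):
    remaining = [p for p in paths if p not in completed]
    already_done = [(p, completed[p]) for p in paths if p in completed]
    return remaining, already_done
```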

Shell completions

tokamunch.completions.generate_bash_completion(ids_names: list[str]) str

Generate a bash completion script for munchi.

tokamunch.completions.generate_zsh_completion(ids_names: list[str]) str

Generate a zsh completion script for munchi.

tokamunch.completions.generate_fish_completion(ids_names: list[str]) str

Generate a fish shell completion script for munchi.

tokamunch.completions.get_ids_names() list[str]

Return sorted list of known IDS names from imas_data_dictionary.

Returns an empty list if the data dictionary is not available.

Mapping annotations

tokamunch.templates.is_comment_stub(value: Any) bool

Return True if value is a documented stub with no mapping expression.

A comment stub is a dict that has a "comment" key and whose remaining keys are limited to the allowed metadata set ("units", "source"). Any other key (e.g. an expression) means this entry carries real data and is therefore not a stub.
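The stub test described above can be sketched directly from its definition. `looks_like_comment_stub` is a hypothetical name for this illustrative re-implementation.

```python
# Sketch of the comment-stub test described above (illustrative only).
ALLOWED_METADATA = {"units", "source"}

def looks_like_comment_stub(value) -> bool:
    if not isinstance(value, dict) or "comment" not in value:
        return False
    # Any key beyond the allowed metadata means the entry carries real data.
    return set(value) - {"comment"} <= ALLOWED_METADATA
```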

tokamunch.templates.merge_mapping_stubs(ids_name: str, existing_path: Path, *, leaves_only: bool = False) dict[str, Any]

Merge an existing mapping file with blank stubs for any missing IDS paths.

All existing entries are preserved with their original values. Any template path present in the IDS schema but absent from the file is added as {"comment": ""}.

Existing entries appear first (in their original file order), followed by new stubs sorted alphabetically.

Parameters:
  • ids_name – IDS name, e.g. "magnetics".

  • existing_path – Path to the existing mapping JSON file.

  • leaves_only – If True, only leaf schema paths are considered when adding stubs.

Returns:

Merged mapping dict.

Return type:

dict[str, Any]
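The merge ordering documented above can be sketched over plain dicts, relying on Python's insertion-order preservation to keep existing entries in file order. Illustrative only; schema-path enumeration and leaves_only handling are omitted.

```python
# Sketch of the merge ordering described above (illustrative only):
# existing entries keep their file order, missing schema paths are
# appended as blank {"comment": ""} stubs in alphabetical order.

def merge_with_stubs(existing: dict, schema_paths: list[str]) -> dict:
    merged = dict(existing)  # preserves insertion (file) order
    for path in sorted(set(schema_paths) - set(existing)):
        merged[path] = {"comment": ""}
    return merged
```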

tokamunch.templates.build_blank_mapping_template(ids_name: str, *, leaves_only: bool) dict[str, dict[str, Any]]

Profiling

See also: Profiling and performance for usage examples.

class tokamunch.profiling.ProfileData(phases: 'PhaseTimings' = <factory>, mapper_map: 'CallStats' = <factory>, array_length: 'CallStats' = <factory>)
class tokamunch.profiling.PhaseTimings(expansion_s: 'float' = 0.0, mapping_s: 'float' = 0.0, output_s: 'float' = 0.0)
class tokamunch.profiling.CallStats(count: int = 0, total_s: float = 0.0, min_s: float = inf, max_s: float = 0.0)

Thread-safe accumulator for per-call timing statistics.
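A thread-safe accumulator with these fields (count, total_s, min_s, max_s) can be sketched with a lock around each update. This is an illustrative stand-in, not the actual CallStats implementation.

```python
import threading

# Sketch of a thread-safe per-call stats accumulator matching the
# CallStats fields above (illustrative only).

class CallStatsSketch:
    def __init__(self):
        self._lock = threading.Lock()
        self.count, self.total_s = 0, 0.0
        self.min_s, self.max_s = float("inf"), 0.0

    def record(self, elapsed_s: float) -> None:
        with self._lock:  # serialize updates from concurrent workers
            self.count += 1
            self.total_s += elapsed_s
            self.min_s = min(self.min_s, elapsed_s)
            self.max_s = max(self.max_s, elapsed_s)
```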

tokamunch.profiling.render_profile_report(data: ProfileData, total_elapsed_s: float) str

Trie

tokamunch.build_ids_path_trie(ids_paths: Iterable[str]) TrieNode
tokamunch.generate_schema_paths_from_trie(root: TrieNode, *, leaves_only: bool = False) Iterator[str]
tokamunch.expand_ids_path_trie(root: TrieNode, get_length_callback: Callable[[str], int], *, context: ExpansionContext | None = None, leaves_only: bool = False) Iterator[str]
tokamunch.expand_ids_path_trie_segments(root: TrieNode, get_length_callback: Callable[[str], int], *, context: ExpansionContext | None = None, leaves_only: bool = False) Iterator[tuple[IDSNode, ...]]
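The expansion idea behind expand_ids_path_trie can be sketched without the trie: each schema array marker is replaced by every concrete index, with the array length obtained from a per-node callback. Illustrative only; the real API operates on TrieNode structures with an ExpansionContext and caching.

```python
# Sketch of array-node expansion via a length callback (illustrative
# only; the trie-based API above differs).

def expand(path: str, get_length) -> list[str]:
    head, sep, tail = path.partition("(:)")
    if not sep:                        # no array marker left: concrete path
        return [path]
    results = []
    for i in range(get_length(head)):  # query the array length at this node
        results.extend(expand(f"{head}[{i}]{tail}", get_length))
    return results
```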