Skip to content

aimbat.core

Core logic for AIMBAT.

All functions take a SQLModel Session and work with the models in aimbat.models. The main areas covered are:

  • Data — add data to the project, linking each source to its station, event, and seismogram records (add_data_to_project).
  • Events, seismograms, stations — query, update, and delete records; read and write parameters; resolve an event from an explicit ID (resolve_event).
  • ICCS / MCCC — run the Iterative Cross-Correlation and Stack (run_iccs) and Multi-Channel Cross-Correlation (run_mccc) algorithms; update picks, time windows, and correlation thresholds.
  • Snapshots — save, restore, and delete parameter snapshots (create_snapshot, rollback_to_snapshot).
  • Project — create and delete the project database (create_project, delete_project).

Classes:

Name Description
BoundICCS

An ICCS instance explicitly bound to a specific event.

Functions:

Name Description
add_data_to_project

Add data sources to the AIMBAT database.

build_iccs_from_snapshot

Build a read-only BoundICCS from a snapshot's parameters and live waveform data.

clear_iccs_cache

Clear the process-level ICCS cache.

clear_mccc_quality

Clear MCCC quality metrics from the live quality tables for an event.

compute_parameters_hash

Compute a deterministic SHA-256 hash of the event's current parameters.

create_iccs_instance

Return a BoundICCS instance for the given event.

create_project

Initializes a new AIMBAT project database schema and triggers.

create_snapshot

Create a snapshot of the AIMBAT processing parameters and quality metrics.

delete_event

Delete an AimbatEvent from the database.

delete_project

Delete the AIMBAT project.

delete_seismogram

Delete an AimbatSeismogram from the database.

delete_snapshot

Delete an AIMBAT parameter snapshot.

delete_station

Delete an AimbatStation from the database.

dump_data_table

Return AIMBAT datasources table as a JSON-serialisable list of dicts.

dump_event_parameter_snapshot_table

Dump event parameter snapshots as a list of dicts.

dump_event_parameter_table

Dump the event parameter table data to json.

dump_event_quality_snapshot_table

Dump event quality snapshots as a list of dicts.

dump_event_quality_table

Dump event quality statistics to json.

dump_event_table

Dump the table data to json serialisable list of dicts.

dump_seismogram_parameter_snapshot_table

Dump seismogram parameter snapshots as a list of dicts.

dump_seismogram_parameter_table

Dump the seismogram parameter table data to json serialisable list of dicts.

dump_seismogram_quality_snapshot_table

Dump seismogram quality snapshots as a list of dicts.

dump_seismogram_table

Dump the AimbatSeismogram table to json serialisable list of dicts.

dump_snapshot_quality_table

Dump snapshot quality statistics to json.

dump_snapshot_results

Dump per-seismogram MCCC results from a snapshot as a results envelope.

dump_snapshot_table

Dump snapshot metadata as a list of dicts.

dump_station_quality_table

Dump station quality statistics to json.

dump_station_table

Create a JSON serialisable dict from the AimbatStation table data.

get_completed_events

Get the events marked as completed.

get_data_for_event

Returns the data sources belonging to the given event.

get_event_quality

Get aggregated quality statistics for an event.

get_events_using_station

Get all events that use a particular station.

get_note_content

Return the note content for the given entity.

get_selected_seismograms

Get the selected seismograms for the given event.

get_snapshot_quality

Get aggregated quality statistics for a snapshot.

get_snapshots

Get the snapshots, optional filtered by event ID.

get_station_iccs_ccs

Get ICCS cross-correlation coefficients for all seismograms of a station across all events.

get_station_quality

Get aggregated quality statistics for a station.

get_stations_in_event

Get the stations for a particular event.

reset_seismogram_parameters

Reset an AimbatSeismogram's parameters to their default values.

resolve_event

Resolve an event from an explicit ID.

rollback_to_snapshot

Rollback to an AIMBAT parameters snapshot.

run_iccs

Run the Iterative Cross-Correlation and Stack (ICCS) algorithm.

run_mccc

Run the Multi-Channel Cross-Correlation (MCCC) algorithm.

save_note

Save note content for the given entity, creating the note record if needed.

set_event_parameter

Set event parameter value for the given event.

set_seismogram_parameter

Set parameter value for an AimbatSeismogram instance.

sync_from_matching_hash

Sync live quality metrics from a snapshot whose parameter hash matches the given hash.

sync_iccs_parameters

Sync an existing ICCS instance's parameters from the database.

validate_iccs_construction

Try to construct an ICCS instance for the event without caching the result.

BoundICCS dataclass

An ICCS instance explicitly bound to a specific event.

Use is_stale to detect whether the event's parameters have been modified (e.g. by a CLI command) since this instance was created.

Parameters:

Name Type Description Default
iccs ICCS
required
event_id UUID
required
created_at Timestamp
required

Methods:

Name Description
is_stale

Return True if the event has been modified since this ICCS was created.

Source code in src/aimbat/core/_iccs.py
@dataclass
class BoundICCS:
    """An ICCS instance explicitly bound to a specific event.

    Use `is_stale` to detect whether the event's parameters have been modified
    (e.g. by a CLI command) since this instance was created.
    """

    iccs: ICCS
    event_id: UUID
    created_at: Timestamp

    def is_stale(self, event: AimbatEvent) -> bool:
        """Return True if the event has been modified since this ICCS was created.

        Args:
            event: The event to check against.
        """
        if event.id != self.event_id:
            return True
        if event.last_modified is None:
            return False
        return event.last_modified > self.created_at

is_stale

is_stale(event: AimbatEvent) -> bool

Return True if the event has been modified since this ICCS was created.

Parameters:

Name Type Description Default
event AimbatEvent

The event to check against.

required
Source code in src/aimbat/core/_iccs.py
def is_stale(self, event: AimbatEvent) -> bool:
    """Return True if the event has been modified since this ICCS was created.

    Args:
        event: The event to check against.
    """
    if event.id != self.event_id:
        return True
    if event.last_modified is None:
        return False
    return event.last_modified > self.created_at

add_data_to_project

add_data_to_project(
    session: Session,
    data_sources: Sequence[PathLike | str],
    data_type: DataType,
    station_id: UUID | None = None,
    event_id: UUID | None = None,
    dry_run: bool = False,
    disable_progress_bar: bool = True,
) -> (
    tuple[
        list[AimbatDataSource],
        set[UUID],
        set[UUID],
        set[UUID],
    ]
    | None
)

Add data sources to the AIMBAT database.

What gets created depends on which capabilities data_type supports:

  • Station + event + seismogram: all three records are created and linked, and an AimbatDataSource entry is stored.
  • Station or event only (e.g. JSON_STATION, JSON_EVENT): only the relevant metadata records are created; no seismogram or data source entry is stored.

Use station_id or event_id to skip extracting station or event metadata from the data source and link to a pre-existing record instead.

Parameters:

Name Type Description Default
session Session

The SQLModel database session.

required
data_sources Sequence[PathLike | str]

List of data sources to add.

required
data_type DataType

Type of data.

required
station_id UUID | None

UUID of an existing station to use instead of extracting one from each data source.

None
event_id UUID | None

UUID of an existing event to use instead of extracting one from each data source.

None
dry_run bool

If True, do not commit changes to the database.

False
disable_progress_bar bool

Do not display progress bar.

True
Source code in src/aimbat/core/_data.py
def add_data_to_project(
    session: Session,
    data_sources: Sequence[os.PathLike | str],
    data_type: DataType,
    station_id: UUID | None = None,
    event_id: UUID | None = None,
    dry_run: bool = False,
    disable_progress_bar: bool = True,
) -> tuple[list[AimbatDataSource], set[UUID], set[UUID], set[UUID]] | None:
    """Add data sources to the AIMBAT database.

    What gets created depends on which capabilities `data_type` supports:

    - Station + event + seismogram: all three records are created and linked,
      and an `AimbatDataSource` entry is stored.
    - Station or event only (e.g. `JSON_STATION`, `JSON_EVENT`): only the
      relevant metadata records are created; no seismogram or data source entry
      is stored.

    Use `station_id` or `event_id` to skip extracting station or event metadata
    from the data source and link to a pre-existing record instead.

    Args:
        session: The SQLModel database session.
        data_sources: List of data sources to add.
        data_type: Type of data.
        station_id: UUID of an existing station to use instead of extracting
            one from each data source.
        event_id: UUID of an existing event to use instead of extracting one
            from each data source.
        dry_run: If True, do not commit changes to the database.
        disable_progress_bar: Do not display progress bar.
    """

    logger.info(f"Adding {len(data_sources)} {data_type} data sources to project.")

    if station_id is not None and session.get(AimbatStation, station_id) is None:
        raise NoResultFound(f"No station found with ID {station_id}.")
    if event_id is not None and session.get(AimbatEvent, event_id) is None:
        raise NoResultFound(f"No event found with ID {event_id}.")

    # Snapshot existing IDs before entering the savepoint so we can identify
    # what would be new vs reused when running a dry run.
    if dry_run:
        existing_station_ids = set(session.exec(select(AimbatStation.id)).all())
        existing_event_ids = set(session.exec(select(AimbatEvent.id)).all())
        existing_seismogram_ids = set(session.exec(select(AimbatSeismogram.id)).all())

    try:
        added_datasources: list[AimbatDataSource] = []
        with session.begin_nested() as nested:
            for datasource in track(
                sequence=data_sources,
                description="Adding data ...",
                disable=disable_progress_bar,
            ):
                result = _process_datasource(
                    session, datasource, data_type, station_id, event_id
                )
                if result is not None:
                    added_datasources.append(result)

            if dry_run:
                logger.info("Dry run: displaying data that would be added.")
                if added_datasources:
                    session.flush()
                nested.rollback()
                logger.info("Dry run complete. Rolling back changes.")
                return (
                    added_datasources,
                    existing_station_ids,
                    existing_event_ids,
                    existing_seismogram_ids,
                )

        session.commit()
        logger.info("Data added successfully.")
        return None

    except Exception as e:
        logger.error(f"Failed to add data. Rolling back changes. Error: {e}")
        raise

build_iccs_from_snapshot

build_iccs_from_snapshot(
    session: Session, snapshot_id: UUID
) -> BoundICCS

Build a read-only BoundICCS from a snapshot's parameters and live waveform data.

Uses the snapshot's event and seismogram parameters (window, t1, flip, select, bandpass, etc.) but reads waveform data from the live datasources. Seismograms added after the snapshot was taken are not included in the snapshot — their live parameters are used instead. No DB writes occur at any point.

Parameters:

Name Type Description Default
session Session

Database session.

required
snapshot_id UUID

ID of the AimbatSnapshot to load.

required

Returns:

Type Description
BoundICCS

BoundICCS instance built from the snapshot parameters.

Raises:

Type Description
ValueError

If no snapshot with the given ID is found.

Source code in src/aimbat/core/_iccs.py
def build_iccs_from_snapshot(session: Session, snapshot_id: UUID) -> BoundICCS:
    """Build a read-only BoundICCS from a snapshot's parameters and live waveform data.

    Uses the snapshot's event and seismogram parameters (window, t1, flip, select,
    bandpass, etc.) but reads waveform data from the live datasources. Seismograms
    added after the snapshot was taken are not included in the snapshot — their live
    parameters are used instead. No DB writes occur at any point.

    Args:
        session: Database session.
        snapshot_id: ID of the AimbatSnapshot to load.

    Returns:
        BoundICCS instance built from the snapshot parameters.

    Raises:
        ValueError: If no snapshot with the given ID is found.
    """
    logger.info(f"Building ICCS from snapshot {snapshot_id}.")

    statement = (
        select(AimbatSnapshot)
        .where(AimbatSnapshot.id == snapshot_id)
        .options(
            selectinload(rel(AimbatSnapshot.event))
            .selectinload(rel(AimbatEvent.seismograms))
            .selectinload(rel(AimbatSeismogram.parameters)),
            selectinload(rel(AimbatSnapshot.event_parameters_snapshot)),
            selectinload(rel(AimbatSnapshot.seismogram_parameters_snapshots)),
        )
    )
    snapshot = session.exec(statement).one_or_none()

    if snapshot is None:
        raise ValueError(f"Snapshot {snapshot_id} not found.")

    ep = snapshot.event_parameters_snapshot
    snap_params = AimbatEventParametersBase.model_validate(ep)

    # Build a map from seismogram_parameters_id → snapshot parameters
    snap_seis_map = {
        sp.seismogram_parameters_id: sp
        for sp in snapshot.seismogram_parameters_snapshots
    }

    seismograms = []
    for seis in snapshot.event.seismograms:
        snap_sp = snap_seis_map.get(seis.parameters.id)
        if snap_sp is None:
            # Seismogram was added after the snapshot — use live parameters
            seis_params = AimbatSeismogramParametersBase.model_validate(seis.parameters)
        else:
            seis_params = AimbatSeismogramParametersBase.model_validate(snap_sp)
        seismograms.append(
            MiniIccsSeismogram(
                begin_time=seis.begin_time,
                delta=seis.delta,
                data=seis.data,
                t0=seis.t0,
                t1=seis_params.t1,
                flip=seis_params.flip,
                select=seis_params.select,
                extra={"id": seis.id},
            )
        )

    iccs = ICCS(
        seismograms=seismograms,
        window_pre=snap_params.window_pre,
        window_post=snap_params.window_post,
        ramp_width=snap_params.ramp_width,
        bandpass_apply=snap_params.bandpass_apply,
        bandpass_fmin=snap_params.bandpass_fmin,
        bandpass_fmax=snap_params.bandpass_fmax,
        min_cc=snap_params.min_cc,
        context_width=settings.context_width,
    )
    return BoundICCS(
        iccs=iccs,
        event_id=snapshot.event_id,
        created_at=Timestamp.now("UTC"),
    )

clear_iccs_cache

clear_iccs_cache() -> None

Clear the process-level ICCS cache.

Source code in src/aimbat/core/_iccs.py
def clear_iccs_cache() -> None:
    """Clear the process-level ICCS cache."""
    _iccs_cache.clear()

clear_mccc_quality

clear_mccc_quality(
    session: Session, event: AimbatEvent
) -> None

Clear MCCC quality metrics from the live quality tables for an event.

Sets all MCCC fields (mccc_rmse, mccc_error, mccc_cc_mean, mccc_cc_std) to None for the event and all its seismograms. ICCS CC values are not affected.

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

AimbatEvent whose quality should be cleared.

required
Source code in src/aimbat/core/_iccs.py
def clear_mccc_quality(session: Session, event: AimbatEvent) -> None:
    """Clear MCCC quality metrics from the live quality tables for an event.

    Sets all MCCC fields (`mccc_rmse`, `mccc_error`, `mccc_cc_mean`,
    `mccc_cc_std`) to `None` for the event and all its seismograms.
    ICCS CC values are not affected.

    Args:
        session: Database session.
        event: AimbatEvent whose quality should be cleared.
    """
    logger.debug(f"Clearing MCCC quality for event {event.id}.")

    if event.quality is not None:
        event.quality.mccc_rmse = None
        session.add(event.quality)

    for seis in event.seismograms:
        if seis.quality is not None:
            seis.quality.mccc_error = None
            seis.quality.mccc_cc_mean = None
            seis.quality.mccc_cc_std = None
            session.add(seis.quality)

    session.commit()

compute_parameters_hash

compute_parameters_hash(event: AimbatEvent) -> str

Compute a deterministic SHA-256 hash of the event's current parameters.

Hashes the event ID, all event-level parameters, and per-seismogram parameters. Seismograms are sorted by ID so the result is independent of load order. Including the event ID means hashes are inherently event-scoped and will never collide across events.

Excluded fields:

  • completed (event): does not affect seismogram processing.
  • select (seismogram): determines which seismograms are passed to MCCC but does not affect the computation for any individual seismogram. Membership of the actual MCCC run is captured by the seismogram quality records in the snapshot, so changing selection state should not invalidate a prior MCCC result.

Parameters:

Name Type Description Default
event AimbatEvent

AimbatEvent whose current parameters should be hashed.

required

Returns:

Type Description
str

Hex-encoded SHA-256 digest.

Source code in src/aimbat/core/_snapshot.py
def compute_parameters_hash(event: AimbatEvent) -> str:
    """Compute a deterministic SHA-256 hash of the event's current parameters.

    Hashes the event ID, all event-level parameters, and per-seismogram
    parameters. Seismograms are sorted by ID so the result is independent of
    load order. Including the event ID means hashes are inherently
    event-scoped and will never collide across events.

    Excluded fields:

    - `completed` (event): does not affect seismogram processing.
    - `select` (seismogram): determines which seismograms are passed to MCCC
      but does not affect the computation for any individual seismogram.
      Membership of the actual MCCC run is captured by the seismogram quality
      records in the snapshot, so changing selection state should not
      invalidate a prior MCCC result.

    Args:
        event: AimbatEvent whose current parameters should be hashed.

    Returns:
        Hex-encoded SHA-256 digest.
    """
    logger.debug(f"Computing parameters hash for event {event.id}.")

    # exclude completed field since it does not affect the seismograms directly.
    event_data = AimbatEventParametersBase.model_validate(event.parameters).model_dump(
        mode="json", exclude={"completed"}
    )
    event_data["event_id"] = str(event.id)
    seis_data = sorted(
        [
            {
                "seismogram_id": str(seis.id),
                **AimbatSeismogramParametersBase.model_validate(
                    seis.parameters
                ).model_dump(mode="json", exclude={"select"}),
            }
            for seis in event.seismograms
        ],
        key=lambda x: x["seismogram_id"],
    )
    payload = json.dumps(
        {"event": event_data, "seismograms": seis_data}, sort_keys=True
    )
    return hashlib.sha256(payload.encode()).hexdigest()

create_iccs_instance

create_iccs_instance(
    session: Session, event: AimbatEvent
) -> BoundICCS

Return a BoundICCS instance for the given event.

Returns the cached instance when it is still fresh (i.e. event.last_modified has not advanced since the instance was created). Otherwise builds a new one and updates the cache. ICCS CC values are written to the live quality table in a separate session so the caller's session is not affected.

MiniIccsSeismogram instances are constructed directly from each AimbatSeismogram, passing data by reference to the read-only io cache. No waveform data is copied.

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

AimbatEvent.

required

Returns:

Type Description
BoundICCS

BoundICCS instance tied to the given event.

Source code in src/aimbat/core/_iccs.py
def create_iccs_instance(session: Session, event: AimbatEvent) -> BoundICCS:
    """Return a BoundICCS instance for the given event.

    Returns the cached instance when it is still fresh (i.e. `event.last_modified`
    has not advanced since the instance was created). Otherwise builds a new one
    and updates the cache. ICCS CC values are written to the live quality table in
    a separate session so the caller's session is not affected.

    `MiniIccsSeismogram` instances are constructed directly from each
    `AimbatSeismogram`, passing `data` by reference to the read-only io cache.
    No waveform data is copied.

    Args:
        session: Database session.
        event: AimbatEvent.

    Returns:
        BoundICCS instance tied to the given event.

    """
    cached = _iccs_cache.get(event.id)
    if cached is not None and not cached.is_stale(event):
        logger.debug(f"Returning cached BoundICCS for event {event.id}.")
        return cached

    event = session.exec(
        select(AimbatEvent)
        .where(AimbatEvent.id == event.id)
        .options(
            selectinload(rel(AimbatEvent.parameters)),
            selectinload(rel(AimbatEvent.seismograms)).selectinload(
                rel(AimbatSeismogram.parameters)
            ),
        )
    ).one()

    logger.debug(f"Creating ICCS instance for event {event.id}.")
    bound = BoundICCS(
        iccs=_build_iccs(event),
        event_id=event.id,
        created_at=Timestamp.now("UTC"),
    )
    _iccs_cache[event.id] = bound
    _write_iccs_stats(event.id, bound.iccs)
    return bound

create_project

create_project(engine: Engine) -> None

Initializes a new AIMBAT project database schema and triggers.

Parameters:

Name Type Description Default
engine Engine

The SQLAlchemy/SQLModel Engine instance connected to the target database.

required

Raises:

Type Description
RuntimeError

If a project schema already exists in the target database.

Source code in src/aimbat/core/_project.py
def create_project(engine: Engine) -> None:
    """Initializes a new AIMBAT project database schema and triggers.

    Args:
        engine: The SQLAlchemy/SQLModel Engine instance connected to the target database.

    Raises:
        RuntimeError: If a project schema already exists in the target database.
    """

    # Import locally to ensure SQLModel registers all table metadata before create_all()
    import aimbat.models  # noqa: F401

    logger.info(f"Creating new project in {engine.url}.")

    if _project_exists(engine):
        raise RuntimeError(
            f"Unable to create a new project: project already exists at {engine.url}!"
        )

    logger.debug("Creating database tables and loading defaults.")

    SQLModel.metadata.create_all(engine)

    if engine.name == "sqlite":
        with engine.begin() as connection:
            # Trigger 1: Track last modification time when event parameters change
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS event_modified_on_params_update
                AFTER UPDATE ON aimbateventparameters
                BEGIN
                    UPDATE aimbatevent SET last_modified = strftime('%Y-%m-%d %H:%M:%f', 'now')
                    WHERE id = NEW.event_id;
                END;
            """)
            )

            # Trigger 2: Track last modification time when seismogram parameters change
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS event_modified_on_seis_params_update
                AFTER UPDATE ON aimbatseismogramparameters
                BEGIN
                    UPDATE aimbatevent
                    SET last_modified = strftime('%Y-%m-%d %H:%M:%f', 'now')
                    WHERE id = (
                        SELECT event_id FROM aimbatseismogram
                        WHERE id = NEW.seismogram_id
                    );
                END;
            """)
            )

            # Trigger 3: Null all quality when event window/bandpass/ramp parameters change.
            # These parameters change the signal data used by both ICCS and MCCC.
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS null_all_quality_on_window_bandpass_change
                AFTER UPDATE ON aimbateventparameters
                WHEN (NEW.window_pre IS NOT OLD.window_pre)
                  OR (NEW.window_post IS NOT OLD.window_post)
                  OR (NEW.ramp_width IS NOT OLD.ramp_width)
                  OR (NEW.bandpass_apply IS NOT OLD.bandpass_apply)
                  OR (NEW.bandpass_fmin IS NOT OLD.bandpass_fmin)
                  OR (NEW.bandpass_fmax IS NOT OLD.bandpass_fmax)
                BEGIN
                    UPDATE aimbateventquality
                    SET mccc_rmse = NULL
                    WHERE event_id = NEW.event_id;
                    UPDATE aimbatseismogramquality
                    SET iccs_cc = NULL, mccc_cc_mean = NULL, mccc_cc_std = NULL, mccc_error = NULL
                    WHERE seismogram_id IN (
                        SELECT id FROM aimbatseismogram WHERE event_id = NEW.event_id
                    );
                END;
            """)
            )

            # Trigger 4: Null MCCC quality when MCCC-specific event parameters change.
            # These parameters affect only the MCCC inversion, not the underlying signal,
            # so iccs_cc remains valid.
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS null_mccc_quality_on_mccc_params_change
                AFTER UPDATE ON aimbateventparameters
                WHEN (NEW.mccc_damp IS NOT OLD.mccc_damp)
                  OR (NEW.mccc_min_cc IS NOT OLD.mccc_min_cc)
                BEGIN
                    UPDATE aimbateventquality
                    SET mccc_rmse = NULL
                    WHERE event_id = NEW.event_id;
                    UPDATE aimbatseismogramquality
                    SET mccc_cc_mean = NULL, mccc_cc_std = NULL, mccc_error = NULL
                    WHERE seismogram_id IN (
                        SELECT id FROM aimbatseismogram WHERE event_id = NEW.event_id
                    );
                END;
            """)
            )

            # Trigger 5a: Null quality when flip changes on a seismogram.
            # Flipping a trace only affects the ICCS stack if the seismogram is selected.
            # MCCC stats are invalidated if the seismogram was included in the last MCCC
            # run, which is inferred from the presence of live mccc_cc_mean stats —
            # not from select, because MCCC may have been run with --all.
            # The event-level UPDATE is ordered before the per-seismogram UPDATE so that
            # the EXISTS check sees the original (non-nulled) stats in both statements.
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS null_quality_on_seis_flip_change
                AFTER UPDATE ON aimbatseismogramparameters
                WHEN NEW.flip IS NOT OLD.flip
                BEGIN
                    -- Null iccs_cc for all event seismograms if selected (stack changed),
                    -- or just locally if deselected (the flipped seismogram's own CC is stale
                    -- even though the stack is unchanged).
                    UPDATE aimbatseismogramquality
                    SET iccs_cc = NULL
                    WHERE (
                        NEW."select" = TRUE
                        AND seismogram_id IN (
                            SELECT id FROM aimbatseismogram WHERE event_id = (
                                SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                            )
                        )
                    ) OR (
                        NEW."select" IS NOT TRUE
                        AND seismogram_id = NEW.seismogram_id
                    );

                    -- Null event-level RMSE if this seismogram was in the last MCCC run
                    UPDATE aimbateventquality
                    SET mccc_rmse = NULL
                    WHERE EXISTS (
                        SELECT 1 FROM aimbatseismogramquality
                        WHERE seismogram_id = NEW.seismogram_id
                          AND mccc_cc_mean IS NOT NULL
                    )
                      AND event_id = (
                        SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                    );

                    -- Null per-seismogram MCCC stats for the whole event if this seismogram
                    -- was in the last MCCC run (checked before these stats are nulled above)
                    UPDATE aimbatseismogramquality
                    SET mccc_cc_mean = NULL, mccc_cc_std = NULL, mccc_error = NULL
                    WHERE EXISTS (
                        SELECT 1 FROM aimbatseismogramquality
                        WHERE seismogram_id = NEW.seismogram_id
                          AND mccc_cc_mean IS NOT NULL
                    )
                      AND seismogram_id IN (
                        SELECT id FROM aimbatseismogram WHERE event_id = (
                            SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                        )
                    );
                END;
            """)
            )

            # Trigger 5b: Null quality when t1 changes on a seismogram.
            # ICCS: if selected, the stack is affected so iccs_cc is stale for all;
            # if deselected, only this seismogram's own iccs_cc is stale.
            # MCCC: invalidated whenever the seismogram was in the last MCCC run,
            # inferred from live mccc_cc_mean — not select — because MCCC may have
            # been run with --all, meaning a deselected seismogram could still be included.
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS null_quality_on_seis_t1_change
                AFTER UPDATE ON aimbatseismogramparameters
                WHEN NEW.t1 IS NOT OLD.t1
                BEGIN
                    -- Null iccs_cc for all event seismograms if selected (stack changed),
                    -- otherwise only null locally.
                    UPDATE aimbatseismogramquality
                    SET iccs_cc = NULL
                    WHERE (
                        NEW."select" = TRUE
                        AND seismogram_id IN (
                            SELECT id FROM aimbatseismogram WHERE event_id = (
                                SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                            )
                        )
                    ) OR (
                        NEW."select" IS NOT TRUE
                        AND seismogram_id = NEW.seismogram_id
                    );

                    -- Null event-level RMSE if this seismogram was in the last MCCC run
                    UPDATE aimbateventquality
                    SET mccc_rmse = NULL
                    WHERE EXISTS (
                        SELECT 1 FROM aimbatseismogramquality
                        WHERE seismogram_id = NEW.seismogram_id
                          AND mccc_cc_mean IS NOT NULL
                    )
                      AND event_id = (
                        SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                    );

                    -- Null per-seismogram MCCC stats for the whole event if this seismogram
                    -- was in the last MCCC run
                    UPDATE aimbatseismogramquality
                    SET mccc_cc_mean = NULL, mccc_cc_std = NULL, mccc_error = NULL
                    WHERE EXISTS (
                        SELECT 1 FROM aimbatseismogramquality
                        WHERE seismogram_id = NEW.seismogram_id
                          AND mccc_cc_mean IS NOT NULL
                    )
                      AND seismogram_id IN (
                        SELECT id FROM aimbatseismogram WHERE event_id = (
                            SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                        )
                    );
                END;
            """)
            )

            # Trigger 5c: Null quality when select changes on a seismogram.
            # ICCS stack composition changes in both directions (select → deselect and
            # vice versa), so iccs_cc is always invalidated for the whole event.
            # MCCC stats are only invalidated if the seismogram was in the last MCCC run,
            # inferred from live mccc_cc_mean — if MCCC was run with --all, changing
            # select does not change the MCCC set, so live stats remain valid.
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS null_quality_on_seis_select_change
                AFTER UPDATE ON aimbatseismogramparameters
                WHEN NEW."select" IS NOT OLD."select"
                BEGIN
                    -- Always null iccs_cc for the whole event (stack composition changed)
                    UPDATE aimbatseismogramquality
                    SET iccs_cc = NULL
                    WHERE seismogram_id IN (
                        SELECT id FROM aimbatseismogram WHERE event_id = (
                            SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                        )
                    );

                    -- Null event-level RMSE if this seismogram was in the last MCCC run
                    UPDATE aimbateventquality
                    SET mccc_rmse = NULL
                    WHERE EXISTS (
                        SELECT 1 FROM aimbatseismogramquality
                        WHERE seismogram_id = NEW.seismogram_id
                          AND mccc_cc_mean IS NOT NULL
                    )
                      AND event_id = (
                        SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                    );

                    -- Null per-seismogram MCCC stats for the whole event if this seismogram
                    -- was in the last MCCC run
                    UPDATE aimbatseismogramquality
                    SET mccc_cc_mean = NULL, mccc_cc_std = NULL, mccc_error = NULL
                    WHERE EXISTS (
                        SELECT 1 FROM aimbatseismogramquality
                        WHERE seismogram_id = NEW.seismogram_id
                          AND mccc_cc_mean IS NOT NULL
                    )
                      AND seismogram_id IN (
                        SELECT id FROM aimbatseismogram WHERE event_id = (
                            SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                        )
                    );
                END;
            """)
            )

create_snapshot

create_snapshot(
    session: Session,
    event: AimbatEvent,
    comment: str | None = None,
) -> None

Create a snapshot of the AIMBAT processing parameters and quality metrics.

Parameter snapshots are always created. Quality snapshots are created whenever the corresponding live quality record has at least one non-None field. Seismogram quality is omitted when all quality fields are None (e.g. before any ICCS or MCCC run).

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

AimbatEvent.

required
comment str | None

Optional comment.

None
Source code in src/aimbat/core/_snapshot.py
def create_snapshot(
    session: Session,
    event: AimbatEvent,
    comment: str | None = None,
) -> None:
    """Create a snapshot of the AIMBAT processing parameters and quality metrics.

    Parameter snapshots are always created. Quality snapshots are created
    whenever the corresponding live quality record has at least one non-None
    field. Seismogram quality is omitted when all quality fields are `None`
    (e.g. before any ICCS or MCCC run).

    Args:
        session: Database session.
        event: AimbatEvent.
        comment: Optional comment.
    """

    logger.info(
        f"Creating snapshot for event {event.id}"
        + (f" with comment '{comment}'" if comment else "")
        + "."
    )

    event = session.exec(
        select(AimbatEvent)
        .where(AimbatEvent.id == event.id)
        .options(
            selectinload(rel(AimbatEvent.parameters)),
            selectinload(rel(AimbatEvent.quality)),
            selectinload(rel(AimbatEvent.seismograms)).options(
                selectinload(rel(AimbatSeismogram.parameters)),
                selectinload(rel(AimbatSeismogram.quality)),
            ),
        )
    ).one()

    event_parameters_snapshot = AimbatEventParametersSnapshot.model_validate(
        event.parameters,
        update={
            "id": uuid4(),  # we don't want to carry over the id from the input event parameters
            "parameters_id": event.parameters.id,
        },
    )
    logger.debug(
        f"Adding event parameters snapshot with id={event_parameters_snapshot.id} to snapshot."
    )

    seismogram_parameter_snapshots = []
    for aimbat_seismogram in event.seismograms:
        seismogram_parameter_snapshot = AimbatSeismogramParametersSnapshot.model_validate(
            aimbat_seismogram.parameters,
            update={
                "id": uuid4(),  # we don't want to carry over the id from the input seismogram parameters
                "seismogram_parameters_id": aimbat_seismogram.parameters.id,
            },
        )
        logger.debug(
            f"Adding seismogram parameters snapshot with id={seismogram_parameter_snapshot.id} to snapshot."
        )
        seismogram_parameter_snapshots.append(seismogram_parameter_snapshot)

    # Capture quality metrics from the live quality tables.
    event_quality_snap: AimbatEventQualitySnapshot | None = None
    seis_quality_snaps: list[AimbatSeismogramQualitySnapshot] = []

    if event.quality is not None and event.quality.mccc_rmse is not None:
        logger.debug("Capturing event quality snapshot from live quality table.")
        event_quality_snap = AimbatEventQualitySnapshot.model_validate(
            event.quality,
            update={
                "id": uuid4(),
                "event_quality_id": event.quality.id,
            },
        )

    for aimbat_seismogram in event.seismograms:
        sq = aimbat_seismogram.quality
        if sq is None:
            continue
        if any(
            v is not None
            for v in [sq.iccs_cc, sq.mccc_cc_mean, sq.mccc_cc_std, sq.mccc_error]
        ):
            logger.debug(
                f"Adding seismogram quality snapshot for seismogram {aimbat_seismogram.id}."
            )
            seis_quality_snaps.append(
                AimbatSeismogramQualitySnapshot.model_validate(
                    sq,
                    update={
                        "id": uuid4(),
                        "seismogram_quality_id": sq.id,
                    },
                )
            )

    aimbat_snapshot = AimbatSnapshot(
        event=event,
        event_parameters_snapshot=event_parameters_snapshot,
        seismogram_parameters_snapshots=seismogram_parameter_snapshots,
        event_quality_snapshot=event_quality_snap,
        seismogram_quality_snapshots=seis_quality_snaps,
        comment=comment,
        parameters_hash=compute_parameters_hash(event),
    )
    session.add(aimbat_snapshot)
    session.commit()

delete_event

delete_event(session: Session, event_id: UUID) -> None

Delete an AimbatEvent from the database.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

Event ID.

required
Source code in src/aimbat/core/_event.py
def delete_event(session: Session, event_id: UUID) -> None:
    """Delete an AimbatEvent from the database.

    Args:
        session: Database session.
        event_id: Event ID.

    """

    logger.info(f"Deleting event {event_id}.")

    event = session.get(AimbatEvent, event_id)
    if event is None:
        raise NoResultFound(f"Unable to find event using id: {event_id}.")

    session.delete(event)
    session.commit()

delete_project

delete_project(engine: Engine) -> None

Delete the AIMBAT project.

Raises:

Type Description
RuntimeError

If unable to delete project.

Source code in src/aimbat/core/_project.py
def delete_project(engine: Engine) -> None:
    """Delete the AIMBAT project.

    Raises:
        RuntimeError: If unable to delete project.
    """

    logger.info(f"Deleting project at {engine.url}.")

    if not _project_exists(engine):
        raise RuntimeError("No project found to delete.")

    if engine.driver == "pysqlite":
        database = engine.url.database
        engine.dispose()
        if database == ":memory:":
            logger.info("Running database in memory, nothing to delete.")
            return
        elif database:
            project_path = Path(database)
            logger.info(f"Deleting project file: {project_path}.")
            project_path.unlink()
            return

    raise RuntimeError(
        f"Unable to delete project: unsupported engine driver '{engine.driver}'."
    )

delete_seismogram

delete_seismogram(
    session: Session, seismogram_id: UUID
) -> None

Delete an AimbatSeismogram from the database.

Parameters:

Name Type Description Default
session Session

Database session.

required
seismogram_id UUID

Seismogram ID.

required
Source code in src/aimbat/core/_seismogram.py
def delete_seismogram(session: Session, seismogram_id: UUID) -> None:
    """Delete an AimbatSeismogram from the database.

    Args:
        session: Database session.
        seismogram_id: Seismogram ID.

    """

    logger.info(f"Deleting seismogram {seismogram_id}.")

    seismogram = session.get(AimbatSeismogram, seismogram_id)
    if seismogram is None:
        raise NoResultFound(f"No AimbatSeismogram found with {seismogram_id=}")

    session.delete(seismogram)
    session.commit()

delete_snapshot

delete_snapshot(
    session: Session, snapshot_id: UUID
) -> None

Delete an AIMBAT parameter snapshot.

Parameters:

Name Type Description Default
snapshot_id UUID

Snapshot id.

required
Source code in src/aimbat/core/_snapshot.py
def delete_snapshot(session: Session, snapshot_id: UUID) -> None:
    """Delete an AIMBAT parameter snapshot.

    Args:
        snapshot_id: Snapshot id.
    """
    logger.info(f"Deleting snapshot {snapshot_id}.")

    snapshot = session.get(AimbatSnapshot, snapshot_id)
    if snapshot is None:
        raise NoResultFound(f"Unable to find snapshot with {snapshot_id=}")

    session.delete(snapshot)
    session.commit()

delete_station

delete_station(session: Session, station_id: UUID) -> None

Delete an AimbatStation from the database.

Parameters:

Name Type Description Default
session Session

Database session.

required
station_id UUID

ID of the station to delete.

required
Source code in src/aimbat/core/_station.py
def delete_station(session: Session, station_id: UUID) -> None:
    """Delete an AimbatStation from the database.

    Args:
        session: Database session.
        station_id: ID of the station to delete.
    """

    logger.info(f"Deleting station with id={station_id}.")

    station = session.get(AimbatStation, station_id)
    if station is None:
        raise NoResultFound(f"No AimbatStation found with {station_id=}")

    session.delete(station)
    session.commit()

dump_data_table

dump_data_table(
    session: Session,
    event_id: UUID | None = None,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]

Return AIMBAT datasources table as a JSON-serialisable list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID | None

UUID of the event to filter data sources by. If None, all data sources are returned.

None
by_alias bool

Whether to use field aliases.

False
by_title bool

Whether to use field titles (from the Pydantic model) for the field names in the output. Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None

Returns:

Type Description
list[dict[str, Any]]

Aimbat datasources table as a list of dicts.

Source code in src/aimbat/core/_data.py
def dump_data_table(
    session: Session,
    event_id: UUID | None = None,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]:
    """Return AIMBAT datasources table as a JSON-serialisable list of dicts.

    Args:
        session: Database session.
        event_id: UUID of the event to filter data sources by. If None, all data sources are returned.
        by_alias: Whether to use field aliases.
        by_title: Whether to use field titles (from the Pydantic model) for the
            field names in the output. Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.

    Returns:
        Aimbat datasources table as a list of dicts.
    """
    logger.debug("Dumping AIMBAT datasources table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    adapter: TypeAdapter[Sequence[AimbatDataSource]] = TypeAdapter(
        Sequence[AimbatDataSource]
    )

    if event_id is not None:
        data_source = get_data_for_event(session, event_id)
    else:
        data_source = session.exec(select(AimbatDataSource)).all()

    data = adapter.dump_python(
        data_source, exclude=exclude, by_alias=by_alias, mode="json"
    )

    if by_title:
        title_map = get_title_map(AimbatDataSource)
        return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

    return data

dump_event_parameter_snapshot_table

dump_event_parameter_snapshot_table(
    session: Session,
    event_id: UUID | None = None,
    by_alias: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]

Dump event parameter snapshots as a list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID | None

Event ID to filter snapshots by (if none is provided, snapshots for all events are dumped).

None
by_alias bool

Whether to use serialization aliases for the field names in the output.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
Source code in src/aimbat/core/_snapshot.py
def dump_event_parameter_snapshot_table(
    session: Session,
    event_id: UUID | None = None,
    by_alias: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]:
    """Dump event parameter snapshots as a list of dicts.

    Args:
        session: Database session.
        event_id: Event ID to filter snapshots by (if none is provided,
            snapshots for all events are dumped).
        by_alias: Whether to use serialization aliases for the field names in the output.
        exclude: Set of field names to exclude from the output.
    """
    logger.debug("Dumping AimbatEventParametersSnapshot table to json.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    snapshots = get_snapshots(session, event_id)

    event_params_adapter: TypeAdapter[Sequence[AimbatEventParametersSnapshot]] = (
        TypeAdapter(Sequence[AimbatEventParametersSnapshot])
    )
    event_snaps = [s.event_parameters_snapshot for s in snapshots]
    event_dicts = event_params_adapter.dump_python(
        event_snaps, mode="json", by_alias=by_alias, exclude=exclude
    )

    return event_dicts

dump_event_parameter_table

dump_event_parameter_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]

Dump the event parameter table data to json.

Parameters:

Name Type Description Default
session Session

Database session.

required
by_alias bool

Whether to use serialization aliases for the field names.

False
by_title bool

Whether to use the field title metadata for the field names. Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
event_id UUID | None

Event ID to filter parameters by (if none is provided, parameters for all events are dumped).

None

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

Source code in src/aimbat/core/_event.py
def dump_event_parameter_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]:
    """Dump the event parameter table data to json.

    Args:
        session: Database session.
        by_alias: Whether to use serialization aliases for the field names.
        by_title: Whether to use the field title metadata for the field names.
            Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.
        event_id: Event ID to filter parameters by (if none is provided,
            parameters for all events are dumped).

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
    """

    logger.debug("Dumping AIMBAT event parameter table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    adapter: TypeAdapter[Sequence[AimbatEventParameters]] = TypeAdapter(
        Sequence[AimbatEventParameters]
    )

    if event_id is not None:
        statement = select(AimbatEventParameters).where(
            AimbatEventParameters.event_id == event_id
        )
    else:
        statement = select(AimbatEventParameters)

    parameters = session.exec(statement).all()

    data = adapter.dump_python(
        parameters, mode="json", exclude=exclude, by_alias=by_alias
    )

    if by_title:
        title_map = get_title_map(AimbatEventParameters)
        return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

    return data

dump_event_quality_snapshot_table

dump_event_quality_snapshot_table(
    session: Session,
    event_id: UUID | None = None,
    by_alias: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]

Dump event quality snapshots as a list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID | None

Event ID to filter snapshots by (if none is provided, snapshots for all events are dumped).

None
by_alias bool

Whether to use serialization aliases for the field names in the output.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
Source code in src/aimbat/core/_snapshot.py
def dump_event_quality_snapshot_table(
    session: Session,
    event_id: UUID | None = None,
    by_alias: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]:
    """Dump event quality snapshots as a list of dicts.

    Args:
        session: Database session.
        event_id: Event ID to filter snapshots by (if none is provided,
            snapshots for all events are dumped).
        by_alias: Whether to use serialization aliases for the field names in the output.
        exclude: Set of field names to exclude from the output.
    """
    logger.debug("Dumping AimbatEventQualitySnapshot table to json.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    snapshots = get_snapshots(session, event_id)

    event_quality_adapter: TypeAdapter[Sequence[AimbatEventQualitySnapshot]] = (
        TypeAdapter(Sequence[AimbatEventQualitySnapshot])
    )
    # Filter out snapshots that don't have event quality records.
    event_quality_snaps = [
        s.event_quality_snapshot
        for s in snapshots
        if s.event_quality_snapshot is not None
    ]
    event_quality_dicts = event_quality_adapter.dump_python(
        event_quality_snaps, mode="json", by_alias=by_alias, exclude=exclude
    )

    return event_quality_dicts

dump_event_quality_table

dump_event_quality_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]

Dump event quality statistics to json.

Parameters:

Name Type Description Default
session Session

Database session.

required
by_alias bool

Whether to use serialization aliases for the field names.

False
by_title bool

Whether to use the field title metadata for the field names. Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
event_id UUID | None

Event ID to filter by (if none is provided, quality for all events is dumped).

None

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

Source code in src/aimbat/core/_event.py
def dump_event_quality_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]:
    """Dump event quality statistics to json.

    Args:
        session: Database session.
        by_alias: Whether to use serialization aliases for the field names.
        by_title: Whether to use the field title metadata for the field names.
            Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.
        event_id: Event ID to filter by (if none is provided, quality for all
            events is dumped).

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
    """

    logger.debug("Dumping AIMBAT event quality table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    exclude = (exclude or set()) | {"station_id", "snapshot_id"}
    exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    statement = select(AimbatEvent).options(
        selectinload(rel(AimbatEvent.seismograms)).selectinload(
            rel(AimbatSeismogram.quality)
        ),
        selectinload(rel(AimbatEvent.quality)),
    )
    if event_id is not None:
        statement = statement.where(AimbatEvent.id == event_id)

    events = session.exec(statement).all()
    stats = [SeismogramQualityStats.from_event(e) for e in events]

    adapter: TypeAdapter[Sequence[SeismogramQualityStats]] = TypeAdapter(
        Sequence[SeismogramQualityStats]
    )
    data = adapter.dump_python(stats, mode="json", exclude=exclude, by_alias=by_alias)

    if by_title:
        title_map = get_title_map(SeismogramQualityStats)
        return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

    return data

dump_event_table

dump_event_table(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]] | str

Dump the table data to json serialisable list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
from_read_model bool

Whether to dump from the read model (True) or the ORM model.

False
by_alias bool

Whether to use serialization aliases for the field names.

False
by_title bool

Whether to use the field title metadata for the field names in the output (only applicable when from_read_model is True). Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

ValueError

If by_title is True but from_read_model is False.

Source code in src/aimbat/core/_event.py
def dump_event_table(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]] | str:
    """Dump the table data to json serialisable list of dicts.

    Args:
        session: Database session.
        from_read_model: Whether to dump from the read model (True) or the ORM model.
        by_alias: Whether to use serialization aliases for the field names.
        by_title: Whether to use the field title metadata for the field names in the
            output (only applicable when from_read_model is True). Mutually
            exclusive with by_alias.
        exclude: Set of field names to exclude from the output.

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
        ValueError: If `by_title` is True but `from_read_model` is False.
    """
    logger.debug("Dumping AIMBAT event table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if not from_read_model and by_title:
        raise ValueError("'by_title' is only supported when 'from_read_model' is True.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    statement = select(AimbatEvent).options(
        selectinload(rel(AimbatEvent.seismograms)).selectinload(
            rel(AimbatSeismogram.parameters)
        ),
        selectinload(rel(AimbatEvent.parameters)),
        selectinload(rel(AimbatEvent.quality)),
    )
    events = session.exec(statement).all()

    if from_read_model:
        event_reads = [AimbatEventRead.from_event(e, session=session) for e in events]
        adapter_reads: TypeAdapter[Sequence[AimbatEventRead]] = TypeAdapter(
            Sequence[AimbatEventRead]
        )
        data = adapter_reads.dump_python(
            event_reads, exclude=exclude, by_alias=by_alias, mode="json"
        )

        if by_title:
            title_map = get_title_map(AimbatEventRead)
            return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

        return data

    adapter: TypeAdapter[Sequence[AimbatEvent]] = TypeAdapter(Sequence[AimbatEvent])
    return adapter.dump_json(events, exclude=exclude, by_alias=by_alias).decode()

dump_seismogram_parameter_snapshot_table

dump_seismogram_parameter_snapshot_table(
    session: Session,
    event_id: UUID | None = None,
    by_alias: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]

Dump seismogram parameter snapshots as a list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID | None

Event ID to filter snapshots by (if none is provided, snapshots for all events are dumped).

None
by_alias bool

Whether to use serialization aliases for the field names in the output.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
Source code in src/aimbat/core/_snapshot.py
def dump_seismogram_parameter_snapshot_table(
    session: Session,
    event_id: UUID | None = None,
    by_alias: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]:
    """Dump seismogram parameter snapshots as a list of dicts.

    Args:
        session: Database session.
        event_id: Event ID to filter snapshots by (if none is provided,
            snapshots for all events are dumped).
        by_alias: Whether to use serialization aliases for the field names in the output.
        exclude: Set of field names to exclude from the output.
    """
    logger.debug("Dumping AimbatSeismogramParametersSnapshot table to json.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    snapshots = get_snapshots(session, event_id)

    seis_params_adapter: TypeAdapter[Sequence[AimbatSeismogramParametersSnapshot]] = (
        TypeAdapter(Sequence[AimbatSeismogramParametersSnapshot])
    )
    seis_snaps = [sp for s in snapshots for sp in s.seismogram_parameters_snapshots]
    seis_dicts = seis_params_adapter.dump_python(
        seis_snaps, mode="json", by_alias=by_alias, exclude=exclude
    )

    return seis_dicts

dump_seismogram_parameter_table

dump_seismogram_parameter_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]

Dump the seismogram parameter table data to json serialisable list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
by_alias bool

Whether to use serialization aliases for the field names in the output.

False
by_title bool

Whether to use titles for the field names in the output.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
event_id UUID | None

Event ID to filter seismogram parameters by (if none is provided, all seismogram parameters for all events are dumped).

None

Returns:

Type Description
list[dict[str, Any]]

list of dicts representing the seismogram parameters.

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

Source code in src/aimbat/core/_seismogram.py
def dump_seismogram_parameter_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]:
    """Dump the seismogram parameter table data to json serialisable list of dicts.

    Args:
        session: Database session.
        by_alias: Whether to use serialization aliases for the field names in the output.
        by_title: Whether to use titles for the field names in the output.
        exclude: Set of field names to exclude from the output.
        event_id: Event ID to filter seismogram parameters by (if none is provided,
            all seismogram parameters for all events are dumped).

    Returns:
        list of dicts representing the seismogram parameters.

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
    """
    logger.debug("Dumping AimbatSeismogramParameters table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    adapter: TypeAdapter[Sequence[AimbatSeismogramParameters]] = TypeAdapter(
        Sequence[AimbatSeismogramParameters]
    )

    if event_id is not None:
        statement = (
            select(AimbatSeismogramParameters)
            .join(AimbatSeismogram)
            .where(AimbatSeismogram.event_id == event_id)
        )
    else:
        statement = select(AimbatSeismogramParameters)

    parameters = session.exec(statement).all()

    data = adapter.dump_python(
        parameters, mode="json", exclude=exclude, by_alias=by_alias
    )

    if by_title:
        title_map = get_title_map(AimbatSeismogramParameters)
        return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

    return data

dump_seismogram_quality_snapshot_table

dump_seismogram_quality_snapshot_table(
    session: Session,
    event_id: UUID | None = None,
    by_alias: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]

Dump seismogram quality snapshots as a list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID | None

Event ID to filter snapshots by (if none is provided, snapshots for all events are dumped).

None
by_alias bool

Whether to use serialization aliases for the field names in the output.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
Source code in src/aimbat/core/_snapshot.py
def dump_seismogram_quality_snapshot_table(
    session: Session,
    event_id: UUID | None = None,
    by_alias: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]:
    """Dump seismogram quality snapshots as a list of dicts.

    Args:
        session: Database session.
        event_id: Event ID to filter snapshots by (if none is provided,
            snapshots for all events are dumped).
        by_alias: Whether to use serialization aliases for the field names in the output.
        exclude: Set of field names to exclude from the output.
    """
    logger.debug("Dumping AimbatSeismogramQualitySnapshot table to json.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    snapshots = get_snapshots(session, event_id)

    seis_quality_adapter: TypeAdapter[Sequence[AimbatSeismogramQualitySnapshot]] = (
        TypeAdapter(Sequence[AimbatSeismogramQualitySnapshot])
    )
    # Collect all seismogram quality records from all snapshots.
    seis_quality_snaps = [
        sq for s in snapshots for sq in s.seismogram_quality_snapshots
    ]
    seis_quality_dicts = seis_quality_adapter.dump_python(
        seis_quality_snaps, mode="json", by_alias=by_alias, exclude=exclude
    )

    return seis_quality_dicts

dump_seismogram_table

dump_seismogram_table(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]

Dump the AimbatSeismogram table to json serialisable list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
from_read_model bool

Whether to dump from the read model (True) or the ORM model.

False
by_alias bool

Whether to use serialization aliases for the field names in the output.

False
by_title bool

Whether to use titles for the field names in the output (only applicable when from_read_model is True). Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
event_id UUID | None

Event ID to filter seismograms by (if none is provided, seismograms for all events are dumped).

None

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

ValueError

If by_title is True but from_read_model is False.

Source code in src/aimbat/core/_seismogram.py
def dump_seismogram_table(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]:
    """Dump the AimbatSeismogram table to json serialisable list of dicts.

    Args:
        session: Database session.
        from_read_model: Whether to dump from the read model (True) or the ORM model.
        by_alias: Whether to use serialization aliases for the field names in the output.
        by_title: Whether to use titles for the field names in the output (only
            applicable when from_read_model is True). Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.
        event_id: Event ID to filter seismograms by (if none is provided,
            seismograms for all events are dumped).

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
        ValueError: If `by_title` is True but `from_read_model` is False.
    """
    logger.debug("Dumping AIMBAT seismogram table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if not from_read_model and by_title:
        raise ValueError("'by_title' is only supported when 'from_read_model' is True.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    if event_id is not None:
        statement = select(AimbatSeismogram).where(
            AimbatSeismogram.event_id == event_id
        )
    else:
        statement = select(AimbatSeismogram)

    statement = statement.options(
        selectinload(rel(AimbatSeismogram.station)),
        selectinload(rel(AimbatSeismogram.event)),
        selectinload(rel(AimbatSeismogram.parameters)),
        selectinload(rel(AimbatSeismogram.quality)),
    )

    seismograms = session.exec(statement).all()

    if from_read_model:
        seismogram_reads = [
            AimbatSeismogramRead.from_seismogram(s, session=session)
            for s in seismograms
        ]
        adapter_reads: TypeAdapter[Sequence[AimbatSeismogramRead]] = TypeAdapter(
            Sequence[AimbatSeismogramRead]
        )
        data = adapter_reads.dump_python(
            seismogram_reads, mode="json", exclude=exclude, by_alias=by_alias
        )

        if by_title:
            title_map = get_title_map(AimbatSeismogramRead)
            return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

        return data

    adapter: TypeAdapter[Sequence[AimbatSeismogram]] = TypeAdapter(
        Sequence[AimbatSeismogram]
    )

    return adapter.dump_python(
        seismograms, mode="json", exclude=exclude, by_alias=by_alias
    )

dump_snapshot_quality_table

dump_snapshot_quality_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]

Dump snapshot quality statistics to json.

Parameters:

Name Type Description Default
session Session

Database session.

required
by_alias bool

Whether to use serialization aliases for the field names.

False
by_title bool

Whether to use the field title metadata for the field names. Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
event_id UUID | None

Event ID to filter snapshots by (if none is provided, quality for all snapshots is dumped).

None

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

Source code in src/aimbat/core/_snapshot.py
def dump_snapshot_quality_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]:
    """Dump snapshot quality statistics to json.

    Args:
        session: Database session.
        by_alias: Whether to use serialization aliases for the field names.
        by_title: Whether to use the field title metadata for the field names.
            Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.
        event_id: Event ID to filter snapshots by (if none is provided, quality
            for all snapshots is dumped).

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
    """

    logger.debug("Dumping AIMBAT snapshot quality table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    exclude = (exclude or set()) | {"station_id"}
    exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    snapshots = get_snapshots(session, event_id)
    stats = [SeismogramQualityStats.from_snapshot(s) for s in snapshots]

    adapter: TypeAdapter[Sequence[SeismogramQualityStats]] = TypeAdapter(
        Sequence[SeismogramQualityStats]
    )
    data = adapter.dump_python(stats, mode="json", exclude=exclude, by_alias=by_alias)

    if by_title:
        title_map = get_title_map(SeismogramQualityStats)
        return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

    return data

dump_snapshot_results

dump_snapshot_results(
    session: Session,
    snapshot_id: UUID,
    by_alias: bool = False,
) -> dict[str, Any]

Dump per-seismogram MCCC results from a snapshot as a results envelope.

Returns a dict with event- and snapshot-level header fields plus a seismograms list containing one entry per seismogram. Event-level scalars (snapshot_id, event_id, mccc_rmse) appear once in the envelope rather than being repeated on every row.

Parameters:

Name Type Description Default
session Session

Database session.

required
snapshot_id UUID

UUID of the snapshot to export results from.

required
by_alias bool

Whether to use camelCase serialisation aliases for field names.

False

Returns:

Type Description
dict[str, Any]

Dict with header fields and a seismograms list.

Raises:

Type Description
NoResultFound

If no snapshot with the given ID is found.

Source code in src/aimbat/core/_snapshot.py
def dump_snapshot_results(
    session: Session,
    snapshot_id: UUID,
    by_alias: bool = False,
) -> dict[str, Any]:
    """Dump per-seismogram MCCC results from a snapshot as a results envelope.

    Returns a dict with event- and snapshot-level header fields plus a
    `seismograms` list containing one entry per seismogram. Event-level
    scalars (`snapshot_id`, `event_id`, `mccc_rmse`) appear once in the
    envelope rather than being repeated on every row.

    Args:
        session: Database session.
        snapshot_id: UUID of the snapshot to export results from.
        by_alias: Whether to use camelCase serialisation aliases for field names.

    Returns:
        Dict with header fields and a `seismograms` list.

    Raises:
        NoResultFound: If no snapshot with the given ID is found.
    """
    logger.debug(f"Dumping per-seismogram results for snapshot {snapshot_id}.")

    snapshot = session.exec(
        select(AimbatSnapshot)
        .where(AimbatSnapshot.id == snapshot_id)
        .options(
            selectinload(rel(AimbatSnapshot.event)),
            selectinload(rel(AimbatSnapshot.event_quality_snapshot)),
            selectinload(rel(AimbatSnapshot.seismogram_parameters_snapshots)).options(
                selectinload(
                    rel(AimbatSeismogramParametersSnapshot.parameters)
                ).options(
                    selectinload(
                        rel(AimbatSeismogramParameters.seismogram)
                    ).selectinload(rel(AimbatSeismogram.station))
                )
            ),
            selectinload(rel(AimbatSnapshot.seismogram_quality_snapshots)).selectinload(
                rel(AimbatSeismogramQualitySnapshot.quality)
            ),
        )
    ).one_or_none()

    if snapshot is None:
        raise NoResultFound(f"No AimbatSnapshot found with id: {snapshot_id}.")

    eq = snapshot.event_quality_snapshot
    mccc_rmse = eq.mccc_rmse if eq is not None else None

    # Build a lookup from seismogram_id → quality snapshot.
    quality_map: dict[UUID, AimbatSeismogramQualitySnapshot] = {
        sq.quality.seismogram_id: sq for sq in snapshot.seismogram_quality_snapshots
    }

    seismograms = [
        SnapshotSeismogramResult.from_snapshot_records(
            param_snap=ps,
            quality_snap=quality_map.get(ps.parameters.seismogram_id),
        )
        for ps in snapshot.seismogram_parameters_snapshots
    ]

    event = snapshot.event
    results = SnapshotResults(
        snapshot_id=snapshot.id,
        snapshot_time=snapshot.time,
        snapshot_comment=snapshot.comment,
        event_id=snapshot.event_id,
        event_time=event.time,
        event_latitude=event.latitude,
        event_longitude=event.longitude,
        event_depth_km=event.depth / 1000 if event.depth is not None else None,
        mccc_rmse=mccc_rmse,
        seismograms=seismograms,
    )

    return results.model_dump(mode="json", by_alias=by_alias)

dump_snapshot_table

dump_snapshot_table(
    session: Session,
    event_id: UUID | None = None,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]

Dump snapshot metadata as a list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID | None

Event ID to filter snapshots by (if none is provided, snapshots for all events are dumped).

None
from_read_model bool

Whether to dump from the read model (True) or the ORM model. Only affects the snapshots table.

False
by_alias bool

Whether to use serialization aliases for the field names in the output.

False
by_title bool

Whether to use titles for the field names in the output (only applicable when from_read_model is True). Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
Source code in src/aimbat/core/_snapshot.py
def dump_snapshot_table(
    session: Session,
    event_id: UUID | None = None,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]:
    """Dump snapshot metadata as a list of dicts.

    Args:
        session: Database session.
        event_id: Event ID to filter snapshots by (if none is provided,
            snapshots for all events are dumped).
        from_read_model: Whether to dump from the read model (True) or the ORM model.
            Only affects the `snapshots` table.
        by_alias: Whether to use serialization aliases for the field names in the output.
        by_title: Whether to use titles for the field names in the output (only
            applicable when from_read_model is True). Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.
    """
    logger.debug("Dumping AimbatSnapshot table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if not from_read_model and by_title:
        raise ValueError("'by_title' is only supported when 'from_read_model' is True.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    snapshots = get_snapshots(session, event_id)

    if from_read_model:
        snapshot_read_adapter: TypeAdapter[Sequence[AimbatSnapshotRead]] = TypeAdapter(
            Sequence[AimbatSnapshotRead]
        )
        snapshots_read = [
            AimbatSnapshotRead.from_snapshot(s, session=session) for s in snapshots
        ]
        snapshot_dicts = snapshot_read_adapter.dump_python(
            snapshots_read, mode="json", by_alias=by_alias, exclude=exclude
        )

        if by_title:
            title_map = get_title_map(AimbatSnapshotRead)
            snapshot_dicts = [
                {title_map.get(k, k): v for k, v in row.items()}
                for row in snapshot_dicts
            ]
    else:
        snapshot_adapter: TypeAdapter[Sequence[AimbatSnapshot]] = TypeAdapter(
            Sequence[AimbatSnapshot]
        )
        snapshot_dicts = snapshot_adapter.dump_python(
            snapshots, mode="json", by_alias=by_alias, exclude=exclude
        )

    return snapshot_dicts

dump_station_quality_table

dump_station_quality_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    station_id: UUID | None = None,
) -> list[dict[str, Any]]

Dump station quality statistics to json.

Parameters:

Name Type Description Default
session Session

Database session.

required
by_alias bool

Whether to use serialization aliases for the field names.

False
by_title bool

Whether to use the field title metadata for the field names. Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
station_id UUID | None

Station ID to filter by (if none is provided, quality for all stations is dumped).

None

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

Source code in src/aimbat/core/_station.py
def dump_station_quality_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    station_id: UUID | None = None,
) -> list[dict[str, Any]]:
    """Dump station quality statistics to json.

    Args:
        session: Database session.
        by_alias: Whether to use serialization aliases for the field names.
        by_title: Whether to use the field title metadata for the field names.
            Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.
        station_id: Station ID to filter by (if none is provided, quality for
            all stations is dumped).

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
    """

    logger.debug("Dumping AIMBAT station quality table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    exclude = (exclude or set()) | {"event_id", "snapshot_id"}
    exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    statement = select(AimbatStation).options(
        selectinload(rel(AimbatStation.seismograms)).selectinload(
            rel(AimbatSeismogram.quality)
        ),
    )
    if station_id is not None:
        statement = statement.where(AimbatStation.id == station_id)

    stations = session.exec(statement).all()
    stats = [SeismogramQualityStats.from_station(s) for s in stations]

    adapter: TypeAdapter[Sequence[SeismogramQualityStats]] = TypeAdapter(
        Sequence[SeismogramQualityStats]
    )
    data = adapter.dump_python(stats, mode="json", exclude=exclude, by_alias=by_alias)

    if by_title:
        title_map = get_title_map(SeismogramQualityStats)
        return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

    return data

dump_station_table

dump_station_table(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]

Create a JSON serialisable dict from the AimbatStation table data.

Parameters:

Name Type Description Default
session Session

Database session.

required
from_read_model bool

Whether to dump from the read model (True) or the ORM model.

False
by_alias bool

Whether to use serialization aliases for the field names in the output.

False
by_title bool

Whether to use titles for the field names in the output (only applicable when from_read_model is True). Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
event_id UUID | None

Event ID to filter seismograms by (if none is provided, seismograms for all events are dumped).

None

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

ValueError

If by_title is True but from_read_model is False.

Source code in src/aimbat/core/_station.py
def dump_station_table(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]:
    """Create a JSON serialisable dict from the AimbatStation table data.

    Args:
        session: Database session.
        from_read_model: Whether to dump from the read model (True) or the ORM model.
        by_alias: Whether to use serialization aliases for the field names in the output.
        by_title: Whether to use titles for the field names in the output (only
            applicable when from_read_model is True). Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.
        event_id: Event ID to filter seismograms by (if none is provided,
            seismograms for all events are dumped).

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
        ValueError: If `by_title` is True but `from_read_model` is False.
    """

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if not from_read_model and by_title:
        raise ValueError("'by_title' is only supported when 'from_read_model' is True.")

    logger.debug("Dumping AIMBAT station table to json.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    if event_id is not None:
        statement = (
            select(AimbatStation)
            .join(AimbatSeismogram)
            .where(AimbatSeismogram.event_id == event_id)
            .distinct()
        )
    else:
        statement = select(AimbatStation)

    statement = statement.options(
        selectinload(rel(AimbatStation.seismograms)).selectinload(
            rel(AimbatSeismogram.quality)
        ),
        selectinload(rel(AimbatStation.seismograms)).selectinload(
            rel(AimbatSeismogram.parameters)
        ),
        selectinload(rel(AimbatStation.seismograms)).selectinload(
            rel(AimbatSeismogram.event)
        ),
    )

    stations = session.exec(statement).all()

    if from_read_model:
        read_stations = [
            AimbatStationRead.from_station(
                station=s,
                session=session,
            )
            for s in stations
        ]
        read_adapter: TypeAdapter[Sequence[AimbatStationRead]] = TypeAdapter(
            Sequence[AimbatStationRead]
        )
        data = read_adapter.dump_python(
            read_stations, exclude=exclude, by_alias=by_alias, mode="json"
        )

        if by_title:
            title_map = get_title_map(AimbatStationRead)
            return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

        return data

    adapter: TypeAdapter[Sequence[AimbatStation]] = TypeAdapter(Sequence[AimbatStation])
    return adapter.dump_python(
        stations, mode="json", by_alias=by_alias, exclude=exclude
    )

get_completed_events

get_completed_events(
    session: Session,
) -> Sequence[AimbatEvent]

Get the events marked as completed.

Parameters:

Name Type Description Default
session Session

SQL session.

required

Returns:

Type Description
Sequence[AimbatEvent]

All events where the completed parameter is set.

Source code in src/aimbat/core/_event.py
def get_completed_events(session: Session) -> Sequence[AimbatEvent]:
    """Get the events marked as completed.

    Args:
        session: SQL session.

    Returns:
        All events where the `completed` parameter is set.
    """

    logger.debug("Getting completed events from project.")

    statement = (
        select(AimbatEvent)
        .join(AimbatEventParameters)
        .where(col(AimbatEventParameters.completed).is_(True))
    )

    return session.exec(statement).all()

get_data_for_event

get_data_for_event(
    session: Session, event_id: UUID
) -> Sequence[AimbatDataSource]

Returns the data sources belonging to the given event.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

UUID of the AimbatEvent.

required

Returns:

Type Description
Sequence[AimbatDataSource]

Sequence of AimbatDataSource objects belonging to the event.

Source code in src/aimbat/core/_data.py
def get_data_for_event(session: Session, event_id: UUID) -> Sequence[AimbatDataSource]:
    """Returns the data sources belonging to the given event.

    Args:
        session: Database session.
        event_id: UUID of the AimbatEvent.

    Returns:
        Sequence of AimbatDataSource objects belonging to the event.
    """

    logger.debug(f"Getting data sources for event {event_id}.")

    statement = (
        select(AimbatDataSource)
        .join(AimbatSeismogram)
        .where(AimbatSeismogram.event_id == event_id)
    )
    return session.exec(statement).all()

get_event_quality

get_event_quality(
    session: Session, event_id: UUID
) -> SeismogramQualityStats

Get aggregated quality statistics for an event.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

UUID of the event.

required

Returns:

Type Description
SeismogramQualityStats

Aggregated seismogram quality statistics. The mccc_rmse field is

SeismogramQualityStats

taken from the event-level quality record rather than the per-seismogram

SeismogramQualityStats

records, and is None if MCCC has not been run.

Raises:

Type Description
NoResultFound

If no event with the given ID is found.

Source code in src/aimbat/core/_event.py
def get_event_quality(session: Session, event_id: UUID) -> SeismogramQualityStats:
    """Get aggregated quality statistics for an event.

    Args:
        session: Database session.
        event_id: UUID of the event.

    Returns:
        Aggregated seismogram quality statistics. The `mccc_rmse` field is
        taken from the event-level quality record rather than the per-seismogram
        records, and is `None` if MCCC has not been run.

    Raises:
        NoResultFound: If no event with the given ID is found.
    """
    logger.debug(f"Getting quality stats for event {event_id}.")

    event = session.exec(
        select(AimbatEvent)
        .where(AimbatEvent.id == event_id)
        .options(
            selectinload(rel(AimbatEvent.seismograms)).selectinload(
                rel(AimbatSeismogram.quality)
            ),
            selectinload(rel(AimbatEvent.quality)),
        )
    ).one_or_none()

    if event is None:
        raise NoResultFound(f"No AimbatEvent found with id: {event_id}.")

    return SeismogramQualityStats.from_event(event)

get_events_using_station

get_events_using_station(
    session: Session, station_id: UUID
) -> Sequence[AimbatEvent]

Get all events that use a particular station.

Parameters:

Name Type Description Default
session Session

Database session.

required
station_id UUID

UUID of the station to return events for.

required

Returns: Events that use the station.

Source code in src/aimbat/core/_event.py
def get_events_using_station(
    session: Session, station_id: UUID
) -> Sequence[AimbatEvent]:
    """Get all events that use a particular station.

    Args:
        session: Database session.
        station_id: UUID of the station to return events for.

    Returns: Events that use the station.
    """

    logger.debug(f"Getting events for station: {station_id}.")

    statement = (
        select(AimbatEvent)
        .join(AimbatSeismogram)
        .join(AimbatStation)
        .where(AimbatStation.id == station_id)
        .options(
            selectinload(rel(AimbatEvent.seismograms)).selectinload(
                rel(AimbatSeismogram.parameters)
            ),
            selectinload(rel(AimbatEvent.parameters)),
            selectinload(rel(AimbatEvent.quality)),
        )
    )

    events = session.exec(statement).all()

    logger.debug(f"Found {len(events)}.")

    return events

get_note_content

get_note_content(
    session: Session, target: NoteTarget, target_id: UUID
) -> str

Return the note content for the given entity.

Parameters:

Name Type Description Default
session Session

Active database session.

required
target NoteTarget

Entity type — one of event, station, seismogram, snapshot.

required
target_id UUID

UUID of the target entity.

required

Returns:

Type Description
str

Markdown note content, or an empty string if no note exists yet.

Source code in src/aimbat/core/_note.py
def get_note_content(session: Session, target: NoteTarget, target_id: uuid.UUID) -> str:
    """Return the note content for the given entity.

    Args:
        session: Active database session.
        target: Entity type — one of `event`, `station`, `seismogram`, `snapshot`.
        target_id: UUID of the target entity.

    Returns:
        Markdown note content, or an empty string if no note exists yet.
    """
    attr = getattr(AimbatNote, f"{target}_id")
    note = session.exec(select(AimbatNote).where(attr == target_id)).first()
    return note.content if note is not None else ""

get_selected_seismograms

get_selected_seismograms(
    session: Session,
    event_id: UUID | None = None,
    all_events: bool = False,
) -> Sequence[AimbatSeismogram]

Get the selected seismograms for the given event.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID | None

Event ID to get seismograms for (only used when all_events is False).

None
all_events bool

Get the selected seismograms for all events.

False

Returns: Selected seismograms.

Source code in src/aimbat/core/_seismogram.py
def get_selected_seismograms(
    session: Session, event_id: UUID | None = None, all_events: bool = False
) -> Sequence[AimbatSeismogram]:
    """Get the selected seismograms for the given event.

    Args:
        session: Database session.
        event_id: Event ID to get seismograms for (only used when all_events is False).
        all_events: Get the selected seismograms for all events.

    Returns: Selected seismograms.
    """

    if all_events is True:
        logger.debug("Selecting seismograms for all events.")
        statement = (
            select(AimbatSeismogram)
            .join(AimbatSeismogramParameters)
            .where(AimbatSeismogramParameters.select == 1)
        )
    else:
        if event_id is None:
            raise ValueError("An event must be provided when all_events is False.")
        logger.debug(f"Selecting seismograms for event {event_id} only.")
        statement = (
            select(AimbatSeismogram)
            .join(AimbatSeismogramParameters)
            .where(AimbatSeismogramParameters.select == 1)
            .where(AimbatSeismogram.event_id == event_id)
        )

    statement = statement.options(
        selectinload(rel(AimbatSeismogram.station)),
        selectinload(rel(AimbatSeismogram.event)),
        selectinload(rel(AimbatSeismogram.parameters)),
        selectinload(rel(AimbatSeismogram.quality)),
    )
    seismograms = session.exec(statement).all()

    logger.debug(f"Found {len(seismograms)} selected seismograms.")

    return seismograms

get_snapshot_quality

get_snapshot_quality(
    session: Session, snapshot_id: UUID
) -> SeismogramQualityStats

Get aggregated quality statistics for a snapshot.

Parameters:

Name Type Description Default
session Session

Database session.

required
snapshot_id UUID

UUID of the snapshot.

required

Returns:

Type Description
SeismogramQualityStats

Aggregated seismogram quality statistics from the frozen snapshot records.

Raises:

Type Description
NoResultFound

If no snapshot with the given ID is found.

Source code in src/aimbat/core/_snapshot.py
def get_snapshot_quality(session: Session, snapshot_id: UUID) -> SeismogramQualityStats:
    """Get aggregated quality statistics for a snapshot.

    Args:
        session: Database session.
        snapshot_id: UUID of the snapshot.

    Returns:
        Aggregated seismogram quality statistics from the frozen snapshot records.

    Raises:
        NoResultFound: If no snapshot with the given ID is found.
    """
    logger.debug(f"Getting quality stats for snapshot {snapshot_id}.")

    snapshot = session.exec(
        select(AimbatSnapshot)
        .where(AimbatSnapshot.id == snapshot_id)
        .options(
            selectinload(rel(AimbatSnapshot.seismogram_quality_snapshots)),
            selectinload(rel(AimbatSnapshot.event_quality_snapshot)),
        )
    ).one_or_none()

    if snapshot is None:
        raise NoResultFound(f"No AimbatSnapshot found with id: {snapshot_id}.")

    return SeismogramQualityStats.from_snapshot(snapshot)

get_snapshots

get_snapshots(
    session: Session, event_id: UUID | None = None
) -> Sequence[AimbatSnapshot]

Get the snapshots, optional filtered by event ID.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID | None

Event ID to filter snapshots by (if none is provided, snapshots for all events are returned).

None

Returns: Snapshots.

Source code in src/aimbat/core/_snapshot.py
def get_snapshots(
    session: Session, event_id: UUID | None = None
) -> Sequence[AimbatSnapshot]:
    """Get the snapshots, optional filtered by event ID.

    Args:
        session: Database session.
        event_id: Event ID to filter snapshots by (if none is provided, snapshots for all events are returned).

    Returns: Snapshots.
    """
    logger.debug("Getting AIMBAT snapshots.")

    if event_id is None:
        statement = select(AimbatSnapshot)
    else:
        statement = select(AimbatSnapshot).where(AimbatSnapshot.event_id == event_id)

    statement = statement.options(
        selectinload(rel(AimbatSnapshot.event)),
        selectinload(rel(AimbatSnapshot.event_parameters_snapshot)),
        selectinload(rel(AimbatSnapshot.seismogram_parameters_snapshots)),
        selectinload(rel(AimbatSnapshot.event_quality_snapshot)),
        selectinload(rel(AimbatSnapshot.seismogram_quality_snapshots)),
    )

    logger.debug(f"Executing statement to get snapshots: {statement}")
    return session.exec(statement).all()

get_station_iccs_ccs

get_station_iccs_ccs(
    session: Session, station_id: UUID
) -> tuple[float | None, ...]

Get ICCS cross-correlation coefficients for all seismograms of a station across all events.

Parameters:

Name Type Description Default
session Session

Database session.

required
station_id UUID

ID of the station.

required

Returns: Tuple of ICCS CC values, one per seismogram (None if not yet computed).

Source code in src/aimbat/core/_station.py
def get_station_iccs_ccs(
    session: Session, station_id: UUID
) -> tuple[float | None, ...]:
    """Get ICCS cross-correlation coefficients for all seismograms of a station across all events.

    Args:
        session: Database session.
        station_id: ID of the station.

    Returns: Tuple of ICCS CC values, one per seismogram (None if not yet computed).
    """
    logger.debug(f"Getting ICCS CCs for {station_id=}.")

    statement = (
        select(AimbatSeismogramQuality.iccs_cc)
        .join(AimbatSeismogram)
        .where(AimbatSeismogram.station_id == station_id)
    )

    return tuple(session.exec(statement).all())

get_station_quality

get_station_quality(
    session: Session, station_id: UUID
) -> SeismogramQualityStats

Get aggregated quality statistics for a station.

Parameters:

Name Type Description Default
session Session

Database session.

required
station_id UUID

UUID of the station.

required

Returns:

Type Description
SeismogramQualityStats

Aggregated seismogram quality statistics.

Raises:

Type Description
NoResultFound

If no station with the given ID is found.

Source code in src/aimbat/core/_station.py
def get_station_quality(session: Session, station_id: UUID) -> SeismogramQualityStats:
    """Get aggregated quality statistics for a station.

    Args:
        session: Database session.
        station_id: UUID of the station.

    Returns:
        Aggregated seismogram quality statistics.

    Raises:
        NoResultFound: If no station with the given ID is found.
    """
    logger.debug(f"Getting quality stats for station {station_id}.")

    station = session.exec(
        select(AimbatStation)
        .where(AimbatStation.id == station_id)
        .options(
            selectinload(rel(AimbatStation.seismograms)).selectinload(
                rel(AimbatSeismogram.quality)
            ),
        )
    ).one_or_none()

    if station is None:
        raise NoResultFound(f"No AimbatStation found with id: {station_id}.")

    return SeismogramQualityStats.from_station(station)

get_stations_in_event

get_stations_in_event(
    session: Session, event_id: UUID, as_json: bool = False
) -> Sequence[AimbatStation] | list[dict[str, Any]]

Get the stations for a particular event.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

ID of the event to get stations for.

required
as_json bool

Whether to return the result as JSON.

False

Returns: Stations in event.

Source code in src/aimbat/core/_station.py
def get_stations_in_event(
    session: Session, event_id: UUID, as_json: bool = False
) -> Sequence[AimbatStation] | list[dict[str, Any]]:
    """Get the stations for a particular event.

    Args:
        session: Database session.
        event_id: ID of the event to get stations for.
        as_json: Whether to return the result as JSON.

    Returns: Stations in event.
    """
    logger.debug(f"Getting stations for event: {event_id}.")

    event = session.get(AimbatEvent, event_id)
    if event is None:
        raise NoResultFound(f"Unable to find event with {event_id=}")

    statement = (
        select(AimbatStation)
        .distinct()
        .join(AimbatSeismogram)
        .where(AimbatSeismogram.event_id == event.id)
        .options(
            selectinload(rel(AimbatStation.seismograms)).selectinload(
                rel(AimbatSeismogram.parameters)
            ),
            selectinload(rel(AimbatStation.seismograms)).selectinload(
                rel(AimbatSeismogram.quality)
            ),
            selectinload(rel(AimbatStation.seismograms)).selectinload(
                rel(AimbatSeismogram.event)
            ),
        )
    )

    logger.debug(f"Executing query: {statement}")
    results = session.exec(statement).all()

    if not as_json:
        return results

    adapter: TypeAdapter[Sequence[AimbatStation]] = TypeAdapter(Sequence[AimbatStation])

    return adapter.dump_python(results, mode="json")

reset_seismogram_parameters

reset_seismogram_parameters(
    session: Session, seismogram_id: UUID
) -> None

Reset an AimbatSeismogram's parameters to their default values.

All fields defined on AimbatSeismogramParametersBase are reset to the values produced by a fresh default instance, so newly added fields are picked up automatically.

Parameters:

Name Type Description Default
session Session

Database session.

required
seismogram_id UUID

ID of seismogram to reset parameters for.

required
Source code in src/aimbat/core/_seismogram.py
def reset_seismogram_parameters(session: Session, seismogram_id: UUID) -> None:
    """Reset an AimbatSeismogram's parameters to their default values.

    All fields defined on AimbatSeismogramParametersBase are reset to the
    values produced by a fresh default instance, so newly added fields are
    picked up automatically.

    Args:
        session: Database session.
        seismogram_id: ID of seismogram to reset parameters for.
    """

    logger.info(f"Resetting parameters for seismogram {seismogram_id}.")

    seismogram = session.exec(
        select(AimbatSeismogram)
        .where(AimbatSeismogram.id == seismogram_id)
        .options(
            selectinload(rel(AimbatSeismogram.parameters)),
            selectinload(rel(AimbatSeismogram.event)).options(
                selectinload(rel(AimbatEvent.parameters)),
                selectinload(rel(AimbatEvent.seismograms)).selectinload(
                    rel(AimbatSeismogram.parameters)
                ),
            ),
        )
    ).one_or_none()
    if seismogram is None:
        raise NoResultFound(f"No AimbatSeismogram found with {seismogram_id=}")

    from ._iccs import clear_mccc_quality
    from ._snapshot import compute_parameters_hash, sync_from_matching_hash

    defaults = AimbatSeismogramParametersBase()
    for field_name in AimbatSeismogramParametersBase.model_fields:
        setattr(seismogram.parameters, field_name, getattr(defaults, field_name))
    session.add(seismogram)
    parameters_hash = compute_parameters_hash(seismogram.event)
    if not sync_from_matching_hash(session, parameters_hash):
        clear_mccc_quality(session, seismogram.event)

resolve_event

resolve_event(
    session: Session, event_id: UUID | None = None
) -> AimbatEvent

Resolve an event from an explicit ID.

Parameters:

Name Type Description Default
session Session

SQL session.

required
event_id UUID | None

Optional event ID.

None

Returns:

Type Description
AimbatEvent

The specified event.

Raises:

Type Description
NoResultFound

If an explicit event_id is given but not found.

NoResultFound

If no event_id is given.

Source code in src/aimbat/core/_event.py
def resolve_event(session: Session, event_id: UUID | None = None) -> AimbatEvent:
    """Resolve an event from an explicit ID.

    Args:
        session: SQL session.
        event_id: Optional event ID.

    Returns:
        The specified event.

    Raises:
        NoResultFound: If an explicit event_id is given but not found.
        NoResultFound: If no event_id is given.
    """
    if event_id:
        logger.debug(f"Resolving event by explicit ID: {event_id}.")
        event = session.get(AimbatEvent, event_id)
        if event is None:
            raise NoResultFound(f"No AimbatEvent found with id: {event_id}.")
        return event

    raise NoResultFound("No event specified.")

rollback_to_snapshot

rollback_to_snapshot(
    session: Session, snapshot_id: UUID
) -> None

Rollback to an AIMBAT parameters snapshot.

Parameters:

Name Type Description Default
snapshot_id UUID

Snapshot id.

required
Source code in src/aimbat/core/_snapshot.py
def rollback_to_snapshot(session: Session, snapshot_id: UUID) -> None:
    """Rollback to an AIMBAT parameters snapshot.

    Args:
        snapshot_id: Snapshot id.
    """

    logger.info(f"Rolling back to snapshot with id={snapshot_id}.")

    statement = (
        select(AimbatSnapshot)
        .where(AimbatSnapshot.id == snapshot_id)
        .options(
            selectinload(rel(AimbatSnapshot.event)).selectinload(
                rel(AimbatEvent.parameters)
            ),
            selectinload(rel(AimbatSnapshot.event_parameters_snapshot)),
            selectinload(
                rel(AimbatSnapshot.seismogram_parameters_snapshots)
            ).selectinload(rel(AimbatSeismogramParametersSnapshot.parameters)),
        )
    )
    snapshot = session.exec(statement).one_or_none()
    if snapshot is None:
        raise ValueError(f"No AimbatSnapshot found with {snapshot_id=}")

    # create object with just the parameters
    rollback_event_parameters = AimbatEventParametersBase.model_validate(
        snapshot.event_parameters_snapshot
    )
    logger.debug(
        f"Using event parameters snapshot with id={snapshot.event_parameters_snapshot.id} for rollback."
    )
    current_event_parameters = snapshot.event.parameters

    # setting attributes explicitly brings them into the session
    for k in AimbatEventParametersBase.model_fields.keys():
        v = getattr(rollback_event_parameters, k)
        logger.debug(f"Setting event parameter {k} to {v!r} for rollback.")
        setattr(current_event_parameters, k, v)

    session.add(current_event_parameters)

    for seismogram_parameters_snapshot in snapshot.seismogram_parameters_snapshots:
        rollback_seismogram_parameters = AimbatSeismogramParametersBase.model_validate(
            seismogram_parameters_snapshot
        )
        logger.debug(
            f"Using seismogram parameters snapshot with id={seismogram_parameters_snapshot.id} for rollback."
        )
        current_seismogram_parameters = seismogram_parameters_snapshot.parameters
        for k in AimbatSeismogramParametersBase.model_fields.keys():
            v = getattr(rollback_seismogram_parameters, k)
            logger.debug(f"Setting seismogram parameter {k} to {v!r} for rollback.")
            setattr(current_seismogram_parameters, k, v)
        session.add(current_seismogram_parameters)

    session.commit()
    sync_from_matching_hash(session, snapshot_id=snapshot_id)

run_iccs

run_iccs(
    session: Session,
    event: AimbatEvent,
    iccs: ICCS,
    autoflip: bool,
    autoselect: bool,
) -> IccsResult

Run the Iterative Cross-Correlation and Stack (ICCS) algorithm.

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

AimbatEvent.

required
iccs ICCS

ICCS instance.

required
autoflip bool

If True, automatically flip seismograms to maximise cross-correlation.

required
autoselect bool

If True, automatically deselect seismograms whose cross-correlation falls below the threshold.

required

Returns:

Type Description
IccsResult

IccsResult from the algorithm run.

Source code in src/aimbat/core/_iccs.py
def run_iccs(
    session: Session, event: AimbatEvent, iccs: ICCS, autoflip: bool, autoselect: bool
) -> IccsResult:
    """Run the Iterative Cross-Correlation and Stack (ICCS) algorithm.

    Args:
        session: Database session.
        event: AimbatEvent.
        iccs: ICCS instance.
        autoflip: If True, automatically flip seismograms to maximise cross-correlation.
        autoselect: If True, automatically deselect seismograms whose cross-correlation
            falls below the threshold.

    Returns:
        IccsResult from the algorithm run.
    """

    logger.info(f"Running ICCS (autoflip={autoflip}, autoselect={autoselect}).")

    result = iccs(autoflip=autoflip, autoselect=autoselect)
    n_iter = len(result.convergence)
    status = "converged" if result.converged else "did not converge"
    logger.info(f"ICCS {status} after {n_iter} iterations.")
    _write_back_seismograms(session, iccs)
    _write_iccs_stats(event.id, iccs)
    return result

run_mccc

run_mccc(
    session: Session,
    event: AimbatEvent,
    iccs: ICCS,
    all_seismograms: bool,
) -> McccResult

Run the Multi-Channel Cross-Correlation (MCCC) algorithm.

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

AimbatEvent.

required
iccs ICCS

ICCS instance.

required
all_seismograms bool

If True, include deselected seismograms in the alignment.

required

Returns:

Type Description
McccResult

McccResult from the algorithm run.

Source code in src/aimbat/core/_iccs.py
def run_mccc(
    session: Session, event: AimbatEvent, iccs: ICCS, all_seismograms: bool
) -> McccResult:
    """Run the Multi-Channel Cross-Correlation (MCCC) algorithm.

    Args:
        session: Database session.
        event: AimbatEvent.
        iccs: ICCS instance.
        all_seismograms: If True, include deselected seismograms in the alignment.

    Returns:
        McccResult from the algorithm run.
    """

    logger.info(
        f"Running MCCC for event {event.id} (all_seismograms={all_seismograms})."
    )

    result = iccs.run_mccc(
        all_seismograms=all_seismograms,
        min_cc=event.parameters.mccc_min_cc,
        damping=event.parameters.mccc_damp,
    )
    _write_back_seismograms(session, iccs)
    _write_iccs_stats(event.id, iccs)
    _write_mccc_quality(event.id, iccs, result, all_seismograms)
    return result

save_note

save_note(
    session: Session,
    target: NoteTarget,
    target_id: UUID,
    content: str,
) -> None

Save note content for the given entity, creating the note record if needed.

Parameters:

Name Type Description Default
session Session

Active database session.

required
target NoteTarget

Entity type — one of event, station, seismogram, snapshot.

required
target_id UUID

UUID of the target entity.

required
content str

Markdown note content to save.

required
Source code in src/aimbat/core/_note.py
def save_note(
    session: Session, target: NoteTarget, target_id: uuid.UUID, content: str
) -> None:
    """Save note content for the given entity, creating the note record if needed.

    Args:
        session: Active database session.
        target: Entity type — one of `event`, `station`, `seismogram`, `snapshot`.
        target_id: UUID of the target entity.
        content: Markdown note content to save.
    """
    attr = getattr(AimbatNote, f"{target}_id")
    note = session.exec(select(AimbatNote).where(attr == target_id)).first()
    if note is None:
        note = AimbatNote(**{f"{target}_id": target_id, "content": content})
    else:
        note.content = content
    session.add(note)
    session.commit()

set_event_parameter

set_event_parameter(
    session: Session,
    event_id: UUID,
    name: EventParameter,
    value: Timedelta | bool | float | str,
    *,
    validate_iccs: bool = False,
) -> None

Set event parameter value for the given event.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

UUID of the event to set the parameter value for.

required
name EventParameter

Name of the parameter.

required
value Timedelta | bool | float | str

Value to set.

required
validate_iccs bool

If True, attempt ICCS construction with the new value before committing. Raises and leaves the database unchanged on failure.

False
Source code in src/aimbat/core/_event.py
def set_event_parameter(
    session: Session,
    event_id: UUID,
    name: EventParameter,
    value: Timedelta | bool | float | str,
    *,
    validate_iccs: bool = False,
) -> None:
    """Set event parameter value for the given event.

    Args:
        session: Database session.
        event_id: UUID of the event to set the parameter value for.
        name: Name of the parameter.
        value: Value to set.
        validate_iccs: If True, attempt ICCS construction with the new value
            before committing. Raises and leaves the database unchanged on failure.
    """
    from ._iccs import clear_mccc_quality
    from ._snapshot import compute_parameters_hash, sync_from_matching_hash

    logger.debug(f"Setting {name=} to {value} for event {event_id=}.")

    event = session.exec(
        select(AimbatEvent)
        .where(AimbatEvent.id == event_id)
        .options(
            selectinload(rel(AimbatEvent.parameters)),
            selectinload(rel(AimbatEvent.seismograms)).selectinload(
                rel(AimbatSeismogram.parameters)
            ),
        )
    ).one_or_none()
    if event is None:
        raise NoResultFound(f"No AimbatEvent found with id: {event_id}.")

    # Perform Pydantic validation (including optional ICCS validation)
    parameters = AimbatEventParametersBase.model_validate(
        event.parameters,
        update={name: value},
        context={"validate_iccs": validate_iccs, "event": event},
    )

    setattr(event.parameters, name, getattr(parameters, name))
    session.add(event)
    parameters_hash = compute_parameters_hash(event)
    if not sync_from_matching_hash(session, parameters_hash):
        clear_mccc_quality(session, event)

set_seismogram_parameter

set_seismogram_parameter(
    session: Session,
    seismogram_id: UUID,
    name: SeismogramParameter,
    value: Timestamp | bool | str,
) -> None

Set parameter value for an AimbatSeismogram instance.

Parameters:

Name Type Description Default
session Session

Database session

required
seismogram_id UUID

Seismogram id.

required
name SeismogramParameter

Name of the parameter.

required
value Timestamp | bool | str

Value to set parameter to.

required
Source code in src/aimbat/core/_seismogram.py
def set_seismogram_parameter(
    session: Session,
    seismogram_id: UUID,
    name: SeismogramParameter,
    value: Timestamp | bool | str,
) -> None:
    """Set parameter value for an AimbatSeismogram instance.

    Args:
        session: Database session
        seismogram_id: Seismogram id.
        name: Name of the parameter.
        value: Value to set parameter to.

    """
    from ._iccs import clear_mccc_quality
    from ._snapshot import compute_parameters_hash, sync_from_matching_hash

    logger.debug(
        f"Setting seismogram {name=} to {value=} in seismogram {seismogram_id=}."
    )

    seismogram = session.exec(
        select(AimbatSeismogram)
        .where(AimbatSeismogram.id == seismogram_id)
        .options(
            selectinload(rel(AimbatSeismogram.parameters)),
            selectinload(rel(AimbatSeismogram.event)).options(
                selectinload(rel(AimbatEvent.parameters)),
                selectinload(rel(AimbatEvent.seismograms)).selectinload(
                    rel(AimbatSeismogram.parameters)
                ),
            ),
        )
    ).one_or_none()
    if seismogram is None:
        raise ValueError(f"No AimbatSeismogram found with {seismogram_id=}")

    parameters = AimbatSeismogramParametersBase.model_validate(
        seismogram.parameters, update={name: value}
    )
    setattr(seismogram.parameters, name, getattr(parameters, name))
    session.add(seismogram)
    parameters_hash = compute_parameters_hash(seismogram.event)
    if not sync_from_matching_hash(session, parameters_hash):
        clear_mccc_quality(session, seismogram.event)

sync_from_matching_hash

sync_from_matching_hash(
    session: Session,
    parameters_hash: str | None = None,
    snapshot_id: UUID | None = None,
) -> bool

Sync live quality metrics from a snapshot whose parameter hash matches the given hash.

Searches all snapshots for candidates whose parameters_hash matches and that have MCCC quality data. When multiple candidates exist, snapshot_id is used as a tie-breaker (preferred if it is among them); otherwise the most recent candidate is used.

Parameters:

Name Type Description Default
session Session

Database session.

required
parameters_hash str | None

Hash to match against snapshot hashes. If None and snapshot_id is provided, the hash is derived from that snapshot.

None
snapshot_id UUID | None

Optional tie-breaker when multiple candidates share the same hash.

None

Returns:

Type Description
bool

True if quality metrics were synced, False if no suitable candidate

bool

was found.

Raises:

Type Description
ValueError

If both are provided but the hashes differ.

Source code in src/aimbat/core/_snapshot.py
def sync_from_matching_hash(
    session: Session,
    parameters_hash: str | None = None,
    snapshot_id: UUID | None = None,
) -> bool:
    """Sync live quality metrics from a snapshot whose parameter hash matches the given hash.

    Searches all snapshots for candidates whose `parameters_hash` matches and
    that have MCCC quality data. When multiple candidates exist, `snapshot_id`
    is used as a tie-breaker (preferred if it is among them); otherwise the
    most recent candidate is used.

    Args:
        session: Database session.
        parameters_hash: Hash to match against snapshot hashes. If None and
            `snapshot_id` is provided, the hash is derived from that snapshot.
        snapshot_id: Optional tie-breaker when multiple candidates share the
            same hash.

    Returns:
        True if quality metrics were synced, False if no suitable candidate
        was found.

    Raises:
        ValueError: If both are provided but the hashes differ.
    """
    if parameters_hash is None:
        if snapshot_id is None:
            return False
        snapshot = session.get(AimbatSnapshot, snapshot_id)
        if snapshot is None:
            raise ValueError(f"No AimbatSnapshot found with {snapshot_id=}")
        parameters_hash = snapshot.parameters_hash
        if parameters_hash is None:
            return False
    elif snapshot_id is not None:
        snapshot = session.get(AimbatSnapshot, snapshot_id)
        if snapshot is not None and snapshot.parameters_hash != parameters_hash:
            raise ValueError(
                f"Provided parameters_hash does not match hash on snapshot {snapshot_id}."
            )

    logger.debug(f"Looking for quality metrics to sync for hash {parameters_hash}.")

    candidates = [
        s
        for s in get_snapshots(session)
        if s.parameters_hash == parameters_hash
        and s.event_quality_snapshot is not None
        and s.event_quality_snapshot.mccc_rmse is not None
    ]
    if not candidates:
        logger.debug("No snapshot with matching hash and MCCC quality data found.")
        return False

    preferred = next((c for c in candidates if c.id == snapshot_id), None)
    snapshot = (
        preferred if preferred is not None else max(candidates, key=lambda s: s.time)
    )

    logger.info(f"Syncing quality metrics from snapshot {snapshot.id}.")

    event_quality_snap = snapshot.event_quality_snapshot
    if event_quality_snap is None:
        raise ValueError(
            f"Snapshot {snapshot.id} has no event quality data despite passing filter."
        )
    live_event_quality = session.get(
        AimbatEventQuality, event_quality_snap.event_quality_id
    )
    if live_event_quality is None:
        logger.warning(
            f"Live event quality record {event_quality_snap.event_quality_id} not found; skipping event quality sync."
        )
    else:
        for k in AimbatEventQualityBase.model_fields:
            v = getattr(event_quality_snap, k)
            logger.debug(f"Setting event quality {k} to {v!r} from snapshot.")
            setattr(live_event_quality, k, v)
        session.add(live_event_quality)

    for seis_quality_snap in snapshot.seismogram_quality_snapshots:
        live_seis_quality = session.get(
            AimbatSeismogramQuality, seis_quality_snap.seismogram_quality_id
        )
        if live_seis_quality is None:
            logger.warning(
                f"Live seismogram quality record {seis_quality_snap.seismogram_quality_id} not found; skipping."
            )
            continue
        for k in AimbatSeismogramQualityBase.model_fields:
            v = getattr(seis_quality_snap, k)
            logger.debug(f"Setting seismogram quality {k} to {v!r} from snapshot.")
            setattr(live_seis_quality, k, v)
        session.add(live_seis_quality)

    session.commit()
    return True

sync_iccs_parameters

sync_iccs_parameters(
    session: Session, event: AimbatEvent, iccs: ICCS
) -> None

Sync an existing ICCS instance's parameters from the database.

Updates event-level and per-seismogram parameters without re-reading waveform data. Use this after operations that change parameters but not the seismogram list (e.g. rolling back to a snapshot).

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

AimbatEvent.

required
iccs ICCS

ICCS instance to update in-place.

required
Source code in src/aimbat/core/_iccs.py
def sync_iccs_parameters(session: Session, event: AimbatEvent, iccs: ICCS) -> None:
    """Sync an existing ICCS instance's parameters from the database.

    Updates event-level and per-seismogram parameters without re-reading waveform
    data. Use this after operations that change parameters but not the
    seismogram list (e.g. rolling back to a snapshot).

    Args:
        session: Database session.
        event: AimbatEvent.
        iccs: ICCS instance to update in-place.
    """

    logger.debug(f"Syncing ICCS parameters from database for event {event.id}.")

    event_params = AimbatEventParametersBase.model_validate(event.parameters)
    for field_name in AimbatEventParametersBase.model_fields:
        if hasattr(iccs, field_name):
            setattr(iccs, field_name, getattr(event_params, field_name))

    for iccs_seis in iccs.seismograms:
        db_seis = session.get(AimbatSeismogram, iccs_seis.extra["id"])
        if db_seis is not None:
            seis_params = AimbatSeismogramParametersBase.model_validate(
                db_seis.parameters
            )
            for field_name in AimbatSeismogramParametersBase.model_fields:
                setattr(iccs_seis, field_name, getattr(seis_params, field_name))

    iccs.clear_cache()

validate_iccs_construction

validate_iccs_construction(
    event: AimbatEvent,
    parameters: AimbatEventParametersBase | None = None,
) -> None

Try to construct an ICCS instance for the event without caching the result.

Use this to check whether the event's current (possibly uncommitted) parameters are compatible with ICCS construction before persisting them to the database.

Parameters:

Name Type Description Default
event AimbatEvent

AimbatEvent.

required
parameters AimbatEventParametersBase | None

Optional AimbatEventParametersBase to use instead of the live event parameters (useful for validation).

None
Source code in src/aimbat/core/_iccs.py
def validate_iccs_construction(
    event: AimbatEvent, parameters: AimbatEventParametersBase | None = None
) -> None:
    """Try to construct an ICCS instance for the event without caching the result.

    Use this to check whether the event's current (possibly uncommitted) parameters
    are compatible with ICCS construction before persisting them to the database.

    Args:
        event: AimbatEvent.
        parameters: Optional AimbatEventParametersBase to use instead of the live
            event parameters (useful for validation).

    Raises:
        Any exception raised by ICCS construction (e.g. invalid parameter values).
    """
    _build_iccs(event, parameters=parameters)