results_aggregation

Flood loss results aggregation.

Provides FloodResultsAggregator, a standalone helper that reads a DuckDB database produced by inland_consequences.InlandFloodAnalysis and aggregates the site-specific building-level results to:

  • Geographic hierarchies derived from the 15-digit Census Block FIPS (cbfips) stored on each building: state, county, census tract, census block group, census block, NFIP community, and HUC watershed.
  • Attribute breakdowns across occupancy type, damage category, construction type, foundation type and flood peril type.
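
As a rough illustration of how the census levels nest inside a block code, the sketch below truncates a 15-digit block FIPS to the standard Census GEOID prefix lengths (2 digits for state, 5 for county, 11 for tract, 12 for block group). The FIPS_PREFIX_LEN mapping, the fips_prefix helper, and the sample code are hypothetical names chosen for this example; they are not taken from the package, which may derive the IDs differently (for example, in SQL).

```python
# Prefix lengths of the standard 15-digit Census Block GEOID.
# Hypothetical mapping for illustration; not the package's own table.
FIPS_PREFIX_LEN = {
    "state": 2,         # SS
    "county": 5,        # SS + CCC
    "tract": 11,        # SS + CCC + TTTTTT
    "block_group": 12,  # tract + first digit of the block number
    "block": 15,        # full block code
}


def fips_prefix(cbfips: str, level: str) -> str:
    """Return the geographic ID for `level` by truncating a block FIPS."""
    if len(cbfips) != 15 or not cbfips.isdigit():
        raise ValueError(f"expected a 15-digit block FIPS, got {cbfips!r}")
    return cbfips[: FIPS_PREFIX_LEN[level]]


example = "481130001001000"  # made-up block code used only for this sketch
ids = {level: fips_prefix(example, level) for level in FIPS_PREFIX_LEN}
```

Keeping the code as a string matters here: leading zeros are significant, so an integer column would silently corrupt IDs for states whose FIPS code starts with 0.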

Return periods are discovered dynamically from the losses table so the aggregator works regardless of the set of return periods used in the analysis.

Example usage:

from inland_consequences.results_aggregation import FloodResultsAggregator

with FloodResultsAggregator("analysis.duckdb") as agg:
    # Losses by county
    county_df = agg.aggregate(geography="county")

    # Losses by county broken down by occupancy type
    county_occ_df = agg.aggregate(
        geography="county",
        breakdown=["occupancy_type"],
    )

    # Losses by NFIP community
    community_df = agg.aggregate(geography="community")

    # Losses by HUC-8 watershed
    huc_df = agg.aggregate(geography="huc", huc_digits=8)

Classes

FloodResultsAggregator

FloodResultsAggregator(db_path: Optional[str | Path] = None, conn: Optional[DuckDBPyConnection] = None, community_xref_path: Optional[str | Path] = None, watershed_xref_path: Optional[str | Path] = None)

Aggregate building-level flood loss results to geographic and attribute summaries.

Parameters

  • db_path: Path to a DuckDB database produced by InlandFloodAnalysis. Mutually exclusive with conn.
  • conn: An already-open duckdb.DuckDBPyConnection. Use this when calling from within an InlandFloodAnalysis context manager so that the same connection (and its in-memory tables) is reused. Mutually exclusive with db_path.
  • community_xref_path: Path to the census-block → NFIP-community mapping parquet (hzCommunity_Block.parquet). Defaults to the bundled copy shipped with the package.
  • watershed_xref_path: Path to the census-block → HUC mapping parquet (syWatershed_Block.parquet). Defaults to the bundled copy shipped with the package.

Source code in src/inland_consequences/results_aggregation.py
def __init__(
    self,
    db_path: Optional[str | Path] = None,
    conn: Optional[duckdb.DuckDBPyConnection] = None,
    community_xref_path: Optional[str | Path] = None,
    watershed_xref_path: Optional[str | Path] = None,
) -> None:
    if db_path is not None and conn is not None:
        raise ValueError("Provide either db_path or conn, not both.")
    if db_path is None and conn is None:
        raise ValueError("Provide either db_path or conn.")

    self._db_path = Path(db_path) if db_path is not None else None
    self._external_conn = conn
    self._owned_conn: Optional[duckdb.DuckDBPyConnection] = None

    self._community_xref = Path(
        community_xref_path if community_xref_path is not None else _DEFAULT_COMMUNITY_XREF
    )
    self._watershed_xref = Path(
        watershed_xref_path if watershed_xref_path is not None else _DEFAULT_WATERSHED_XREF
    )

    self._return_periods: Optional[list[int]] = None  # cached after first query
Functions
get_return_periods
get_return_periods() -> list[int]

Return the sorted list of return periods present in the losses table.

Source code in src/inland_consequences/results_aggregation.py
def get_return_periods(self) -> list[int]:
    """Return the sorted list of return periods present in the ``losses`` table."""
    if self._return_periods is None:
        rows = self._conn.execute(
            "SELECT DISTINCT return_period FROM losses ORDER BY return_period"
        ).fetchall()
        self._return_periods = [int(r[0]) for r in rows]
    return self._return_periods
aggregate
aggregate(geography: GeographyLevel, breakdown: Optional[list[str]] = None, huc_digits: int = 8, loss_metric: LossMetric = 'loss_mean') -> pd.DataFrame

Aggregate results to the requested geographic and/or attribute level.

Parameters

  • geography: The geographic aggregation level. Census-hierarchy levels (state, county, tract, block_group, block) are derived from the 15-digit cbfips field on each building. community and huc use the bundled reference parquet files.
  • breakdown: Optional list of building attribute columns to add as additional group-by dimensions. Supported values: st_damcat, occupancy_type, general_building_type, foundation_type, flood_peril_type.
  • huc_digits: Number of HUC digits to use when geography="huc". The bundled reference contains HUC-8 identifiers; pass a smaller even number (e.g. 6 or 4) to roll up to a coarser watershed unit.
  • loss_metric: Which column from the losses table to aggregate. Defaults to loss_mean.
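
The huc_digits roll-up can be pictured as prefix truncation: hydrologic unit codes are hierarchical, so the first 6 digits of a HUC-8 identify its HUC-6 basin and the first 4 its HUC-4 subregion. The sketch below is a minimal stand-alone illustration of that idea, not the package's implementation; roll_up_huc, its argument names, the accepted digit set, and the sample values are all assumptions made for this example.

```python
from collections import defaultdict


def roll_up_huc(losses_by_huc8: dict[str, float], huc_digits: int = 8) -> dict[str, float]:
    """Sum losses keyed by HUC-8 into coarser units by truncating the code.

    Hypothetical helper for illustration only.
    """
    # Assumption: only even lengths up to 8 are meaningful for HUC-8 input.
    if huc_digits not in (2, 4, 6, 8):
        raise ValueError("huc_digits must be 2, 4, 6, or 8")
    totals: dict[str, float] = defaultdict(float)
    for huc8, loss in losses_by_huc8.items():
        totals[huc8[:huc_digits]] += loss
    return dict(totals)


# Made-up HUC-8 codes and loss values.
sample = {"12030105": 100.0, "12030106": 50.0, "12040101": 25.0}
by_huc6 = roll_up_huc(sample, huc_digits=6)
by_huc4 = roll_up_huc(sample, huc_digits=4)
```

With these sample values the two 120301xx units merge into a single HUC-6 entry, while 12040101 stays separate at both levels.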

Returns

pandas.DataFrame: One row per unique combination of geographic ID and breakdown dimensions. Columns include building_count, total_building_exposure, total_content_exposure, total_exposure, loss_rp{N} and loss_ratio_rp{N} for each return period, aal_mean, and aal_ratio.
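
To make the {N} placeholder concrete, the sketch below expands a list of return periods into the metric column names listed above. It is only an illustration: result_columns is a hypothetical helper, the column order is an assumption, and the geographic ID and breakdown columns that also appear in the result are left out.

```python
def result_columns(return_periods: list[int]) -> list[str]:
    """Expand return periods into the documented metric column names.

    Hypothetical helper; the real column order may differ.
    """
    cols = [
        "building_count",
        "total_building_exposure",
        "total_content_exposure",
        "total_exposure",
    ]
    # One loss column and one loss-ratio column per return period.
    for rp in sorted(return_periods):
        cols += [f"loss_rp{rp}", f"loss_ratio_rp{rp}"]
    cols += ["aal_mean", "aal_ratio"]
    return cols


# Example with three made-up return periods.
cols = result_columns([100, 10, 500])
```

Because the names depend on the return periods found in the losses table, downstream code may be more robust if it selects columns by the loss_rp prefix rather than hard-coding a fixed set.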

Source code in src/inland_consequences/results_aggregation.py
def aggregate(
    self,
    geography: GeographyLevel,
    breakdown: Optional[list[str]] = None,
    huc_digits: int = 8,
    loss_metric: LossMetric = "loss_mean",
) -> pd.DataFrame:
    """Aggregate results to the requested geographic and/or attribute level.

    Parameters
    ----------
    geography:
        The geographic aggregation level.  Census-hierarchy levels
        (``state``, ``county``, ``tract``, ``block_group``, ``block``) are
        derived from the 15-digit ``cbfips`` field on each building.
        ``community`` and ``huc`` use the bundled reference parquet files.
    breakdown:
        Optional list of building attribute columns to add as additional
        group-by dimensions.  Supported values: ``st_damcat``,
        ``occupancy_type``, ``general_building_type``, ``foundation_type``,
        ``flood_peril_type``.
    huc_digits:
        Number of HUC digits to use when ``geography="huc"``.  The bundled
        reference contains HUC-8 identifiers; pass a smaller even number
        (e.g. 6 or 4) to roll up to a coarser watershed unit.
    loss_metric:
        Which column from the ``losses`` table to aggregate.  Defaults to
        ``loss_mean``.

    Returns
    -------
    pandas.DataFrame
        One row per unique combination of geographic ID and breakdown
        dimensions.  Columns include ``building_count``,
        ``total_building_exposure``, ``total_content_exposure``,
        ``total_exposure``, ``loss_rp{N}`` and ``loss_ratio_rp{N}`` for
        each return period, ``aal_mean``, and ``aal_ratio``.
    """
    breakdown = list(breakdown or [])
    invalid = set(breakdown) - self.BREAKDOWN_FIELDS
    if invalid:
        raise ValueError(
            f"Unsupported breakdown field(s): {invalid}. "
            f"Allowed: {self.BREAKDOWN_FIELDS}"
        )

    self._register_xrefs()
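    # Validate up front: a bare dict lookup would raise KeyError before the
    # ValueError branch below could ever report an unknown level.
    if geography not in self._GEO_ID_COL:
        raise ValueError(f"Unknown geography level: {geography!r}")
    geo_id_col = self._GEO_ID_COL[geography]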

    if geography in self._GEO_FIPS_LEN:
        return self._aggregate_fips(geography, geo_id_col, breakdown, loss_metric)
    elif geography == "community":
        return self._aggregate_community(geo_id_col, breakdown, loss_metric)
    elif geography == "huc":
        return self._aggregate_huc(geo_id_col, huc_digits, breakdown, loss_metric)
    else:
        raise ValueError(f"Unknown geography level: {geography!r}")