da-message-passing / src / damp / inla_bridge.py
inla_bridge.py
Raw
import csv
from pathlib import Path
from tempfile import TemporaryDirectory

import rpy2.robjects as ro
from numpy import ndarray
from rpy2.robjects import pandas2ri
from rpy2.robjects.packages import importr

from damp import gp


def run(prior: gp.Prior, obs: gp.Obs) -> tuple[ndarray, ndarray]:
    r_Matrix = importr("Matrix")
    importr("INLA")

    with TemporaryDirectory(prefix="damp_r_communication_") as temp_dir_str:
        temp_dir = Path(temp_dir_str)

        py_pp = prior.precision.tocoo()
        ro.globalenv["prior_precision"] = r_Matrix.sparseMatrix(
            i=ro.IntVector(py_pp.row + 1),
            j=ro.IntVector(py_pp.col + 1),
            x=ro.FloatVector(py_pp.data),
            dims=ro.IntVector(py_pp.shape),
        )

        data = ro.r["read.csv"](str(_write_data(prior, obs, temp_dir)))

        formula = ro.r(
            f'y ~ f(x, model = "generic0", Cmatrix = prior_precision, hyper = list(prec = list( initial = 1e-3, fixed = TRUE)))'
        )
        ro.globalenv["result"] = ro.r["inla"](formula, data=data)

        r_results = ro.r("result$summary.random$x")
        with (ro.default_converter + pandas2ri.converter).context():
            results = ro.conversion.get_conversion().rpy2py(r_results)

    pred_means = results["mean"].to_numpy().reshape(prior.interior_shape)
    pred_stds = results["sd"].to_numpy().reshape(prior.interior_shape)
    return pred_means, pred_stds


def _write_data(prior: gp.Prior, obs: gp.Obs, temp_dir: Path) -> Path:
    output_path = temp_dir / "data.csv"
    vals_by_idx = {_convert_xy_to_inla_idx(prior, xy): val for xy, val in obs}
    with open(output_path, "w") as file:
        writer = csv.writer(file)
        writer.writerow(["x", "y"])
        # The INLA idx start at 1, so remember to set the range appropriately.
        # Oscar: is there a reason we don't just iterate over the dictionary?
        for idx in range(1, prior.precision.shape[0] + 1):
            if idx in vals_by_idx:
                writer.writerow([idx, vals_by_idx[idx]])
    return output_path


def _convert_xy_to_inla_idx(prior: gp.Prior, xy: tuple[int, int]) -> int:
    x, y = xy
    # x and y are in boundary coordinates -> convert to interior coordinates
    x = x - 1
    y = y - 1
    # Given a field Z of size [width x height], we store the point (x,y) at Z[x,y]
    # which is the xth column and yth row.
    zero_indexed_index = (x * prior.interior_shape.height) + y
    # Finally, add one to convert from Python zero-indexed arrays to R's one-indexed
    # arrays.
    return zero_indexed_index + 1