Source code for spacebench.env

"""Module for defining the SpaceEnvironment class"""
import os
import zipfile
from dataclasses import dataclass

import yaml
import numpy as np
import scipy.sparse
import pandas as pd
import networkx as nx

from spacebench.log import LOGGER
from spacebench.datamaster import DataMaster
from spacebench.api.dataverse import DataverseAPI


@dataclass
class SpaceDataset:
    """
    Class for storing a spatial causal inference benchmark dataset.
    """

    treatment: np.ndarray
    covariates: np.ndarray
    outcome: np.ndarray
    edges: list[tuple[int, int]]
    treatment_values: np.ndarray
    smoothness_of_missing: float | None = None
    confounding_of_missing: float | None = None
    counterfactuals: np.ndarray | None = None
    coordinates: np.ndarray | None = None
    def has_binary_treatment(self) -> bool:
        """
        Returns True if the treatment is binary.
        """
        return len(self.treatment_values) == 2
    def erf(self) -> np.ndarray:
        """
        Returns the exposure-response function, also known in the
        literature as the average dose-response function.

        Returns
        -------
        np.ndarray
            The exposure-response function.
        """
        return self.counterfactuals.mean(0)
    def adjacency_matrix(
        self, sparse: bool = False
    ) -> np.ndarray | scipy.sparse.csr_matrix:
        """
        Returns the adjacency matrix of the graph.

        Parameters
        ----------
        sparse: bool, optional (default is False)
            If True, returns a sparse matrix of type csr_matrix.
            If False, returns a dense matrix.

        Returns
        -------
        np.ndarray | scipy.sparse.csr_matrix
            Adjacency matrix where entry (i, j) is 1 if there is an edge
            between node i and node j.
        """
        n = len(self.treatment)
        if sparse:
            # build in LIL format, which supports efficient item assignment,
            # and convert to CSR at the end
            adj = scipy.sparse.lil_matrix((n, n))
        else:
            adj = np.zeros((n, n))
        for e in self.edges:
            adj[e[0], e[1]] = 1
            adj[e[1], e[0]] = 1
        return adj.tocsr() if sparse else adj
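
# Illustrative sketch (not part of the original module): a minimal example of
# constructing a toy SpaceDataset by hand and querying its adjacency matrix
# and exposure-response function. The arrays below are made up for
# demonstration; real datasets come from SpaceEnv.make().
def _example_space_dataset() -> None:
    # three units on a line graph: 0 - 1 - 2, with a binary treatment
    ds = SpaceDataset(
        treatment=np.array([0.0, 1.0, 0.0]),
        covariates=np.random.randn(3, 2),
        outcome=np.array([1.0, 2.0, 1.5]),
        edges=[(0, 1), (1, 2)],
        treatment_values=np.array([0.0, 1.0]),
        counterfactuals=np.array([[1.0, 2.0], [1.5, 2.5], [1.2, 2.2]]),
    )
    assert ds.has_binary_treatment()  # two treatment values
    adj = ds.adjacency_matrix()  # dense, symmetric 3x3 0/1 matrix
    assert adj[0, 1] == adj[1, 0] == 1
    # erf() averages the counterfactual columns over units:
    # here [mean(1.0, 1.5, 1.2), mean(2.0, 2.5, 2.2)]
    print(ds.erf())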
class SpaceEnv:
    """
    Class for a SpaCE environment. It holds the data and metadata that are
    used to generate the datasets by masking a covariate, which becomes a
    missing confounder.

    Attributes
    ----------
    api: DataverseAPI
        Dataverse API object.
    config: dict
        Dictionary with the configuration of the dataset.
    confounding_score_dict: dict
        Dictionary with the confounding scores of the covariates.
    datamaster: DataMaster
        DataMaster object.
    dir: str
        Directory where the dataset is stored.
    graph: networkx.Graph
        Graph of the dataset.
    metadata: dict
        Dictionary with the metadata of the dataset.
    name: str
        Name of the dataset.
    smoothness_score_dict: dict
        Dictionary with the smoothness scores of the covariates.
    synthetic_data: pd.DataFrame
        Synthetic data of the dataset.
    """

    def __init__(self, name: str, dir: str | None = None):
        """
        Initializes the SpaceEnv class using a dataset name. When the
        dataset is not found in the directory, it is downloaded from the
        Dataverse.

        Parameters
        ----------
        name: str
            Name of the dataset. See the DataMaster.list_envs() method
            for a list of available datasets.
        dir: str, optional
            Directory where the dataset is stored. Defaults to a
            temporary directory.
        """
        self.name = name
        self.datamaster = DataMaster()
        self.api = DataverseAPI(dir)
        self.dir = self.api.dir  # will be a tmp dir if dir is None

        # check if the dataset is available
        if name not in self.datamaster.list_envs():
            raise ValueError(f"Dataset {name} not available")

        # download the .zip dataset if necessary
        tgtdir = os.path.join(self.dir, name)
        if not os.path.exists(tgtdir):
            # download the .zip file
            zip_path = self.api.download_data(name + ".zip")

            # unzip the folder
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                zip_ref.extractall(tgtdir)

            # remove the .zip file
            os.remove(zip_path)

        # birth certificate/config
        with open(os.path.join(tgtdir, "config.yaml"), "r") as f:
            self.config = yaml.load(f, Loader=yaml.BaseLoader)

        # extract synthetic data and metadata properties
        self.synthetic_data = pd.read_csv(
            os.path.join(tgtdir, "synthetic_data.csv"), index_col=0
        )
        with open(os.path.join(tgtdir, "metadata.yaml"), "r") as f:
            self.metadata = yaml.load(f, Loader=yaml.BaseLoader)

        # read graph
        self.graph = nx.read_graphml(os.path.join(tgtdir, "graph.graphml"))

        # information about spatial complexity
        # TODO: the metadata keys are inconsistently named:
        # "confounding_score" (singular) vs. "spatial_scores" (plural)
        self.confounding_score_dict = {
            x: float(v) for x, v in self.metadata["confounding_score"].items()
        }
        self.smoothness_score_dict = {
            x: float(v) for x, v in self.metadata["spatial_scores"].items()
        }
    def _check_scores(
        self,
        c: str,
        min_confounding: float,
        max_confounding: float,
        min_smoothness: float,
        max_smoothness: float,
    ) -> bool:
        """
        Checks whether the given covariate's smoothness and confounding
        scores are within the given ranges.

        Parameters
        ----------
        c: str
            Covariate to check.
        min_confounding: float
            Minimum confounding score.
        max_confounding: float
            Maximum confounding score.
        min_smoothness: float
            Minimum smoothness score.
        max_smoothness: float
            Maximum smoothness score.

        Returns
        -------
        bool
            True if the scores are within range, False otherwise.
        """
        smoothness = self.smoothness_score_dict[c]
        confounding = self.confounding_score_dict[c]
        return (
            min_confounding <= confounding <= max_confounding
            and min_smoothness <= smoothness <= max_smoothness
        )
    def __masking_candidates(
        self,
        min_confounding: float = 0.0,
        max_confounding: float = 1.0,
        min_smoothness: float = 0.0,
        max_smoothness: float = 1.0,
    ) -> list[str]:
        """
        Auxiliary method for finding the covariates that satisfy the
        requirements for masking.
        """
        candidates = [
            c
            for c in self.metadata["covariates"]
            if self._check_scores(
                c,
                min_confounding,
                max_confounding,
                min_smoothness,
                max_smoothness,
            )
        ]
        if len(candidates) == 0:
            raise ValueError(
                "No covariate found with the specified requirements"
            )
        return candidates

    def __gen__dataset__from__observed_and_missing(
        self,
        missing: str | None,
        observed: list[str],
    ) -> SpaceDataset:
        """
        Generates a SpaceDataset from a list of observed covariates.
        """
        if missing is not None:
            observed = [c for c in observed if c != missing]
            missing_smoothness = self.smoothness_score_dict[missing]
            missing_confounding = self.confounding_score_dict[missing]
        else:
            observed = self.metadata["covariates"]
            missing_smoothness = None
            missing_confounding = None

        # counterfactuals, outcome, and treatment;
        # for counterfactuals, make sure the columns are in the right order
        outcome = self.synthetic_data["Y_synth"].values
        columns = self.synthetic_data.columns
        cfcols = columns.str.startswith("Y_synth_")
        treatment_index = [int(x[-1]) for x in columns[cfcols].str.split("_")]
        cfcols_order = np.argsort(treatment_index)
        cfcols = columns[cfcols][cfcols_order]
        counterfactuals = self.synthetic_data[cfcols].values
        treatment = self.synthetic_data[self.metadata["treatment"]].values

        # extract the graph in a usable format
        node2id = {n: i for i, n in enumerate(self.graph.nodes)}
        edge_list = [(node2id[e[0]], node2id[e[1]]) for e in self.graph.edges]
        coordinates = []
        for v in self.graph.nodes.values():
            coordinates.append([float(x) for x in v.values()])
        coordinates = np.array(coordinates)

        # treatment values, making sure they are floats
        treatment_values = np.array(
            [float(x) for x in self.metadata["treatment_values"]]
        )

        return SpaceDataset(
            treatment=treatment,
            covariates=self.synthetic_data[observed].values,
            outcome=outcome,
            counterfactuals=counterfactuals,
            edges=edge_list,
            coordinates=coordinates,
            smoothness_of_missing=missing_smoothness,
            confounding_of_missing=missing_confounding,
            treatment_values=treatment_values,
        )
    def make_unmasked(self) -> SpaceDataset:
        """
        Generates a SpaceDataset with all covariates observed (no missing
        confounding).

        Returns
        -------
        SpaceDataset
            A SpaceDataset with all covariates observed.
        """
        missing = None
        observed = self.metadata["covariates"]
        return self.__gen__dataset__from__observed_and_missing(
            missing, observed
        )
    def make(
        self,
        missing: str | None = None,
        min_confounding: float = 0.0,
        max_confounding: float = 1.0,
        min_smoothness: float = 0.0,
        max_smoothness: float = 1.0,
    ) -> SpaceDataset:
        """
        Generates a SpaceDataset by masking a covariate.

        Parameters
        ----------
        missing: str, optional (default is None)
            Name of the covariate to be masked. If no covariate is
            specified, a covariate is selected at random from the ones
            that satisfy the requirements for masking in terms of
            smoothness and confounding.
        min_confounding: float, optional (default is 0.0)
            Minimum confounding score for the covariate to be masked.
        max_confounding: float, optional (default is 1.0)
            Maximum confounding score for the covariate to be masked.
        min_smoothness: float, optional (default is 0.0)
            Minimum smoothness score for the covariate to be masked.
        max_smoothness: float, optional (default is 1.0)
            Maximum smoothness score for the covariate to be masked.

        Returns
        -------
        SpaceDataset
            A SpaceDataset.
        """
        if missing is None:
            candidates = self.__masking_candidates(
                min_confounding,
                max_confounding,
                min_smoothness,
                max_smoothness,
            )
            missing = np.random.choice(candidates)
            LOGGER.debug(f"Missing covariate (selected at random): {missing}")
        observed = [c for c in self.metadata["covariates"] if c != missing]
        return self.__gen__dataset__from__observed_and_missing(
            missing, observed
        )
    def make_all(
        self,
        min_confounding: float = 0.0,
        max_confounding: float = 1.0,
        min_smoothness: float = 0.0,
        max_smoothness: float = 1.0,
    ):
        """
        Generates all possible SpaceDatasets by masking, one at a time,
        every covariate that satisfies the score requirements.

        Parameters
        ----------
        min_confounding: float, optional (default is 0.0)
            Minimum confounding score for the covariate to be masked.
        max_confounding: float, optional (default is 1.0)
            Maximum confounding score for the covariate to be masked.
        min_smoothness: float, optional (default is 0.0)
            Minimum smoothness score for the covariate to be masked.
        max_smoothness: float, optional (default is 1.0)
            Maximum smoothness score for the covariate to be masked.

        Returns
        -------
        Generator[SpaceDataset]
            Generator of SpaceDatasets.
        """
        for c in self.metadata["covariates"]:
            if self._check_scores(
                c,
                min_confounding,
                max_confounding,
                min_smoothness,
                max_smoothness,
            ):
                yield self.make(missing=c)
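
# Illustrative sketch (not part of the original module): typical usage of
# SpaceEnv. No environment name is hardcoded since the available names come
# from DataMaster.list_envs(); running this downloads data from the Dataverse
# and assumes at least one covariate satisfies the score filters.
def _example_space_env() -> None:
    env_name = DataMaster().list_envs()[0]
    env = SpaceEnv(env_name, dir="downloads")

    # mask a randomly chosen covariate with a high confounding score
    ds = env.make(min_confounding=0.5)
    print(ds.smoothness_of_missing, ds.confounding_of_missing)

    # or iterate over every admissible masking
    for ds in env.make_all(min_smoothness=0.25):
        print(ds.covariates.shape)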
if __name__ == "__main__": # small test # TODO: convert in unit test dm = DataMaster() envname = dm.list_envs()[0] dir = "downloads" generator = SpaceEnv(envname, dir) data = generator.make() datasets = [generator.make() for _ in range(10)] LOGGER.debug("ok")