Shortcuts

Source code for lumin.nn.data.fold_yielder

from __future__ import annotations

import json
import pickle
import warnings
from collections import OrderedDict
from importlib import import_module
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Type, Union

import h5py
import numpy as np
import pandas as pd
from fastcore.all import is_listy
from sklearn.model_selection import KFold
from sklearn.pipeline import Pipeline
from torch_geometric.data import Dataset as PyGDataset

from .batch_yielder import BatchYielder, TorchGeometricBatchYielder

__all__ = ["FoldYielder", "HEPAugFoldYielder", "TorchGeometricFoldYielder"]


class FoldYielder:
    r"""
    Interface class for accessing data from foldfiles created by :meth:`~lumin.data_processing.file_proc.df2foldfile`

    Arguments:
        foldfile: filename of hdf5 file or opened hdf5 file
        cont_feats: list of names of continuous features present in input data, not required if foldfile contains meta data already
        cat_feats: list of names of categorical features present in input data, not required if foldfile contains meta data already
        ignore_feats: optional list of input features which should be ignored
        input_pipe: optional Pipeline, or filename for pickled Pipeline, which was used for processing the inputs
        output_pipe: optional Pipeline, or filename for pickled Pipeline, which was used for processing the targets
        yield_matrix: whether to actually yield matrix data if present
        matrix_pipe: preprocessing pipe for matrix data
        batch_yielder_type: Class of :class:`~lumin.nn.data.batch_yielder.BatchYielder` to instantiate to yield inputs

    Examples::
        >>> fy = FoldYielder('train.h5')
        >>>
        >>> fy = FoldYielder('train.h5', ignore_feats=['phi'], input_pipe='input_pipe.pkl')
        >>>
        >>> fy = FoldYielder('train.h5', input_pipe=input_pipe, matrix_pipe=matrix_pipe)
        >>>
        >>> fy = FoldYielder('train.h5', input_pipe=input_pipe, yield_matrix=False)
    """

    # TODO: Matrix example

    def __init__(
        self,
        foldfile: Union[str, Path, h5py.File],
        cont_feats: Optional[List[str]] = None,
        cat_feats: Optional[List[str]] = None,
        ignore_feats: Optional[List[str]] = None,
        input_pipe: Optional[Union[str, Pipeline, Path]] = None,
        output_pipe: Optional[Union[str, Pipeline, Path]] = None,
        yield_matrix: bool = True,
        matrix_pipe: Optional[Union[str, Pipeline, Path]] = None,
        batch_yielder_type: Type[BatchYielder] = BatchYielder,
    ):
        self.cont_feats, self.cat_feats = cont_feats, cat_feats
        self.input_pipe, self.output_pipe = input_pipe, output_pipe
        self.yield_matrix, self.matrix_pipe = yield_matrix, matrix_pipe
        self.batch_yielder_type = batch_yielder_type
        self.augmented, self.aug_mult, self.train_time_aug, self.test_time_aug = False, 0, False, False

        # May overwrite the feature lists with those stored in the file's meta data
        self._set_foldfile(foldfile)

        # A feature list left as None (and not supplied by meta data) is treated as empty,
        # otherwise the concatenation below raises a TypeError
        if self.cont_feats is None:
            self.cont_feats = []
        if self.cat_feats is None:
            self.cat_feats = []
        self.input_feats = self.cont_feats + self.cat_feats
        # NOTE: `orig_cat_feat` (singular) is the established attribute name; kept for compatibility
        self.orig_cont_feats, self.orig_cat_feat, self._ignore_feats = self.cont_feats, self.cat_feats, []

        # Pipes given as filenames are unpickled now; Pipeline objects are used as passed
        if isinstance(self.input_pipe, (str, Path)):
            self.add_input_pipe_from_file(self.input_pipe)
        if isinstance(self.output_pipe, (str, Path)):
            self.add_output_pipe_from_file(self.output_pipe)
        if isinstance(self.matrix_pipe, (str, Path)):
            self.add_matrix_pipe_from_file(self.matrix_pipe)
        if ignore_feats is not None:
            self.add_ignore(ignore_feats)

    def __repr__(self) -> str:
        return f"FoldYielder with {self.n_folds} folds, containing {self.columns()}"

    def __len__(self) -> int:
        r"""
        Returns:
            number of folds found in the foldfile
        """

        return self.n_folds

    def __getitem__(self, idx: int) -> Dict[str, np.ndarray]:
        r"""
        Equivalent to :meth:`get_fold`

        Arguments:
            idx: fold index to load

        Returns:
            dictionary of inputs, targets, and weights for the fold
        """

        return self.get_fold(idx)

    def __iter__(self) -> Iterator[Dict[str, np.ndarray]]:
        r"""
        Yields the result of :meth:`get_fold` for every fold index from 0 to `n_folds` - 1
        """

        for i in range(self.n_folds):
            yield self.get_fold(i)
def columns(self) -> List[str]:
    r"""
    Lists the names of the entries stored under the first fold (``fold_0``) of the foldfile

    Returns:
        list of columns present in foldfile
    """

    first_fold = self.foldfile["fold_0"]
    return list(first_fold.keys())
def add_ignore(self, feats: Union[str, List[str]]) -> None:
    r"""
    Registers additional features to ignore and removes them from the continuous and categorical feature lists.

    Arguments:
        feats: feature name, or list of feature names, to ignore
    """

    new_feats = feats if is_listy(feats) else [feats]
    self._ignore_feats += new_feats
    ignored = self._ignore_feats
    self.cont_feats = [name for name in self.cont_feats if name not in ignored]
    self.cat_feats = [name for name in self.cat_feats if name not in ignored]
def get_ignore(self) -> List[str]:
    r"""
    Gives access to the list of features currently marked as ignored.
    The stored list itself is returned, not a copy.

    Returns:
        Features removed from training data
    """

    ignored = self._ignore_feats
    return ignored
def get_use_cont_feats(self) -> List[str]:
    r"""
    Filters the continuous features down to those which are not ignored, preserving their order.

    Returns:
        List of continuous features
    """

    ignored = self._ignore_feats
    used = []
    for name in self.cont_feats:
        if name not in ignored:
            used.append(name)
    return used
def get_use_cat_feats(self) -> List[str]:
    r"""
    Filters the categorical features down to those which are not ignored, preserving their order.

    Returns:
        List of categorical features
    """

    ignored = self._ignore_feats
    used = []
    for name in self.cat_feats:
        if name not in ignored:
            used.append(name)
    return used
def _set_foldfile(self, foldfile: Union[str, Path, h5py.File]) -> None:
    r"""
    Sets the file from which to access data, counts its folds, loads meta data if present, and records the size of each fold

    Arguments:
        foldfile: filename of h5py file or opened h5py file
    """

    if not isinstance(foldfile, h5py.File):
        # Opened read-write since predictions may later be written back via save_fold_pred
        foldfile = h5py.File(foldfile, "r+")
    self.foldfile = foldfile
    self.n_folds = sum(1 for name in foldfile if "fold_" in name)
    self.has_matrix = "matrix_inputs" in self.columns()

    if "meta_data" in self.foldfile:
        self._load_meta_data()
    else:
        # Without meta data nothing can be stored sparsely; set the flags which are otherwise
        # only defined by _load_meta_data so that later attribute access does not fail
        self.matrix_is_sparse = False
        self.target_is_tensor = False
        self.target_tensor_is_sparse = False

    self.fld_szs = {}
    for i in range(self.n_folds):
        targets = self.foldfile[f"fold_{i}/targets"]
        if self.target_tensor_is_sparse:
            # Sparse layout (see get_column): row 0 holds values, the following rows hold coordinates,
            # so the last leading-axis coordinate + 1 is the number of entries
            self.fld_szs[i] = targets[1, -1] + 1
        else:
            self.fld_szs[i] = targets.shape[0]
def get_data_count(self, idxs: Optional[Union[int, List[int]]] = None) -> int:
    r"""
    Sums the recorded sizes of the requested folds

    Arguments:
        idxs: fold index, or list of fold indices, to check. Default all folds.

    Returns:
        Total number of entries in the folds
    """

    if idxs is None:
        idxs = list(range(self.n_folds))
    elif not is_listy(idxs):
        idxs = [idxs]
    return sum(self.fld_szs[i] for i in idxs)
def _load_meta_data(self) -> None: if self.cont_feats is not None: warnings.warn( "Fold file contains meta data information, explicit passing of continuous and categorical feature lists is no longer required." ) self.cont_feats = json.loads(self.foldfile["meta_data/cont_feats"][()]) self.cat_feats = json.loads(self.foldfile["meta_data/cat_feats"][()]) self.targ_feats = json.loads(self.foldfile["meta_data/targ_feats"][()]) if "wgt_feat" in self.foldfile["meta_data"]: self.wgt_feat = json.loads(self.foldfile["meta_data/wgt_feat"][()]) if "cat_maps" in self.foldfile["meta_data"]: self.cat_maps = OrderedDict(json.loads(self.foldfile["meta_data/cat_maps"][()])) if self.has_matrix: self.matrix_feats = json.loads(self.foldfile["meta_data/matrix_feats"][()]) self.matrix_feats["missing"] = np.array(self.matrix_feats["missing"], dtype=bool) self.matrix_is_sparse = self.matrix_feats["is_sparse"] if "is_sparse" in self.matrix_feats else False self.matrix_shape = self.matrix_feats["shape"] if "shape" in self.matrix_feats else False else: self.matrix_is_sparse = False self.target_is_tensor = "target_tensor" in self.foldfile["meta_data"] if self.target_is_tensor: self.target_tensor_feats = json.loads(self.foldfile["meta_data/target_tensor"][()]) self.target_tensor_is_sparse = ( self.target_tensor_feats["is_sparse"] if "is_sparse" in self.target_tensor_feats else False ) self.target_tensor_shape = ( self.target_tensor_feats["shape"] if "shape" in self.target_tensor_feats else False ) else: self.target_tensor_is_sparse = False if self.matrix_is_sparse or self.target_tensor_is_sparse: self.sparse_module = import_module( "sparse" ) # Don't want to make sparse a dependency due to difficulty of installation on some systems def _append_matrix(self, data, idx) -> Dict[str, np.ndarray]: data["inputs"] = (data["inputs"], self.get_column("matrix_inputs", n_folds=1, fold_idx=idx)) return data
def close(self) -> None:
    r"""
    Closes the underlying foldfile
    """

    handle = self.foldfile
    handle.close()
def add_input_pipe(self, input_pipe: Union[str, Pipeline]) -> None:
    r"""
    Sets the input pipe of the FoldYielder for use when deprocessing data.
    Filenames (str or Path) are loaded via :meth:`add_input_pipe_from_file`, anything else is stored as passed.

    Arguments:
        input_pipe: Pipeline which was used for preprocessing the input data or name of pkl file containing Pipeline
    """

    if not isinstance(input_pipe, (str, Path)):
        self.input_pipe = input_pipe
        return
    self.add_input_pipe_from_file(input_pipe)
def add_matrix_pipe(self, matrix_pipe: Union[str, Pipeline]) -> None:
    r"""
    Sets the matrix pipe of the FoldYielder for use when deprocessing data.
    Filenames (str or Path) are loaded via :meth:`add_matrix_pipe_from_file`, anything else is stored as passed.

    .. Warning:: Deprocessing matrix data is not yet implemented

    Arguments:
        matrix_pipe: Pipeline which was used for preprocessing the matrix data or name of pkl file containing Pipeline
    """

    if not isinstance(matrix_pipe, (str, Path)):
        self.matrix_pipe = matrix_pipe
        return
    self.add_matrix_pipe_from_file(matrix_pipe)
def add_output_pipe(self, output_pipe: Union[str, Pipeline]) -> None:
    r"""
    Sets the output pipe of the FoldYielder for use when deprocessing data.
    Filenames (str or Path) are loaded via :meth:`add_output_pipe_from_file`, anything else is stored as passed.

    Arguments:
        output_pipe: Pipeline which was used for preprocessing the target data or name of pkl file containing Pipeline
    """

    if not isinstance(output_pipe, (str, Path)):
        self.output_pipe = output_pipe
        return
    self.add_output_pipe_from_file(output_pipe)
def add_input_pipe_from_file(self, name: Union[str, Path]) -> None:
    r"""
    Unpickles the contents of a pkl file and stores the result as the input pipe for use when deprocessing data

    Arguments:
        name: name of pkl file containing Pipeline which was used for preprocessing the input data
    """

    # NOTE: unpickling can execute arbitrary code; only load pipe files from trusted sources
    with open(name, mode="rb") as pkl_file:
        pipe = pickle.load(pkl_file)
    self.input_pipe = pipe
def add_matrix_pipe_from_file(self, name: Union[str, Path]) -> None:
    r"""
    Adds a matrix pipe from a pkl file to the FoldYielder for use when deprocessing data.
    Accepts both strings and Path objects, matching the other `add_*_pipe_from_file` methods.

    Arguments:
        name: name of pkl file containing Pipeline which was used for preprocessing the matrix data
    """

    # NOTE: unpickling can execute arbitrary code; only load pipe files from trusted sources
    with open(name, "rb") as fin:
        self.matrix_pipe = pickle.load(fin)
def add_output_pipe_from_file(self, name: Union[str, Path]) -> None:
    r"""
    Unpickles the contents of a pkl file and stores the result as the output pipe for use when deprocessing data

    Arguments:
        name: name of pkl file containing Pipeline which was used for preprocessing the target data
    """

    # NOTE: unpickling can execute arbitrary code; only load pipe files from trusted sources
    with open(name, mode="rb") as pkl_file:
        pipe = pickle.load(pkl_file)
    self.output_pipe = pipe
def get_fold(self, idx: int) -> Dict[str, np.ndarray]:
    r"""
    Loads the data of a single fold via :meth:`get_data`, drops the columns of ignored features from the inputs,
    and pairs the inputs with the matrix inputs if the file contains them and `yield_matrix` is set.

    Arguments:
        idx: fold index to load

    Returns:
        dictionary of inputs, targets, and weights
    """

    fold = self.get_data(n_folds=1, fold_idx=idx)
    if len(self._ignore_feats) != 0:
        # TODO Improve this with preconfigured mask
        kept = [name for name in self.input_feats if name not in self._ignore_feats]
        frame = pd.DataFrame(fold["inputs"], columns=self.input_feats)
        fold["inputs"] = frame[kept].values
    if self.has_matrix and self.yield_matrix:
        return self._append_matrix(fold, idx)
    return fold
def get_column(
    self, column: str, n_folds: Optional[int] = None, fold_idx: Optional[int] = None, add_newaxis: bool = False
) -> Union[np.ndarray, None]:
    r"""
    Load column (h5py group) from foldfile. Used for getting arbitrary data which isn't automatically grabbed by other methods.
    Sparsely stored matrix inputs and targets are returned as dense arrays.

    Arguments:
        column: name of h5py group to get
        n_folds: number of folds to get data from. Default all folds. Not compatible with fold_idx
        fold_idx: Only load group from a single, specified fold. Not compatible with n_folds
        add_newaxis: whether to expand the shape of the returned data to (N, 1) if the data is one-dimensional

    Returns:
        Numpy array of column data, or None if the column is not present in the first fold

    Raises:
        IndexError: if `fold_idx` is given and no such fold exists
    """

    if column not in self.columns():
        return None

    def _read(fold_name: str) -> np.ndarray:
        # Loads one fold's copy of the column, expanding sparse storage into a dense array
        raw = self.foldfile[f"{fold_name}/{column}"][()]
        if column == "matrix_inputs" and self.matrix_is_sparse:
            dense_shape = self.matrix_shape
        elif column == "targets" and self.target_tensor_is_sparse:
            dense_shape = self.target_tensor_shape
        else:
            return raw
        # Sparse layout: row 0 holds the values, the remaining rows hold the coordinates;
        # the leading dimension is recovered from the last coordinate along the first axis
        coords = raw[1:].astype(int)
        return self.sparse_module.COO(
            coords=coords, data=raw[0], shape=[coords[0][-1] + 1] + dense_shape
        ).todense()

    if fold_idx is None:
        # NOTE(review): folds are taken in the iteration order of the file, which is presumably
        # sorted by name ("fold_10" before "fold_2") rather than numerically - confirm
        fold_names = [name for name in self.foldfile if "fold_" in name]
        if n_folds is not None:
            fold_names = fold_names[: max(n_folds, 0)]
        data = np.concatenate([_read(name) for name in fold_names])
    else:
        if f"fold_{fold_idx}" not in self.foldfile:
            raise IndexError(f"Fold {fold_idx} does not exist")
        data = _read(f"fold_{fold_idx}")

    # Checking the dimensionality, rather than the first element, also works for empty columns
    return data[:, None] if add_newaxis and np.ndim(data) == 1 else data
def get_data(self, n_folds: Optional[int] = None, fold_idx: Optional[int] = None) -> Dict[str, np.ndarray]:
    r"""
    Loads the inputs, targets, and weights columns for a single, specified fold or for several folds.
    Does not account for ignored features. Inputs are passed through np.nan_to_num to deal with nans and infs.
    Targets and weights are requested with `add_newaxis=True`.

    Arguments:
        n_folds: number of folds to get data from. Default all folds. Not compatible with fold_idx
        fold_idx: Only load group from a single, specified fold. Not compatible with n_folds

    Returns:
        dictionary with keys 'inputs', 'targets', and 'weights'
    """

    selection = {"n_folds": n_folds, "fold_idx": fold_idx}
    raw_inputs = self.get_column("inputs", **selection)
    targets = self.get_column("targets", add_newaxis=True, **selection)
    weights = self.get_column("weights", add_newaxis=True, **selection)
    return {"inputs": np.nan_to_num(raw_inputs), "targets": targets, "weights": weights}
def get_df(
    self,
    pred_name: str = "pred",
    targ_name: str = "targets",
    wgt_name: str = "weights",
    n_folds: Optional[int] = None,
    fold_idx: Optional[int] = None,
    inc_inputs: bool = False,
    inc_ignore: bool = False,
    deprocess: bool = False,
    verbose: bool = True,
    suppress_warn: bool = False,
    nan_to_num: bool = False,
    inc_matrix: bool = False,
) -> pd.DataFrame:
    r"""
    Get a Pandas DataFrame of the data in the foldfile.
    Will add columns for inputs (if requested), targets, weights, and predictions (if present).
    Columns which are missing from the file are skipped (with a warning unless `suppress_warn` is set).

    Arguments:
        pred_name: name of prediction group
        targ_name: name of target group
        wgt_name: name of weight group
        n_folds: number of folds to get data from. Default all folds. Not compatible with fold_idx
        fold_idx: Only load group from a single, specified fold. Not compatible with n_folds
        inc_inputs: whether to include input data
        inc_ignore: whether to include ignored features
        deprocess: whether to deprocess inputs, targets, and predictions if pipelines have been provided
        verbose: whether to print the number of datapoints loaded
        suppress_warn: whether to suppress the warning about missing columns
        nan_to_num: whether to pass input data through `np.nan_to_num`
        inc_matrix: whether to include flattened matrix data in output, if present

    Returns:
        Pandas DataFrame with requested data
    """

    # TODO Decide how to handle deprocessing matrix data: option for object by object, flattened out?

    def _add_columns(frame: pd.DataFrame, values: np.ndarray, name: str) -> None:
        # Multi-dimensional data is split into one numbered column per entry of the last axis
        if len(values.shape) > 1:
            for i in range(values.shape[-1]):
                frame[f"{name}_{i}"] = values[:, i]
        else:
            frame[name] = values

    if inc_inputs:
        inputs = self.get_column("inputs", n_folds=n_folds, fold_idx=fold_idx)
        if deprocess and self.input_pipe is not None:
            # Only the leading continuous features were transformed by the pipe
            n_cont = len(self.orig_cont_feats)
            try:
                inputs = np.hstack((self.input_pipe.inverse_transform(inputs[:, :n_cont]), inputs[:, n_cont:]))
            except ValueError:
                if self.has_matrix:
                    print(
                        "Deprocessing of flat data failed, possible due to the input_pipe expecting to also transform matrix data. Deprocessing of matrix "
                        "is not currently implemented, and deprocessing of flat data using an input_pipe which expects matrix data as well is difficult "
                        "due to loss of variable ordering. In future please use separate pipes to preprocess flat data and matrix data. Returning inputs "
                        "as processed."
                    )
                else:
                    print("Deprocessing of flat data failed, returning inputs as processed.")
        if nan_to_num:
            inputs = np.nan_to_num(inputs)
        data = pd.DataFrame(inputs, columns=self.input_feats)
        if len(self._ignore_feats) > 0 and not inc_ignore:
            data = data[[f for f in self.input_feats if f not in self._ignore_feats]]
        if self.has_matrix and inc_matrix:
            # Flatten each matrix to a row; np.prod handles matrix shapes of any dimensionality
            n_flat = int(np.prod(self.matrix_feats["shape"]))
            mat = self.get_column("matrix_inputs", n_folds=n_folds, fold_idx=fold_idx).reshape(len(inputs), n_flat)
            mat = mat[:, np.logical_not(self.matrix_feats["missing"])]
            # NOTE: deprocessing of matrix data via matrix_pipe is not implemented
            if nan_to_num:
                mat = np.nan_to_num(mat)
            data = data.join(pd.DataFrame(mat, columns=self.matrix_feats["present_feats"]))
    else:
        data = pd.DataFrame()

    targets = self.get_column(targ_name, n_folds=n_folds, fold_idx=fold_idx)
    if targets is None:
        if not suppress_warn:
            warnings.warn(f"{targ_name} not found in file")
    else:
        if deprocess and self.output_pipe is not None:
            targets = self.output_pipe.inverse_transform(targets)
        _add_columns(data, targets, "gen_target")

    weights = self.get_column(wgt_name, n_folds=n_folds, fold_idx=fold_idx)
    if weights is None:
        if not suppress_warn:
            warnings.warn(f"{wgt_name} not found in file")
    else:
        _add_columns(data, weights, "gen_weight")

    preds = self.get_column(pred_name, n_folds=n_folds, fold_idx=fold_idx)
    if preds is None:
        if not suppress_warn:
            warnings.warn(f"{pred_name} not found in foldfile file")
    else:
        if deprocess and self.output_pipe is not None:
            preds = self.output_pipe.inverse_transform(preds)
        _add_columns(data, preds, "pred")

    if verbose:
        print(f"{len(data)} datapoints loaded")
    return data
def save_fold_pred(self, pred: np.ndarray, fold_idx: int, pred_name: str = "pred") -> None:
    r"""
    Writes predictions for the given fold into the foldfile as a float32 dataset, replacing any dataset already stored under the same name

    Arguments:
        pred: array of predictions in the same order as data appears in the file
        fold_idx: index for fold
        pred_name: name of column to save predictions under
    """

    dataset_path = f"fold_{fold_idx}/{pred_name}"
    store = self.foldfile
    if dataset_path in store:
        # Remove the stale dataset first so that one of the new shape can be created
        del store[dataset_path]
    store.create_dataset(dataset_path, shape=pred.shape, dtype="float32")
    store[dataset_path][...] = pred
class HEPAugFoldYielder(FoldYielder):
    r"""
    Specialised version of :class:`~lumin.nn.data.fold_yielder.FoldYielder` providing HEP specific data augmentation at train and test time.

    Arguments:
        foldfile: filename of hdf5 file or opened hdf5 file
        cont_feats: list of names of continuous features present in input data, not required if foldfile contains meta data already
        cat_feats: list of names of categorical features present in input data, not required if foldfile contains meta data already
        ignore_feats: optional list of input features which should be ignored
        aug_targ_feats: optional list of target vectors to also be transformed, leave as `None` for no augmentation of target vectors
        rot_mult: number of rotations of event in phi to make at test-time (currently must be even).
            Greater than zero will also apply random rotations during train-time
        random_rot: whether test-time rotation angles should be random or in steps of 2pi/rot_mult
        reflect_x: whether to reflect events in x axis at train and test time. Only used when `rot_mult` is zero
        reflect_y: whether to reflect events in y axis at train and test time
        reflect_z: whether to reflect events in z axis at train and test time
        train_time_aug: whether to apply augmentations at train time
        test_time_aug: whether to apply augmentations at test time
        input_pipe: optional Pipeline, or filename for pickled Pipeline, which was used for processing the inputs
        output_pipe: optional Pipeline, or filename for pickled Pipeline, which was used for processing the targets
        yield_matrix: whether to actually yield matrix data if present
        matrix_pipe: preprocessing pipe for matrix data

    Examples::
        >>> fy = HEPAugFoldYielder('train.h5',
        ...                        cont_feats=['pT','eta','phi','mass'],
        ...                        rot_mult=2, reflect_y=True, reflect_z=True,
        ...                        input_pipe='input_pipe.pkl')
    """

    def __init__(
        self,
        foldfile: Union[str, Path, h5py.File],
        cont_feats: Optional[List[str]] = None,
        cat_feats: Optional[List[str]] = None,
        ignore_feats: Optional[List[str]] = None,
        aug_targ_feats: Optional[List[str]] = None,
        rot_mult: int = 2,
        random_rot: bool = False,
        reflect_x: bool = False,
        reflect_y: bool = True,
        reflect_z: bool = True,
        train_time_aug: bool = True,
        test_time_aug: bool = True,
        input_pipe: Optional[Union[str, Pipeline, Path]] = None,
        output_pipe: Optional[Union[str, Pipeline, Path]] = None,
        yield_matrix: bool = True,
        matrix_pipe: Optional[Union[str, Pipeline, Path]] = None,
    ):
        super().__init__(
            foldfile=foldfile,
            cont_feats=cont_feats,
            cat_feats=cat_feats,
            ignore_feats=ignore_feats,
            input_pipe=input_pipe,
            output_pipe=output_pipe,
            yield_matrix=yield_matrix,
            matrix_pipe=matrix_pipe,
        )

        if rot_mult > 0 and not random_rot and rot_mult % 2 != 0:
            warnings.warn(
                "Warning: rot_mult must currently be even for fixed rotations, adding an extra rotation multiplicity"
            )
            rot_mult += 1

        self.rot_mult, self.random_rot = rot_mult, random_rot
        self.reflect_x, self.reflect_y, self.reflect_z = reflect_x, reflect_y, reflect_z
        self.train_time_aug, self.test_time_aug = train_time_aug, test_time_aug
        # A single target feature name is wrapped into a list
        self.aug_targ_feats = (
            aug_targ_feats if aug_targ_feats is None or isinstance(aug_targ_feats, list) else [aug_targ_feats]
        )
        self.augmented, self.reflect_axes, self.aug_mult = True, [], 1

        # A vector is identified by a feature ending in "_px"; the suffix is stripped to get its name
        self.vectors = [f[:-3] for f in self.cont_feats if f.endswith("_px")]
        self.targ_vectors = (
            [f[:-3] for f in self.aug_targ_feats if f.endswith("_px")] if self.aug_targ_feats is not None else []
        )

        if self.rot_mult:
            print("Augmenting via phi rotations")
            self.aug_mult = self.rot_mult
        elif self.reflect_x:
            # x reflection is only applied when no rotation is requested
            print("Augmenting via x flips")
            self.reflect_axes += ["_px"]
            self.aug_mult *= 2
        if self.reflect_y:
            print("Augmenting via y flips")
            self.reflect_axes += ["_py"]
            self.aug_mult *= 2
        if self.reflect_z:
            print("Augmenting via longitunidnal flips")
            self.reflect_axes += ["_pz"]
            self.aug_mult *= 2
        print(f"Total augmentation multiplicity is {self.aug_mult}")

    def _rotate(self, df: pd.DataFrame, vecs: List[str]) -> None:
        r"""
        Rotates the (px, py) components of the named vectors in place by the angles in the `aug_angle` column of `df`

        Arguments:
            df: DataFrame containing an `aug_angle` column and `<vec>_px`, `<vec>_py` columns for every vector
            vecs: names of vectors (without component suffix) to rotate
        """

        # The angle is shared by all vectors, so the trigonometric terms are computed once
        cos_a, sin_a = np.cos(df["aug_angle"]), np.sin(df["aug_angle"])
        for vec in vecs:
            px, py = df[f"{vec}_px"], df[f"{vec}_py"]
            # Both new components are computed from the original values before either is overwritten
            df[f"{vec}_px"], df[f"{vec}_py"] = px * cos_a - py * sin_a, py * cos_a + px * sin_a

    def _reflect(self, df: pd.DataFrame, vectors: List[str]) -> None:
        r"""
        Flips the sign, in place, of vector components along the reflection axes for rows whose `aug<axis>` flag equals 1.
        Axes without a flag column, and vectors lacking the component, are skipped.

        Arguments:
            df: DataFrame containing the vector components and `aug<axis>` flag columns
            vectors: names of vectors (without component suffix) to reflect
        """

        for coord in self.reflect_axes:
            flag = f"aug{coord}"
            if flag not in df.columns:
                continue
            cut = df[flag] == 1
            for vector in vectors:
                component = f"{vector}{coord}"
                if component in df.columns:
                    df.loc[cut, component] = -df.loc[cut, component]
def get_fold(self, idx: int) -> Dict[str, np.ndarray]:
    r"""
    Loads a single fold and applies random train-time augmentation: a random rotation angle per row if `rot_mult` is
    non-zero, and a random 0/1 flag per row for every reflection axis. Target vectors receive the same angles and
    flags as the inputs when `aug_targ_feats` is set. Ignored features are dropped and the augmented inputs
    (and targets) are passed through np.nan_to_num.

    Arguments:
        idx: fold index to load

    Returns:
        dictionary of inputs, targets, and weights
    """

    data = self.get_data(n_folds=1, fold_idx=idx)
    if not self.augmented:
        return data

    with_targets = self.aug_targ_feats is not None
    inputs = pd.DataFrame(self.foldfile[f"fold_{idx}/inputs"][()], columns=self.input_feats)
    if with_targets:
        targets = pd.DataFrame(self.foldfile[f"fold_{idx}/targets"][()], columns=self.targ_feats)

    if self.rot_mult:
        # Angles drawn uniformly in [-pi, pi)
        inputs["aug_angle"] = (2 * np.pi * np.random.random(size=len(inputs))) - np.pi
        self._rotate(inputs, self.vectors)
        if with_targets:
            targets["aug_angle"] = inputs["aug_angle"]
            self._rotate(targets, self.targ_vectors)

    for coord in self.reflect_axes:
        flag = f"aug{coord}"
        inputs[flag] = np.random.randint(0, 2, size=len(inputs))
        if with_targets:
            targets[flag] = inputs[flag]
    self._reflect(inputs, self.vectors)
    if with_targets:
        self._reflect(targets, self.targ_vectors)

    # Selecting by feature name also discards the helper columns added above
    kept = [name for name in self.input_feats if name not in self._ignore_feats]
    data["inputs"] = np.nan_to_num(inputs[kept].values)
    if with_targets:
        data["targets"] = np.nan_to_num(targets[self.targ_feats].values)

    if self.has_matrix and self.yield_matrix:
        return self._append_matrix(data, idx)
    return data
def _get_ref_idx(self, aug_idx: int) -> str: n_axes = len(self.reflect_axes) div = self.rot_mult if self.rot_mult else 1 if n_axes == 3: return "{0:03b}".format(int(aug_idx / div)) elif n_axes == 2: return "{0:02b}".format(int(aug_idx / div)) elif n_axes == 1: return "{0:01b}".format(int(aug_idx / div))
def get_test_fold(self, idx: int, aug_idx: int) -> Dict[str, np.ndarray]:
    r"""
    Get test data for single fold applying test-time data augmentation.
    Data consists of dictionary of inputs, targets, and weights. Accounts for ignored features.
    Inputs, except for matrix data, are passed through np.nan_to_num to deal with nans and infs.
    When target vectors are augmented, they receive exactly the same rotation angles and reflections as the inputs.

    Arguments:
        idx: fold index to load
        aug_idx: index for the test-time augmentation (only affects the reflections if random rotation is requested)

    Returns:
        dictionary of inputs, targets, and weights as Numpy arrays

    Raises:
        ValueError: if `aug_idx` is negative or not less than the augmentation multiplicity
    """

    if not 0 <= aug_idx < self.aug_mult:
        raise ValueError(f"Invalid augmentation idx passed {aug_idx}")
    data = self.get_data(n_folds=1, fold_idx=idx)
    if not self.augmented:
        return data

    inputs = pd.DataFrame(self.foldfile[f"fold_{idx}/inputs"][()], columns=self.input_feats)

    # The rotation angle(s) are determined once so that inputs and targets are rotated identically
    angle = None
    if self.rot_mult > 0:
        if self.random_rot:
            angle = (2 * np.pi * np.random.random(size=len(inputs))) - np.pi
        else:
            # Fixed rotations cycle through rot_mult equally spaced angles starting at zero
            angle = np.linspace(0, 2 * np.pi, self.rot_mult + 1)[aug_idx % self.rot_mult]
    # One binary digit per reflection axis stating whether that axis is flipped
    ref_idx = self._get_ref_idx(aug_idx) if len(self.reflect_axes) > 0 else None

    def _augment(df: pd.DataFrame, vectors: List[str]) -> None:
        # Applies the rotation, followed by the reflections, to the vectors of df in place
        if angle is not None:
            df["aug_angle"] = angle
            self._rotate(df, vectors)
        if ref_idx is not None:
            for i, coord in enumerate(self.reflect_axes):
                df[f"aug{coord}"] = int(ref_idx[i])
            self._reflect(df, vectors)

    _augment(inputs, self.vectors)
    # Selecting by feature name also discards the helper columns added during augmentation
    inputs = inputs[[f for f in self.input_feats if f not in self._ignore_feats]]
    data["inputs"] = np.nan_to_num(inputs.values)

    if self.aug_targ_feats is not None:
        targets = pd.DataFrame(self.foldfile[f"fold_{idx}/targets"][()], columns=self.targ_feats)
        _augment(targets, self.targ_vectors)
        data["targets"] = np.nan_to_num(targets[self.targ_feats].values)

    return self._append_matrix(data, idx) if self.has_matrix and self.yield_matrix else data
class TorchGeometricFoldYielder(FoldYielder):
    r"""
    Interface class for accessing data from PyTorch Geometric datasets.
    Dataset will be split into sub-folds; either provide a value for the `fold_indices` argument with your own split
    as a list of lists of indices, or specify the number of folds for a random split (`n_folds`)

    .. warning:: Much functionality has yet to be implemented for this class

    Arguments:
        dataset: PyTorch Geometric Dataset containing inputs, weights, and targets
        n_folds: number of folds in which to randomly split the dataset. Must provide either this or `fold_indices`
        fold_indices: list of lists of indices; each list of indices is a fold. Must provide either this or `n_folds`
        shuffle: if no `fold_indices` are provided, data will be split into the specified number of folds.
            This controls whether the indices will be shuffled beforehand or not.
        seed: if no `fold_indices` are provided, data will be split into the specified number of folds.
            This sets the random seed used for shuffling, if requested.
        batch_yielder_type: Class of :class:`~lumin.nn.data.batch_yielder.BatchYielder` to instantiate to yield inputs
    """

    def __init__(
        self,
        dataset: PyGDataset,
        n_folds: Optional[int] = None,
        fold_indices: Optional[List[List[int]]] = None,
        shuffle: bool = True,
        seed: Optional[int] = None,
        batch_yielder_type: Type[BatchYielder] = TorchGeometricBatchYielder,
    ):
        # The parent initialiser is deliberately not called: it requires a foldfile, which does not exist here.
        # The attributes it would normally set are given neutral values instead.
        self.dataset = dataset
        self.batch_yielder_type = batch_yielder_type
        self._set_folds(n_folds, fold_indices, shuffle, seed)
        self.cont_feats, self.cat_feats, self.input_pipe, self.output_pipe = [], [], None, None
        self.yield_matrix, self.matrix_pipe = True, None
        self.augmented, self.aug_mult, self.train_time_aug, self.test_time_aug = False, 0, False, False
        self.input_feats = self.cont_feats + self.cat_feats
        self.orig_cont_feats, self.orig_cat_feat, self._ignore_feats = self.cont_feats, self.cat_feats, []

    def __repr__(self) -> str:
        return f"FoldYielder with {self.n_folds} folds"

    def __len__(self) -> int:
        r"""
        Returns:
            number of folds into which the dataset is split
        """

        return self.n_folds

    def __getitem__(self, idx: int) -> Dict[str, PyGDataset]:
        r"""
        Equivalent to :meth:`get_fold`

        Arguments:
            idx: fold index to load

        Returns:
            dictionary holding the dataset slice for the fold
        """

        return self.get_fold(idx)

    def __iter__(self) -> Iterator[Dict[str, PyGDataset]]:
        r"""
        Yields the result of :meth:`get_fold` for every fold index from 0 to `n_folds` - 1
        """

        for i in range(self.n_folds):
            yield self.get_fold(i)

    def _set_folds(
        self,
        n_folds: Optional[int],
        fold_indices: Optional[List[List[int]]] = None,
        shuffle: bool = True,
        seed: Optional[int] = None,
    ) -> None:
        r"""
        Records the indices and size of each fold, either as provided or from a K-fold split of the dataset

        Arguments:
            n_folds: number of folds to split the dataset into, used only if `fold_indices` is None
            fold_indices: explicit list of index lists, one per fold
            shuffle: whether to shuffle indices before splitting, used only if `fold_indices` is None
            seed: random seed for the shuffling

        Raises:
            ValueError: if neither `n_folds` nor `fold_indices` is provided
        """

        if fold_indices is None:
            if n_folds is None:
                raise ValueError("Either n_folds or fold_indices must be provided")
            # The seed only has meaning when shuffling; it is withheld otherwise since
            # KFold may reject a random_state combined with shuffle=False
            kf = KFold(n_splits=n_folds, shuffle=shuffle, random_state=seed if shuffle else None)
            # Each split yields (train indices, test indices); the test indices form the fold
            fold_indices = [split[1] for split in kf.split(X=np.arange(len(self.dataset)))]
            self.n_folds = n_folds
        else:
            self.n_folds = len(fold_indices)
        self.fold_indices = fold_indices
        self.fld_szs = {i: len(fold) for i, fold in enumerate(self.fold_indices)}
def columns(self) -> List[str]:
    r"""
    Not implemented for this class.

    Raises:
        NotImplementedError: always
    """

    raise NotImplementedError()
def add_ignore(self, feats: Union[str, List[str]]) -> None:
    r"""
    Not implemented for this class.

    Arguments:
        feats: unused

    Raises:
        NotImplementedError: always
    """

    raise NotImplementedError()
def _set_foldfile(self, foldfile: Union[str, Path, h5py.File]) -> None: raise NotImplementedError() def _append_matrix(self, data, idx) -> Dict[str, np.ndarray]: raise NotImplementedError()
def close(self) -> None:
    r"""
    Does nothing; provided so that the class can be closed like a file-backed FoldYielder
    """

    return None
def get_fold(self, idx: int) -> Dict[str, np.ndarray]:
    r"""
    Get data for single fold, obtained by indexing the dataset with the indices of the requested fold.

    Arguments:
        idx: fold index to load

    Returns:
        dictionary with the single key 'inputs', holding the slice of the dataset
    """

    fold_members = self.fold_indices[idx]
    return {"inputs": self.dataset[fold_members]}
def get_column(
    self, column: str, n_folds: Optional[int] = None, fold_idx: Optional[int] = None, add_newaxis: bool = False
) -> Union[np.ndarray, None]:
    r"""
    Not implemented for this class.

    Arguments:
        column: unused
        n_folds: unused
        fold_idx: unused
        add_newaxis: unused

    Raises:
        NotImplementedError: always
    """

    raise NotImplementedError()
def get_data(self, n_folds: Optional[int] = None, fold_idx: Optional[int] = None) -> Dict[str, np.ndarray]:
    r"""
    Not implemented for this class.

    Arguments:
        n_folds: unused
        fold_idx: unused

    Raises:
        NotImplementedError: always
    """

    raise NotImplementedError()
def get_df(
    self,
    pred_name: str = "pred",
    targ_name: str = "targets",
    wgt_name: str = "weights",
    n_folds: Optional[int] = None,
    fold_idx: Optional[int] = None,
    inc_inputs: bool = False,
    inc_ignore: bool = False,
    deprocess: bool = False,
    verbose: bool = True,
    suppress_warn: bool = False,
    nan_to_num: bool = False,
    inc_matrix: bool = False,
) -> pd.DataFrame:
    r"""
    Not implemented for this class; all arguments are unused.

    Raises:
        NotImplementedError: always
    """

    raise NotImplementedError()
def save_fold_pred(self, pred: np.ndarray, fold_idx: int, pred_name: str = "pred") -> None:
    r"""
    Not implemented for this class.

    Arguments:
        pred: unused
        fold_idx: unused
        pred_name: unused

    Raises:
        NotImplementedError: always
    """

    raise NotImplementedError()

Docs

Access comprehensive developer and user documentation for LUMIN

View Docs

Tutorials

Get tutorials for beginner and advanced researchers demonstrating many of the features of LUMIN

View Tutorials