# Source code for ibllib.atlas.regions

from dataclasses import dataclass
import logging
from pathlib import Path

import numpy as np
import pandas as pd
from iblutil.util import Bunch
from iblutil.numerical import ismember

# Module-level logger, named after the module per the standard logging convention
_logger = logging.getLogger(__name__)
# 'Beryl' is the name given to an atlas containing a subset of the most relevant allen annotations
# Parquet file holding the precomputed mapping index vectors (Allen, Beryl, Cosmos, Swanson...)
FILE_MAPPINGS = str(Path(__file__).parent.joinpath('mappings.pqt'))
# CSV of the Allen ontology structure tree, shipped alongside this module
ALLEN_FILE_REGIONS = str(Path(__file__).parent.joinpath('allen_structure_tree.csv'))
# CSV of the Franklin-Paxinos structure tree, shipped alongside this module
FRANKLIN_FILE_REGIONS = str(Path(__file__).parent.joinpath('franklin_paxinos_structure_tree.csv'))


@dataclass
class _BrainRegions:
    """
    Base container for a brain-region ontology.

    Holds parallel per-region arrays (one row per region) and provides tree
    navigation (ancestors / descendants / leaves) and remapping utilities.
    Subclasses are expected to set ``self.mappings`` (dict of index vectors),
    ``self.default_mapping`` (key into ``mappings``) and ``self.n_lr``
    (number of regions in a single hemisphere).
    """
    id: np.ndarray
    name: object
    acronym: object
    rgb: np.uint8
    level: np.ndarray
    parent: np.ndarray
    order: np.uint16

    @property
    def rgba(self):
        """np.ndarray: n_regions x 4 uint-like RGBA array, with the void region transparent."""
        rgba = np.c_[self.rgb, self.rgb[:, 0] * 0 + 255]
        rgba[0, :] = 0  # set the void to transparent
        return rgba

    def _compute_order(self):
        """
        Compute the order of regions, per region order by left hemisphere and then right hemisphere
        :return: np.array of orders, same size as self.id
        """
        # NOTE(review): this assumes left/right regions are interleaved in self.id
        # (odd rows left, even rows right) — confirm against the subclass layout
        orders = np.zeros_like(self.id)
        # Left hemisphere first
        orders[1::2] = np.arange(self.n_lr) + self.n_lr + 1
        # Then right hemisphere
        orders[2::2] = np.arange(self.n_lr) + 1
        # bugfix: the computed orders were previously discarded (no return statement)
        return orders

    def get(self, ids) -> Bunch:
        """
        Get a bunch of the name/id

        :param ids: one or several region ids (must all exist in self.id)
        :return: Bunch with one entry per dataclass field, rows aligned on ids
        """
        uid, uind = np.unique(ids, return_inverse=True)
        a, iself, _ = np.intersect1d(self.id, uid, assume_unique=False, return_indices=True)
        b = Bunch()
        for k in self.__dataclass_fields__.keys():
            b[k] = self.__getattribute__(k)[iself[uind]]
        return b

    def _navigate_tree(self, ids, direction='down', return_indices=False):
        """
        Private method to navigate the tree and get all related objects either up, down or along the branch.
        By convention the provided id is returned in the list of regions
        :param ids: array or single allen id (int32)
        :param direction: 'up' returns ancestors, 'down' descendants
        :param return_indices: Bool (False), if true returns a second argument with indices mapping
        to the current br object
        :return: Bunch
        """
        indices = ismember(self.id, ids)[0]
        count = np.sum(indices)
        # expand the selection one generation at a time until it stops growing
        while True:
            if direction == 'down':
                indices |= ismember(self.parent, self.id[indices])[0]
            elif direction == 'up':
                indices |= ismember(self.id, self.parent[indices])[0]
            else:
                raise ValueError("direction should be either 'up' or 'down'")
            if count == np.sum(indices):  # last iteration didn't find any match
                break
            else:
                count = np.sum(indices)
        if return_indices:
            return self.get(self.id[indices]), np.where(indices)[0]
        else:
            return self.get(self.id[indices])

    def subtree(self, scalar_id, return_indices=False):
        """
        Given a node, returns the subtree containing the node along with ancestors
        :param scalar_id: a single region id
        :param return_indices: Bool (False), if true returns a second argument with indices mapping
        to the current br object
        :return: Bunch
        """
        if not np.isscalar(scalar_id):
            assert scalar_id.size == 1
        _, idown = self._navigate_tree(scalar_id, direction='down', return_indices=True)
        _, iup = self._navigate_tree(scalar_id, direction='up', return_indices=True)
        # idown / iup are already row indices into self.id
        indices = np.unique(np.r_[idown, iup])
        if return_indices:
            # bugfix: previously returned np.where(indices)[0], which yields the positions
            # of non-zero entries of an index array (and drops index 0), not the indices
            return self.get(self.id[indices]), indices
        else:
            return self.get(self.id[indices])

    def descendants(self, ids, **kwargs):
        """
        Get descendants from one or an array of ids
        :param ids: np.array or scalar representing the region primary key
        :param return_indices: Bool (False) returns the indices in the current br obj
        :return: Bunch
        """
        return self._navigate_tree(ids, direction='down', **kwargs)

    def ancestors(self, ids, **kwargs):
        """
        Get ancestors from one or an array of ids
        :param ids: np.array or scalar representing the region primary key
        :param return_indices: Bool (False) returns the indices in the current br obj
        :return: Bunch
        """
        return self._navigate_tree(ids, direction='up', **kwargs)

    def leaves(self):
        """
        Get all regions that do not have children
        :return: Bunch
        """
        # ids that never appear as a parent are leaves; NaN parents (void/root) are excluded
        leaves = np.setxor1d(self.id, self.parent)
        return self.get(np.int64(leaves[~np.isnan(leaves)]))

    def propagate_down(self, acronyms, values):
        """
        This function remaps a set of user specified acronyms and values to the
        swanson map, by filling down the child nodes when higher up values are
        provided.
        :param acronyms: list or array of allen ids or acronyms
        :param values: list or array of associated values
        :return: (indices, all_values) for regions covered by the user input
        """
        user_aids = self.parse_acronyms_argument(acronyms)
        # bugfix: values may be a plain list; fancy indexing below requires an array
        values = np.asarray(values)
        _, user_indices = ismember(user_aids, self.id)
        # NOTE(review): compute_hierarchy is defined on the BrainRegions subclass — confirm
        # this method is only called from there
        self.compute_hierarchy()
        ia, ib = ismember(self.hierarchy, user_indices)
        # np.nan (np.NaN was removed in NumPy 2.0)
        v = np.full(ia.shape, np.nan)
        v[ia] = values[ib]
        all_values = np.nanmedian(v, axis=0)
        indices = np.where(np.any(ia, axis=0))[0]
        all_values = all_values[indices]
        return indices, all_values

    def _mapping_from_regions_list(self, new_map, lateralize=False):
        """
        From a vector of regions id, creates a mapping such as
        newids = self.mapping
        :param new_map: np.array: vector of regions id
        :param lateralize: Bool (False), if True keeps the hemisphere distinction
        :return: np.array of indices into self.id, one per region
        """
        I_ROOT = 1
        I_VOID = 0
        # to lateralize we make sure all regions are represented in + and -
        new_map = np.unique(np.r_[-new_map, new_map])
        assert np.all(np.isin(new_map, self.id)), \
            "All mapping ids should be represented in the Allen ids"
        # with the lateralization, self.id may have duplicate values so ismember is necessary
        iid, inm = ismember(self.id, new_map)
        iid = np.where(iid)[0]
        mapind = np.zeros_like(self.id) + I_ROOT  # non assigned regions are root
        # TO DO should root be lateralised?
        mapind[iid] = iid  # regions present in the list have the same index
        # Starting by the higher up levels in the hierarchy, assign all descendants to the mapping
        for i in np.argsort(self.level[iid]):
            descendants = self.descendants(self.id[iid[i]]).id
            _, idesc, _ = np.intersect1d(self.id, descendants, return_indices=True)
            mapind[idesc] = iid[i]
        mapind[0] = I_VOID  # void stays void
        # to delateralize the regions, assign the positive index to all mapind elements
        if lateralize is False:
            _, iregion = ismember(np.abs(self.id), self.id)
            mapind = mapind[iregion]
        return mapind

    def acronym2acronym(self, acronym, mapping=None):
        """
        Remap acronyms onto mapping

        :param acronym: list or array of acronyms
        :param mapping: target map to remap acronyms
        :return: array of remapped acronyms
        """
        mapping = mapping or self.default_mapping
        inds = self._find_inds(acronym, self.acronym)
        return self.acronym[self.mappings[mapping]][inds]

    def acronym2id(self, acronym, mapping=None, hemisphere=None):
        """
        Convert acronyms to atlas ids and remap

        :param acronym: list or array of acronyms
        :param mapping: target map to remap atlas_ids
        :param hemisphere: which hemisphere to return atlas ids for, options left or right
        :return: array of remapped atlas ids
        """
        mapping = mapping or self.default_mapping
        inds = self._find_inds(acronym, self.acronym)
        return self.id[self.mappings[mapping]][self._filter_lr(inds, mapping, hemisphere)]

    def acronym2index(self, acronym, mapping=None, hemisphere=None):
        """
        Convert acronym to index and remap
        :param acronym: list or array of acronyms
        :param mapping: target map to remap acronyms
        :param hemisphere: which hemisphere to return indices for, options left or right
        :return: array of remapped acronyms and list of indexes for each acronym
        """
        mapping = mapping or self.default_mapping
        acronym = self.acronym2acronym(acronym, mapping=mapping)
        index = list()
        # loop variable renamed from `id` to avoid shadowing the builtin
        for acr in acronym:
            inds = np.where(self.acronym[self.mappings[mapping]] == acr)[0]
            index.append(self._filter_lr_index(inds, hemisphere))

        return acronym, index

    def id2acronym(self, atlas_id, mapping=None):
        """
        Convert atlas id to acronym and remap

        :param atlas_id: list or array of atlas ids
        :param mapping: target map to remap acronyms
        :return: array of remapped acronyms
        """
        mapping = mapping or self.default_mapping
        inds = self._find_inds(atlas_id, self.id)
        return self.acronym[self.mappings[mapping]][inds]

    def id2id(self, atlas_id, mapping='Allen'):
        """
        Remap atlas id onto mapping

        :param atlas_id: list or array of atlas ids
        :param mapping: target map to remap atlas ids
        :return: array of remapped atlas ids
        """

        inds = self._find_inds(atlas_id, self.id)
        return self.id[self.mappings[mapping]][inds]

    def id2index(self, atlas_id, mapping='Allen'):
        """
        Convert atlas id to index and remap

        :param atlas_id: list or array of atlas ids
        :param mapping: mapping to use
        :return: remapped atlas ids and list of indices for each atlas_id
        """

        atlas_id = self.id2id(atlas_id, mapping=mapping)
        index = list()
        # loop variable renamed from `id` to avoid shadowing the builtin
        for aid in atlas_id:
            inds = np.where(self.id[self.mappings[mapping]] == aid)[0]
            index.append(inds)

        return atlas_id, index

    def index2acronym(self, index, mapping=None):
        """
        Convert index to acronym and remap

        :param index: indices into the region table
        :param mapping: target map
        :return: array of acronyms
        """
        mapping = mapping or self.default_mapping
        inds = self.acronym[self.mappings[mapping]][index]
        return inds

    def index2id(self, index, mapping=None):
        """
        Convert index to atlas id and remap

        :param index: indices into the region table
        :param mapping: target map
        :return: array of atlas ids
        """
        mapping = mapping or self.default_mapping
        inds = self.id[self.mappings[mapping]][index]
        return inds

    def _filter_lr(self, values, mapping, hemisphere):
        """
        Filter values by those on left or right hemisphere
        :param values: array of index values
        :param mapping: mapping to use
        :param hemisphere: hemisphere
        :return: filtered indices (or an n x 2 array of [left, right] when no hemisphere given)
        """
        # only lateralized mappings (suffix '-lr') distinguish hemispheres
        if 'lr' in mapping:
            if hemisphere == 'left':
                return values + self.n_lr
            elif hemisphere == 'right':
                return values
            else:
                return np.c_[values + self.n_lr, values]
        else:
            return values

    def _filter_lr_index(self, values, hemisphere):
        """
        Filter index values  by those on left or right hemisphere

        :param values: array of index values
        :param hemisphere: hemisphere
        :return: filtered indices
        """
        # left hemisphere rows sit after the first n_lr right-hemisphere rows
        if hemisphere == 'left':
            return values[values > self.n_lr]
        elif hemisphere == 'right':
            return values[values <= self.n_lr]
        else:
            return values

    def _find_inds(self, values, all_values):
        """Return indices of values in all_values, coercing scalars to a 1-element array."""
        if not isinstance(values, (list, np.ndarray)):
            values = np.array([values])
        _, inds = ismember(np.array(values), all_values)

        return inds

    def parse_acronyms_argument(self, acronyms, mode='raise'):
        """
        Parse an input acronym arguments: returns a numpy array of allen regions ids
        regardless of the input: list of acronyms, np.array of acronyms strings or np array of allen ids
        To be used into functions to provide flexible input type
        :param acronyms: List, np.array of acronym strings or np.array of allen ids
        :param mode: 'raise' (default) asserts that all acronyms exist in the ontology
        :return: np.array of int ids
        """
        # first get the allen region ids regardless of the input type
        acronyms = np.array(acronyms)
        # if the user provides acronyms they're not signed by definition
        if not np.issubdtype(acronyms.dtype, np.number):
            user_aids = self.acronym2id(acronyms)
            if mode == 'raise':
                assert user_aids.size == acronyms.size, "All acronyms should exist in Allen ontology"
        else:
            user_aids = acronyms
        return user_aids


class FranklinPaxinosRegions(_BrainRegions):
    """
    Brain regions for the Franklin-Paxinos atlas, built from the packaged structure-tree CSV
    and cross-referenced with the Allen ontology to recover the hierarchy levels.
    """
    def __init__(self):
        df_regions = pd.read_csv(FRANKLIN_FILE_REGIONS)
        # get rid of nan values, there are rows that are in Allen but are not in the Franklin Paxinos atlas
        df_regions = df_regions[~df_regions['Structural ID'].isna()]
        # add in root
        root = [{'Structural ID': int(997), 'Franklin-Paxinos Full name': 'root',
                 'Franklin-Paxinos abbreviation': 'root', 'structure Order': 50, 'red': 255,
                 'green': 255, 'blue': 255, 'Allen Full name': 'root', 'Allen abbreviation': 'root'}]
        df_regions = pd.concat([pd.DataFrame(root), df_regions], ignore_index=True)
        allen_regions = pd.read_csv(ALLEN_FILE_REGIONS)
        # Find the level of acronyms that are the same as Allen
        a, b = ismember(df_regions['Allen abbreviation'].values, allen_regions['acronym'].values)
        level = allen_regions['depth'].values[b]
        df_regions['level'] = np.full(len(df_regions), np.nan)
        df_regions['allen level'] = np.full(len(df_regions), np.nan)
        df_regions.loc[a, 'level'] = level
        df_regions.loc[a, 'allen level'] = level
        # for regions with no Allen counterpart, fall back on the Franklin-Paxinos naming
        nan_idx = np.where(df_regions['Allen abbreviation'].isna())[0]
        df_regions.loc[nan_idx, 'Allen abbreviation'] = \
            df_regions['Franklin-Paxinos abbreviation'].values[nan_idx]
        df_regions.loc[nan_idx, 'Allen Full name'] = \
            df_regions['Franklin-Paxinos Full name'].values[nan_idx]
        # Now fill in the nan values with one level up from their parents we need to this multiple times
        while np.sum(np.isnan(df_regions['level'].values)) > 0:
            nan_loc = np.isnan(df_regions['level'].values)
            parent_level = df_regions['Parent ID'][nan_loc].values
            a, b = ismember(parent_level, df_regions['Structural ID'].values)
            assert len(a) == len(b) == np.sum(nan_loc)
            level = df_regions['level'].values[b] + 1
            df_regions.loc[nan_loc, 'level'] = level
        # lateralize: duplicate positive-id rows with negated ids for the left hemisphere
        df_regions_left = df_regions.iloc[np.array(df_regions['Structural ID'] > 0), :].copy()
        df_regions_left['Structural ID'] = - df_regions_left['Structural ID']
        df_regions_left['Parent ID'] = - df_regions_left['Parent ID']
        df_regions_left['Allen Full name'] = \
            df_regions_left['Allen Full name'].apply(lambda x: x + ' (left)')
        df_regions = pd.concat((df_regions, df_regions_left), axis=0)
        # insert void
        # bugfix: the key was 'Franklin-Paxinos Full Name' (capital N), which silently created
        # a stray extra column instead of filling the existing 'Franklin-Paxinos Full name' one
        void = [{'Structural ID': int(0), 'Franklin-Paxinos Full name': 'void',
                 'Franklin-Paxinos abbreviation': 'void', 'Parent ID': int(0), 'structure Order': 0,
                 'red': 0, 'green': 0, 'blue': 0, 'Allen Full name': 'void',
                 'Allen abbreviation': 'void'}]
        df_regions = pd.concat([pd.DataFrame(void), df_regions], ignore_index=True)
        # converts colors to RGB uint8 array
        c = np.c_[df_regions['red'], df_regions['green'], df_regions['blue']].astype(np.uint32)
        super().__init__(id=df_regions['Structural ID'].to_numpy().astype(np.int64),
                         name=df_regions['Allen Full name'].to_numpy(),
                         acronym=df_regions['Allen abbreviation'].to_numpy(),
                         rgb=c,
                         level=df_regions['level'].to_numpy().astype(np.uint16),
                         parent=df_regions['Parent ID'].to_numpy(),
                         order=df_regions['structure Order'].to_numpy().astype(np.uint16))
        # number of regions per hemisphere (void excluded)
        self.n_lr = int((len(self.id) - 1) / 2)
        self._compute_mappings()
        self.default_mapping = 'FranklinPaxinos'

    def _compute_mappings(self):
        """Compute the mapping index vectors; only the identity and delateralized maps exist here."""
        self.mappings = {
            'FranklinPaxinos': self._mapping_from_regions_list(
                np.unique(np.abs(self.id)), lateralize=False),
            'FranklinPaxinos-lr': np.arange(self.id.size),
        }
class BrainRegions(_BrainRegions):
    """
    ibllib.atlas.regions.BrainRegions(brainmap='Allen')

    The Allen atlas ids are kept intact but lateralized as follows: labels are duplicated
    and ids multiplied by -1, with the understanding that left hemisphere regions have
    negative ids.
    """
    def __init__(self):
        df_regions = pd.read_csv(ALLEN_FILE_REGIONS)
        # lateralize: duplicate positive-id rows with negated ids for the left hemisphere
        df_regions_left = df_regions.iloc[np.array(df_regions.id > 0), :].copy()
        df_regions_left['id'] = - df_regions_left['id']
        df_regions_left['parent_structure_id'] = - df_regions_left['parent_structure_id']
        df_regions_left['name'] = df_regions_left['name'].apply(lambda x: x + ' (left)')
        df_regions = pd.concat((df_regions, df_regions_left), axis=0)
        # converts colors to RGB uint8 array; missing hex triplets default to white
        c = np.uint32(df_regions.color_hex_triplet.map(
            lambda x: int(x, 16) if isinstance(x, str) else 256 ** 3 - 1))
        c = np.flip(np.reshape(c.view(np.uint8), (df_regions.id.size, 4))[:, :3], 1)
        c[0, :] = 0  # set the void region to black
        # creates the BrainRegion instance
        super().__init__(id=df_regions.id.to_numpy(),
                         name=df_regions.name.to_numpy(),
                         acronym=df_regions.acronym.to_numpy(),
                         rgb=c,
                         level=df_regions.depth.to_numpy().astype(np.uint16),
                         parent=df_regions.parent_structure_id.to_numpy(),
                         order=df_regions.graph_order.to_numpy().astype(np.uint16))
        # mappings are indices not ids: they range from 0 to n regions -1
        mappings = pd.read_parquet(FILE_MAPPINGS)
        self.mappings = {k: mappings[k].to_numpy() for k in mappings}
        # number of regions per hemisphere (void excluded)
        self.n_lr = int((len(self.id) - 1) / 2)
        self.default_mapping = 'Allen'

    def _compute_mappings(self):
        """
        Recomputes the mapping indices for all mappings.

        This is left mainly as a reference for adding future mappings as this takes a few
        seconds to execute. In production, we use the MAPPING_FILES pqt to avoid recomputing
        at each instantiation.
        """
        beryl = np.load(Path(__file__).parent.joinpath('beryl.npy'))
        cosmos = np.load(Path(__file__).parent.joinpath('cosmos.npy'))
        swanson = np.load(Path(__file__).parent.joinpath('swanson_regions.npy'))
        self.mappings = {
            'Allen': self._mapping_from_regions_list(np.unique(np.abs(self.id)), lateralize=False),
            'Allen-lr': np.arange(self.id.size),
            'Beryl': self._mapping_from_regions_list(beryl, lateralize=False),
            'Beryl-lr': self._mapping_from_regions_list(beryl, lateralize=True),
            'Cosmos': self._mapping_from_regions_list(cosmos, lateralize=False),
            'Cosmos-lr': self._mapping_from_regions_list(cosmos, lateralize=True),
            'Swanson': self._mapping_from_regions_list(swanson, lateralize=False),
            'Swanson-lr': self._mapping_from_regions_list(swanson, lateralize=True),
        }
        pd.DataFrame(self.mappings).to_parquet(FILE_MAPPINGS)

    def compute_hierarchy(self):
        """
        Creates a self.hierarchy attributes that is a n_levels by n_region array of indices.
        This is useful to perform fast vectorized computations of ancestors and descendants.
        :return:
        """
        # memoized: computed once per instance
        if hasattr(self, 'hierarchy'):
            return
        n_levels = np.max(self.level)
        n_regions = self.id.size
        # creates the parent index. Void and root are omitted from intersection
        # as they figure as NaN
        pmask, i_p = ismember(self.parent, self.id)
        self.iparent = np.arange(n_regions)
        self.iparent[pmask] = i_p
        # the last level of the hierarchy is the actual mapping, then going up level per level
        # we assign the parent index
        self.hierarchy = np.tile(np.arange(n_regions), (n_levels, 1))
        _mask = np.zeros(n_regions, bool)
        for lev in np.flipud(np.arange(n_levels)):
            if lev < (n_levels - 1):
                self.hierarchy[lev, _mask] = self.iparent[self.hierarchy[lev + 1, _mask]]
            sel = self.level == (lev + 1)
            self.hierarchy[lev, sel] = np.where(sel)[0]
            _mask[sel] = True

    def remap(self, region_ids, source_map='Allen', target_map='Beryl'):
        """
        Remap atlas regions ids from source map to target map
        :param region_ids: atlas ids to map
        :param source_map: map name which original region_ids are in
        :param target_map: map name onto which to map
        :return: remapped atlas ids
        """
        _, inds = ismember(region_ids, self.id[self.mappings[source_map]])
        return self.id[self.mappings[target_map][inds]]
def regions_from_allen_csv():
    """
    Reads csv file containing the Allen Ontology and instantiates a BrainRegions object.

    .. deprecated::
        Use :class:`BrainRegions` instead.

    :return: BrainRegions object
    """
    _logger.warning("ibllib.atlas.regions.regions_from_allen_csv() is deprecated. "
                    "Use BrainRegions() instead")
    return BrainRegions()