Source code for ibllib.pipes.dynamic_pipeline

from collections import OrderedDict
from pathlib import Path
import yaml
import ibllib.io.session_params as sess_params
import ibllib.io.extractors.base
import ibllib.pipes.ephys_preprocessing as epp
import ibllib.pipes.tasks as mtasks
import ibllib.pipes.base_tasks as bstasks
import ibllib.pipes.widefield_tasks as wtasks
import ibllib.pipes.sync_tasks as stasks
import ibllib.pipes.behavior_tasks as btasks
import ibllib.pipes.video_tasks as vtasks
import ibllib.pipes.ephys_tasks as etasks
import ibllib.pipes.audio_tasks as atasks
from ibllib.pipes.photometry_tasks import TaskFibrePhotometryPreprocess, TaskFibrePhotometryRegisterRaw
import spikeglx


[docs]def acquisition_description_legacy_session(session_path, save=False): """ From a legacy session create a dictionary corresponding to the acquisition description :return: dict """ extractor_type = ibllib.io.extractors.base.get_session_extractor_type(session_path=session_path) etype2protocol = dict(biased='choice_world_biased', habituation='choice_world_habituation', training='choice_world_training', ephys='choice_world_recording') dict_ad = get_acquisition_description(etype2protocol[extractor_type]) if save: sess_params.write_params(session_path=session_path, data=dict_ad) return dict_ad
[docs]def get_acquisition_description(protocol): """" This is a set of example acqusition descriptions for experiments - choice_world_recording - choice_world_biased - choice_world_training - choice_world_habituation - choice_world_passive That are part of the IBL pipeline """ if protocol == 'choice_world_recording': # canonical ephys devices = { 'cameras': { 'right': {'collection': 'raw_video_data', 'sync_label': 'audio'}, 'body': {'collection': 'raw_video_data', 'sync_label': 'audio'}, 'left': {'collection': 'raw_video_data', 'sync_label': 'audio'}, }, 'neuropixel': { 'probe00': {'collection': 'raw_ephys_data/probe00', 'sync_label': 'imec_sync'}, 'probe01': {'collection': 'raw_ephys_data/probe01', 'sync_label': 'imec_sync'} }, 'microphone': { 'microphone': {'collection': 'raw_behavior_data', 'sync_label': None} }, } acquisition_description = { # this is the current ephys pipeline description 'devices': devices, 'tasks': { 'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}, 'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}, }, 'sync': { 'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'} }, 'procedures': ['Ephys recording with acute probe(s)'], 'projects': ['ibl_neuropixel_brainwide_01'] } else: devices = { 'cameras': { 'left': {'collection': 'raw_video_data', 'sync_label': 'frame2ttl'}, }, 'microphone': { 'microphone': {'collection': 'raw_behavior_data', 'sync_label': None} }, } acquisition_description = { # this is the current ephys pipeline description 'devices': devices, 'sync': { 'bpod': {'collection': 'raw_behavior_data', 'extension': 'bin'} }, 'procedures': ['Behavior training/tasks'], 'projects': ['ibl_neuropixel_brainwide_01'] } if protocol == 'choice_world_biased': key = 'biasedChoiceWorld' elif protocol == 'choice_world_training': key = 'trainingChoiceWorld' elif protocol == 'choice_world_habituation': key = 'habituationChoiceWorld' acquisition_description['tasks'] = {key: {'collection': 'raw_behavior_data', 'sync_label': 'bpod', 'main': True}} acquisition_description['version'] = sess_params.SPEC_VERSION return acquisition_description
[docs]def make_pipeline(session_path=None, **pkwargs): """ :param session_path: :param one: passed to the Pipeline init: one instance to register tasks to :param eid: passed to the Pipeline init :return: """ # NB: this pattern is a pattern for dynamic class creation # tasks['SyncPulses'] = type('SyncPulses', (epp.EphysPulses,), {})(session_path=session_path) assert session_path tasks = OrderedDict() acquisition_description = sess_params.read_params(session_path) devices = acquisition_description.get('devices', {}) kwargs = {'session_path': session_path} # Registers the experiment description file tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw', (bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs) # Syncing tasks (sync, sync_args), = acquisition_description['sync'].items() sync_args['sync_collection'] = sync_args.pop('collection') # rename the key so it matches task run arguments sync_args['sync_ext'] = sync_args.pop('extension') sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None) sync_kwargs = {'sync': sync, **sync_args} sync_tasks = [] if sync == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data': tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs) tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})( **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']]) sync_tasks = [tasks[f'SyncPulses_{sync}']] elif sync == 'nidq': tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs) tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})( **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']]) sync_tasks = [tasks[f'SyncPulses_{sync}']] elif sync == 'tdms': tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs) elif sync == 'bpod': pass # ATM we don't have anything for this not sure it will be needed in the future # Behavior tasks # TODO this is not doing at all what we were envisaging and going back to the old way of protocol linked to hardware # TODO change at next iteration of dynamic pipeline, once we have the basic workflow working for protocol, task_info in acquisition_description.get('tasks', []).items(): task_kwargs = {'protocol': protocol, 'collection': task_info['collection']} # - choice_world_recording # - choice_world_biased # - choice_world_training # - choice_world_habituation if 'habituation' in protocol: registration_class = btasks.HabituationRegisterRaw behaviour_class = btasks.HabituationTrialsBpod compute_status = False elif 'passiveChoiceWorld' in protocol: registration_class = btasks.PassiveRegisterRaw behaviour_class = btasks.PassiveTask compute_status = False elif sync_kwargs['sync'] == 'bpod': registration_class = btasks.TrialRegisterRaw behaviour_class = btasks.ChoiceWorldTrialsBpod compute_status = True elif sync_kwargs['sync'] == 'nidq': registration_class = btasks.TrialRegisterRaw behaviour_class = btasks.ChoiceWorldTrialsNidq compute_status = True else: raise NotImplementedError tasks[f'RegisterRaw_{protocol}'] = type(f'RegisterRaw_{protocol}', (registration_class,), {})(**kwargs, **task_kwargs) parents = [tasks[f'RegisterRaw_{protocol}']] + sync_tasks tasks[f'Trials_{protocol}'] = type(f'Trials_{protocol}', (behaviour_class,), {})( **kwargs, **sync_kwargs, **task_kwargs, parents=parents) if compute_status: tasks[f"TrainingStatus_{protocol}"] = type(f"TrainingStatus_{protocol}", (btasks.TrainingStatus,), {})( **kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}']]) # Ephys tasks if 'neuropixel' in devices: ephys_kwargs = {'device_collection': 'raw_ephys_data'} tasks['EphysRegisterRaw'] = type('EphysRegisterRaw', (etasks.EphysRegisterRaw,), {})(**kwargs, **ephys_kwargs) all_probes = [] register_tasks = [] for pname, probe_info in devices['neuropixel'].items(): meta_file = spikeglx.glob_ephys_files(Path(session_path).joinpath(probe_info['collection']), ext='meta') meta_file = meta_file[0].get('ap') nptype = spikeglx._get_neuropixel_version_from_meta(spikeglx.read_meta_data(meta_file)) nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file)) if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1): tasks[f'EphyCompressNP21_{pname}'] = type(f'EphyCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})( **kwargs, **ephys_kwargs, pname=pname) all_probes.append(pname) register_tasks.append(tasks[f'EphyCompressNP21_{pname}']) elif nptype == 'NP2.4' and nshanks > 1: tasks[f'EphyCompressNP24_{pname}'] = type(f'EphyCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})( **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks) register_tasks.append(tasks[f'EphyCompressNP24_{pname}']) all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)] else: tasks[f'EphysCompressNP1_{pname}'] = type(f'EphyCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})( **kwargs, **ephys_kwargs, pname=pname) register_tasks.append(tasks[f'EphysCompressNP1_{pname}']) all_probes.append(pname) if nptype == '3A': tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})( **kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks) for pname in all_probes: register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name] if nptype != '3A': tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})( **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks) tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})( **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']]) else: tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})( **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']]) tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})( **kwargs, **ephys_kwargs, pname=pname, parents=register_task) tasks[f'EphysCellQC_{pname}'] = type(f'EphysCellQC_{pname}', (etasks.EphysCellsQc,), {})( **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'Spikesorting_{pname}']]) # Video tasks if 'cameras' in devices: video_kwargs = {'device_collection': 'raw_video_data', 'cameras': list(devices['cameras'].keys())} video_compressed = sess_params.get_video_compressed(acquisition_description) if video_compressed: # This is for widefield case where the video is already compressed tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})(**kwargs, **video_kwargs) dlc_parent_task = tasks['VideoConvert'] tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})(**kwargs, **video_kwargs, **sync_kwargs) else: tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})(**kwargs, **video_kwargs) tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})(**kwargs, **video_kwargs, **sync_kwargs) dlc_parent_task = tasks['VideoCompress'] if sync == 'bpod': collection = sess_params.get_task_collection(acquisition_description) tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})( **kwargs, **video_kwargs, **sync_kwargs, collection=collection, parents=[tasks['VideoCompress']]) elif sync == 'nidq': tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})( **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks) if len(video_kwargs['cameras']) == 3: tasks[tn] = type((tn := 'DLC'), (epp.EphysDLC,), {})( **kwargs, parents=[dlc_parent_task]) tasks['PostDLC'] = type('PostDLC', (epp.EphysPostDLC,), {})( **kwargs, parents=[tasks['DLC'], tasks[f'VideoSyncQC_{sync}']]) # Audio tasks if 'microphone' in devices: (microphone, micro_kwargs), = devices['microphone'].items() micro_kwargs['device_collection'] = micro_kwargs.pop('collection') if sync_kwargs['sync'] == 'bpod': tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})( **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection']) elif sync_kwargs['sync'] == 'nidq': tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs) # Widefield tasks if 'widefield' in devices: (_, wfield_kwargs), = devices['widefield'].items() wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection') tasks['WideFieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})( **kwargs, **wfield_kwargs) tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})( **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']]) tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})( **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']]) tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})( **kwargs, **wfield_kwargs, **sync_kwargs, parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks) tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})( **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']]) if 'photometry' in devices: # {'collection': 'raw_photometry_data', 'sync_label': 'frame_trigger', 'regions': ['Region1G', 'Region3G']} photometry_kwargs = devices['photometry'] tasks['TaskFibrePhotometryRegisterRaw'] = type('TaskFibrePhotometryRegisterRaw', ( TaskFibrePhotometryRegisterRaw,), {})(**kwargs, **photometry_kwargs) tasks['TaskFibrePhotometryPreprocess'] = type('TaskFibrePhotometryPreprocess', ( TaskFibrePhotometryPreprocess,), {})(**kwargs, **photometry_kwargs, **sync_kwargs, parents=[tasks['TaskFibrePhotometryRegisterRaw']] + sync_tasks) p = mtasks.Pipeline(session_path=session_path, **pkwargs) p.tasks = tasks return p
[docs]def make_pipeline_dict(pipeline, save=True): task_dicts = pipeline.create_tasks_list_from_pipeline() # TODO better name if save: with open(Path(pipeline.session_path).joinpath('pipeline_tasks.yaml'), 'w') as file: _ = yaml.dump(task_dicts, file) return task_dicts
[docs]def load_pipeline_dict(path): with open(Path(path).joinpath('pipeline_tasks.yaml'), 'r') as file: task_list = yaml.full_load(file) return task_list