Source code for ibllib.pipes.behavior_tasks

from ibllib.pipes import base_tasks
from ibllib.io.extractors.ephys_passive import PassiveChoiceWorld
from ibllib.io.extractors import bpod_trials
from ibllib.qc.task_extractors import TaskQCExtractor
from ibllib.qc.task_metrics import HabituationQC, TaskQC
from ibllib.io.extractors.base import get_session_extractor_type
from ibllib.io.extractors.ephys_fpga import extract_all
from ibllib.pipes import training_status

import one.alf.io as alfio
from ibllib.plots.figures import BehaviourPlots
import logging
import traceback

_logger = logging.getLogger('ibllib')


[docs]class HabituationRegisterRaw(base_tasks.RegisterRawDataTask): priority = 100 job_size = 'small' @property def signature(self): signature = { 'input_files': [], 'output_files': [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('_iblrig_encoderEvents.raw*', self.collection, False), ('_iblrig_encoderPositions.raw*', self.collection, False), ('_iblrig_encoderTrialInfo.raw*', self.collection, False), ('_iblrig_stimPositionScreen.raw*', self.collection, False), ('_iblrig_syncSquareUpdate.raw*', self.collection, False), ('_iblrig_ambientSensorData.raw*', self.collection, False) ] } return signature
[docs]class HabituationTrialsBpod(base_tasks.DynamicTask): priority = 90 job_size = 'small' @property def signature(self): signature = { 'input_files': [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ], 'output_files': [ ('*trials.contrastLeft.npy', 'alf', True), ('*trials.contrastRight.npy', 'alf', True), ('*trials.feedback_times.npy', 'alf', True), ('*trials.feedbackType.npy', 'alf', True), ('*trials.goCue_times.npy', 'alf', True), ('*trials.goCueTrigger_times.npy', 'alf', True), ('*trials.intervals.npy', 'alf', True), ('*trials.rewardVolume.npy', 'alf', True), ('*trials.stimOff_times.npy', 'alf', True), ('*trials.stimOn_times.npy', 'alf', True), ('*trials.stimOnTrigger_times.npy', 'alf', True), ] } return signature def _run(self, update=True): """ Extracts an iblrig training session """ # TODO this doesn't use the self.collection in any way, always assumes data in raw_behavior_data, needs to be changed trials, wheel, output_files = bpod_trials.extract_all(self.session_path, save=True) if trials is None: return None if self.one is None or self.one.offline: return output_files # Run the task QC # Compile task data for QC qc = HabituationQC(self.session_path, one=self.one) qc.extractor = TaskQCExtractor(self.session_path, one=self.one, sync_collection=self.sync_collection, sync_type=self.sync) qc.run(update=update) return output_files
[docs]class TrialRegisterRaw(base_tasks.RegisterRawDataTask): priority = 100 job_size = 'small' @property def signature(self): signature = { 'input_files': [], 'output_files': [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('_iblrig_encoderEvents.raw*', self.collection, False), ('_iblrig_encoderPositions.raw*', self.collection, False), ('_iblrig_encoderTrialInfo.raw*', self.collection, False), ('_iblrig_stimPositionScreen.raw*', self.collection, False), ('_iblrig_syncSquareUpdate.raw*', self.collection, False), ('_iblrig_ambientSensorData.raw*', self.collection, False) ] } return signature
[docs]class PassiveRegisterRaw(base_tasks.RegisterRawDataTask): priority = 100 job_size = 'small' @property def signature(self): signature = { 'input_files': [], 'output_files': [('_iblrig_taskSettings.raw.*', self.collection, True), ('_iblrig_encoderEvents.raw*', self.collection, True), ('_iblrig_encoderPositions.raw*', self.collection, True), ('_iblrig_encoderTrialInfo.raw*', self.collection, True), ('_iblrig_stimPositionScreen.raw*', self.collection, True), ('_iblrig_syncSquareUpdate.raw*', self.collection, True), ('_iblrig_RFMapStim.raw*', self.collection, True)] } return signature
[docs]class PassiveTask(base_tasks.DynamicTask): priority = 90 job_size = 'small' @property def signature(self): signature = { 'input_files': [('_iblrig_taskSettings.raw*', self.collection, True), ('_iblrig_RFMapStim.raw*', self.collection, True), (f'_{self.sync_namespace}_sync.channels.*', self.sync_collection, True), (f'_{self.sync_namespace}_sync.polarities.*', self.sync_collection, True), (f'_{self.sync_namespace}_sync.times.*', self.sync_collection, True), ('*.wiring.json', self.sync_collection, False), ('*.meta', self.sync_collection, False)], 'output_files': [('_ibl_passiveGabor.table.csv', 'alf', True), ('_ibl_passivePeriods.intervalsTable.csv', 'alf', True), ('_ibl_passiveRFM.times.npy', 'alf', True), ('_ibl_passiveStims.table.csv', 'alf', True)] } return signature def _run(self): """returns a list of pathlib.Paths. """ data, paths = PassiveChoiceWorld(self.session_path).extract( sync_collection=self.sync_collection, task_collection=self.collection, save=True) if any([x is None for x in paths]): self.status = -1 return paths
[docs]class ChoiceWorldTrialsBpod(base_tasks.DynamicTask): priority = 90 job_size = 'small' @property def signature(self): signature = { 'input_files': [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('_iblrig_encoderEvents.raw*', self.collection, True), ('_iblrig_encoderPositions.raw*', self.collection, True)], 'output_files': [ ('*trials.goCueTrigger_times.npy', 'alf', True), ('*trials.stimOnTrigger_times.npy', 'alf', False), ('*trials.table.pqt', 'alf', True), ('*wheel.position.npy', 'alf', True), ('*wheel.timestamps.npy', 'alf', True), ('*wheelMoves.intervals.npy', 'alf', True), ('*wheelMoves.peakAmplitude.npy', 'alf', True) ] } return signature def _run(self, update=True): """ Extracts an iblrig training session """ # TODO this doesn't use the self.collection in any way, always assumes data in raw_behavior_data, needs to be changed trials, wheel, output_files = bpod_trials.extract_all(self.session_path, save=True) if trials is None: return None if self.one is None or self.one.offline: return output_files # Run the task QC # Compile task data for QC type = get_session_extractor_type(self.session_path) if type == 'habituation': qc = HabituationQC(self.session_path, one=self.one) qc.extractor = TaskQCExtractor(self.session_path, one=self.one, sync_collection=self.sync_collection, sync_type=self.sync) else: # Update wheel data qc = TaskQC(self.session_path, one=self.one) qc.extractor = TaskQCExtractor(self.session_path, one=self.one, sync_collection=self.sync_collection, sync_type=self.sync) qc.extractor.wheel_encoding = 'X1' # Aggregate and update Alyx QC fields qc.run(update=update) return output_files
[docs]class ChoiceWorldTrialsNidq(base_tasks.DynamicTask): priority = 90 job_size = 'small' @property def signature(self): signature = { 'input_files': [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('_iblrig_encoderEvents.raw*', self.collection, True), ('_iblrig_encoderPositions.raw*', self.collection, True), (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True), (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True), (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True), ('*wiring.json', self.sync_collection, False), ('*.meta', self.sync_collection, True)], 'output_files': [ ('*trials.goCueTrigger_times.npy', 'alf', True), ('*trials.intervals_bpod.npy', 'alf', False), ('*trials.stimOff_times.npy', 'alf', False), ('*trials.table.pqt', 'alf', True), ('*wheel.position.npy', 'alf', True), ('*wheel.timestamps.npy', 'alf', True), ('*wheelMoves.intervals.npy', 'alf', True), ('*wheelMoves.peakAmplitude.npy', 'alf', True) ] } return signature def _behaviour_criterion(self, update=True): """ Computes and update the behaviour criterion on Alyx """ from brainbox.behavior import training trials = alfio.load_object(self.session_path.joinpath("alf"), "trials") good_enough = training.criterion_delay( n_trials=trials["intervals"].shape[0], perf_easy=training.compute_performance_easy(trials), ) if update: eid = self.one.path2eid(self.session_path, query_type='remote') self.one.alyx.json_field_update( "sessions", eid, "extended_qc", {"behavior": int(good_enough)} ) def _extract_behaviour(self): dsets, out_files = extract_all(self.session_path, self.sync_collection, save=True) return dsets, out_files def _run(self, update=True, plot_qc=True): dsets, out_files = self._extract_behaviour() if not self.one or self.one.offline: return out_files self._behaviour_criterion(update=update) # Run the task QC # TODO this doesn't use the self.collection in any way, always assumes data in raw_behavior_data, needs to be changed qc = TaskQC(self.session_path, one=self.one, log=_logger) qc.extractor = TaskQCExtractor(self.session_path, lazy=True, one=qc.one, sync_collection=self.sync_collection, sync_type=self.sync) # Extract extra datasets required for QC qc.extractor.data = dsets qc.extractor.extract_data() # Aggregate and update Alyx QC fields qc.run(update=update) if plot_qc: _logger.info("Creating Trials QC plots") try: session_id = self.one.path2eid(self.session_path) plot_task = BehaviourPlots(session_id, self.session_path, one=self.one) _ = plot_task.run() self.plot_tasks.append(plot_task) except Exception: _logger.error('Could not create Trials QC Plot') _logger.error(traceback.format_exc()) self.status = -1 return out_files
[docs]class TrainingStatus(base_tasks.DynamicTask): priority = 90 job_size = 'small' @property def signature(self): signature = { 'input_files': [ ('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('*trials.table.pqt', 'alf', True)], 'output_files': [] } return signature def _run(self, upload=True): """ Extracts training status for subject """ df = training_status.get_latest_training_information(self.session_path, self.one) if df is not None: training_status.make_plots(self.session_path, self.one, df=df, save=True, upload=upload) output_files = [] return output_files