Source code for ibllib.pipes.video_tasks

import logging
import subprocess

from ibllib.io import ffmpeg
from ibllib.pipes import base_tasks
from ibllib.io.video import label_from_path, get_video_meta
from ibllib.io.extractors import camera
from ibllib.qc.camera import run_all_qc as run_camera_qc

_logger = logging.getLogger('ibllib')


[docs]class VideoRegisterRaw(base_tasks.VideoTask, base_tasks.RegisterRawDataTask): """ Task to register raw video data. Builds up list of files to register from list of cameras given in session params file """ priority = 100 job_size = 'small' @property def signature(self): signature = { 'input_files': [], 'output_files': [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, True) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] + [('_iblrig_videoCodeFiles.raw*', self.device_collection, False)] } return signature
[docs]class VideoCompress(base_tasks.VideoTask): """ Task to compress raw video data from .avi to .mp4 format. """ priority = 90 job_size = 'large' @property def signature(self): signature = { 'input_files': [(f'_iblrig_{cam}Camera.raw.*', self.device_collection, True) for cam in self.cameras], 'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] } return signature def _run(self): # TODO different compression parameters based on whether it is training or not based on number of camers? # avi to mp4 compression if self.sync == 'bpod': command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 29 ' '-nostats -codec:a copy {file_out}') else: command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 ' '-loglevel 0 -codec:a copy {file_out}') output_files = ffmpeg.iblrig_video_compression(self.session_path, command) if len(output_files) == 0: _logger.info('No compressed videos found') return return output_files
[docs]class VideoConvert(base_tasks.VideoTask): """ Task that converts compressed avi to mp4 format and renames video and camlog files. Specific to UCLA widefield implementation """ priority = 90 job_size = 'small' @property def signature(self): signature = { 'input_files': [(f'{cam}_cam*.avi', self.device_collection, True) for cam in self.cameras] + [(f'{cam}_cam*.camlog', self.device_collection, False) for cam in self.cameras], 'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, True) for cam in self.cameras] } return signature def _run(self): output_files = [] for cam in self.cameras: # rename and register the camlog files camlog_file = next(self.session_path.joinpath(self.device_collection).glob(f'{cam}_cam*.camlog')) new_camlog_file = self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.camlog') camlog_file.replace(new_camlog_file) output_files.append(new_camlog_file) # convert the avi files to mp4 avi_file = next(self.session_path.joinpath(self.device_collection).glob(f'{cam}_cam*.avi')) mp4_file = self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4') command2run = f"ffmpeg -i {str(avi_file)} -c:v copy -c:a copy -y {str(mp4_file)}" process = subprocess.Popen(command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) info, error = process.communicate() if process.returncode == 0: # check the video meta matched and remove the original file meta_avi = get_video_meta(avi_file) _ = meta_avi.pop('size') meta_mp4 = get_video_meta(mp4_file) match = True for key in meta_avi.keys(): if meta_avi[key] != meta_mp4[key]: match = False # if all checks out we can remove the original avi if match: avi_file.unlink() output_files.append(mp4_file) else: _logger.error(f'avi and mp4 meta data do not match for {avi_file}') else: _logger.error(f'conversion to mp4 failed for {avi_file}: {error}') return output_files
[docs]class VideoSyncQcCamlog(base_tasks.VideoTask): """ Task to sync camera timestamps to main DAQ timestamps when camlog files are used. Specific to UCLA widefield implementation """ priority = 40 job_size = 'small' @property def signature(self): signature = { 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, False) for cam in self.cameras] + [(f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True), (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True), (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True), ('*.wiring.json', self.sync_collection, True), ('*wheel.position.npy', 'alf', False), ('*wheel.timestamps.npy', 'alf', False)], 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] } return signature def _run(self, qc=True, **kwargs): mp4_files = self.session_path.joinpath(self.device_collection).rglob('*.mp4') labels = [label_from_path(x) for x in mp4_files] # Video timestamps extraction output_files = [] data, files = camera.extract_all(self.session_path, sync_type=self.sync, sync_collection=self.sync_collection, save=True, labels=labels, camlog=True) output_files.extend(files) # Video QC if qc: run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels, camlog=True, sync_collection=self.sync_collection, sync_type=self.sync) return output_files
[docs]class VideoSyncQcBpod(base_tasks.VideoTask): """ Task to sync camera timestamps to main DAQ timestamps N.B Signatures only reflect new daq naming convention, non compatible with ephys when not running on server """ priority = 40 job_size = 'small' @property def signature(self): signature = { 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] + [('_iblrig_taskData.raw.*', self.collection, True), ('_iblrig_taskSettings.raw.*', self.collection, True), ('*wheel.position.npy', 'alf', False), ('*wheel.timestamps.npy', 'alf', False)], 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] } return signature def _run(self, **kwargs): mp4_files = self.session_path.joinpath(self.device_collection).rglob('*.mp4') labels = [label_from_path(x) for x in mp4_files] # Video timestamps extraction output_files = [] data, files = camera.extract_all(self.session_path, sync_type=self.sync, sync_collection=self.sync_collection, save=True, labels=labels) output_files.extend(files) # Video QC run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels, sync_collection=self.sync_collection, sync_type=self.sync) return output_files
[docs]class VideoSyncQcNidq(base_tasks.VideoTask): """ Task to sync camera timestamps to main DAQ timestamps N.B Signatures only reflect new daq naming convention, non compatible with ephys when not running on server """ priority = 40 job_size = 'small' @property def signature(self): signature = { 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] + [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] + [(f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True), (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True), (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True), ('*.wiring.json', self.sync_collection, True), ('*wheel.position.npy', 'alf', False), ('*wheel.timestamps.npy', 'alf', False)], 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] } return signature def _run(self, **kwargs): mp4_files = self.session_path.joinpath(self.device_collection).rglob('*.mp4') labels = [label_from_path(x) for x in mp4_files] # Video timestamps extraction output_files = [] data, files = camera.extract_all(self.session_path, sync_type=self.sync, sync_collection=self.sync_collection, save=True, labels=labels) output_files.extend(files) # Video QC run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels, sync_collection=self.sync_collection, sync_type=self.sync) return output_files