"""A module for handling experiment description files.
Each device computer adds its piece of information, which is consolidated into the final
acquisition description.
The purpose is 3-fold:
- provide modularity in the extraction: the acquisition description allows pipelines to be
built dynamically.
- assist the copying of the experimental data from device computers to the server computers, in
a way that each device is independent from another.
- assist the copying of the experimental data from device computers to the server computers, in
a way that intermediate states (failed copies) are easily recoverable from and completion
criteria (i.e. session ready to extract) are objective and simple (all device files copied).
INGRESS
- each device computer needs to know the session path on the server.
- create a device file locally in a queue directory. This will serve as a copy flag.
- copy the device file to the local server.
EGRESS
- go through the queue and for each item:
- if the device file is not on the server create it.
- once copy is complete aggregate the qc from file.
"""
import yaml
import time
from datetime import datetime
import logging
from pathlib import Path
import warnings
from pkg_resources import parse_version
import ibllib.pipes.misc as misc
# Module-level logger, named after this module
_logger = logging.getLogger(name=__name__)
# Version of the description-file specification handled by this module. Description data passed
# through `_patch_file` has its 'version' key set to this value.
SPEC_VERSION = '0.1.0'
def write_yaml(file_path, data):
    """
    Write a device file. This is basically just a yaml dump that ensures the folder tree exists.

    Parameters
    ----------
    file_path : pathlib.Path, str
        The full path to the description yaml file to write to. Missing parent folders are
        created.
    data : dict
        The data to write to the yaml file.
    """
    # Coerce to Path so that str paths are accepted too (`.parent` is needed below)
    file_path = Path(file_path)
    file_path.parent.mkdir(exist_ok=True, parents=True)
    # The handle is only written to, never read back, so plain 'w' mode is sufficient
    with open(file_path, 'w') as fp:
        yaml.safe_dump(data, fp)
def _patch_file(data: dict) -> dict:
    """
    Update older description data to conform to the most recent specification.

    Currently this only sets the 'version' key to SPEC_VERSION, logging a warning when the data
    claims a version more recent than SPEC_VERSION.

    Parameters
    ----------
    data : dict
        The description yaml data. Modified in place.

    Returns
    -------
    dict
        The patched description data (the same object that was passed in).
    """
    if data and (v := data.get('version', '0')) != SPEC_VERSION:
        # An unquoted version in YAML (e.g. `version: 0.1`) is loaded as a float, whereas
        # parse_version requires a string
        if parse_version(str(v)) > parse_version(SPEC_VERSION):
            _logger.warning('Description file generated by more recent code')
        data['version'] = SPEC_VERSION
    return data
def write_params(session_path, data) -> Path:
    """
    Write the main experiment description file of a session.

    The data are written to '<session_path>/_ibl_experiment.description.yaml' via `write_yaml`,
    which creates any missing folders.

    Parameters
    ----------
    session_path : pathlib.Path, str
        The session folder in which to write the description file.
    data : dict
        The experiment description data to write.

    Returns
    -------
    pathlib.Path
        The full path of the written yaml file.
    """
    yaml_file = Path(session_path).joinpath('_ibl_experiment.description.yaml')
    write_yaml(yaml_file, data)
    return yaml_file
def read_params(path) -> dict:
    """
    Load an experiment description file.

    In addition to reading the yaml data, this function ensures that the specification is the most
    recent one. If the file is missing None is returned. If the file is empty an empty dict is
    returned; a file containing invalid yaml raises the yaml parser's error.

    Parameters
    ----------
    path : pathlib.Path, str
        The path to the description yaml file (or its containing folder) to load. When a folder is
        given, the lexicographically first file matching '_ibl_experiment.description*' is loaded,
        e.g. '_ibl_experiment.description.yaml' before '_ibl_experiment.description_x.yaml'.

    Returns
    -------
    dict, None
        The parsed yaml data, or None if the file was not found.

    Examples
    --------
    # Load a session's _ibl_experiment.description.yaml file
    >>> data = read_params('/home/data/subject/2020-01-01/001')

    # Load a specific device's description file
    >>> data = read_params('/home/data/subject/2020-01-01/001/_devices/behaviour.yaml')
    """
    if (path := Path(path)).is_dir():
        # Glob order depends on the file system; take the sorted-first match for a stable choice
        yaml_file = min(path.glob('_ibl_experiment.description*'), default=None)
    else:
        yaml_file = path if path.exists() else None
    if not yaml_file:
        _logger.debug('Experiment description not found: %s', path)
        return

    with open(yaml_file, 'r') as fp:
        # safe_load returns None for an empty file; normalise that to an empty dict
        data = _patch_file(yaml.safe_load(fp) or {})
    return data
def aggregate_device(file_device, file_acquisition_description, unlink=False):
    """
    Add the contents of a device file to the main acquisition description file.

    Writes to the main description file are guarded by a '.lock' file next to it. If a lock file
    is found it is polled every 2 seconds, up to 4 times. A lock still present after that is
    considered stale and deleted. The lock created here is always removed, even if the merge
    fails.

    Each top-level key of the device file is merged as follows:
    - lists are concatenated;
    - dicts are shallow-merged, with the device values taking precedence;
    - any other value (e.g. the 'version' string) replaces the existing one;
    - a null value never overwrites an existing entry.

    Parameters
    ----------
    file_device : pathlib.Path
        The full path to the device yaml file to add to the main description file.
    file_acquisition_description : pathlib.Path
        The full path to the main acquisition description yaml file to add the device file to.
    unlink : bool
        If True, the device file is removed after successful aggregation.

    Raises
    ------
    AssertionError
        Device file contains a main 'sync' key that is already present in the main description
        file. For an experiment only one main sync device is allowed.
    """
    # reads in the partial device data
    data_device = read_params(file_device)
    if not data_device:
        _logger.warning('empty device file "%s"', file_device)
        return

    file_lock = file_acquisition_description.with_suffix('.lock')
    # if a lock file exists, poll up to 4 times (2 seconds apart) for it to be released
    attempts = 0
    while file_lock.exists() and attempts < 4:
        _logger.info('file lock found, waiting 2 seconds %s', file_lock)
        time.sleep(2)
        attempts += 1

    # if the lock file still exists after polling, remove it as it's a job that went wrong
    if file_lock.exists():
        with open(file_lock, 'r') as fp:
            _logger.debug('file lock contents: %s', yaml.safe_load(fp))
        _logger.info('stale file lock found, deleting %s', file_lock)
        file_lock.unlink()

    # add in the lock file, add some meta data to ease debugging if one gets stuck
    with open(file_lock, 'w') as fp:
        yaml.safe_dump(dict(datetime=datetime.utcnow().isoformat(), file_device=str(file_device)), fp)

    try:
        # if the acquisition description file already exists, read in the yaml content
        if file_acquisition_description.exists():
            acq_desc = read_params(file_acquisition_description)
        else:
            acq_desc = {}

        # merge the dictionaries
        for k, v in data_device.items():
            if k == 'sync' and k in acq_desc:
                # explicit raise rather than `assert` so the check survives `python -O`
                raise AssertionError('multiple sync fields defined')
            if isinstance(v, list):
                acq_desc[k] = acq_desc.get(k, []) + v
            elif isinstance(v, dict):
                acq_desc[k] = {**acq_desc.get(k, {}), **v}
            elif v is not None or k not in acq_desc:
                # scalar values (e.g. 'version') replace the existing value; an empty (null)
                # entry never overwrites existing content
                acq_desc[k] = v

        with open(file_acquisition_description, 'w') as fp:
            yaml.safe_dump(acq_desc, fp)
    finally:
        # always release the lock, otherwise a failed merge stalls the next caller for 8 seconds
        # (missing_ok: another process may already have deleted it as stale)
        file_lock.unlink(missing_ok=True)

    # delete the original file if necessary
    if unlink:
        file_device.unlink()
def get_cameras(sess_params):
    """
    Return the camera labels listed in a session description.

    Parameters
    ----------
    sess_params : dict
        The experiment description data.

    Returns
    -------
    list, None
        The keys of the 'cameras' entry under 'devices', or None when no cameras are listed.
    """
    camera_map = sess_params.get('devices', {}).get('cameras', None)
    if camera_map:
        return list(camera_map.keys())
    return None
def _get_sync_item(sess_params):
    """
    Return the single (name, details) pair of the 'sync' entry, or None if there is none.

    A sync device declared without properties (e.g. ``sync: {bpod: null}``) yields empty details.
    Raises ValueError if more than one sync device is defined.
    """
    sync = sess_params.get('sync', None)
    if not sync:
        return None
    (name, details), = sync.items()
    return name, details or {}


def _get_sync_detail(sess_params, key):
    """Return the value of `key` in the main sync device's details, or None if absent."""
    item = _get_sync_item(sess_params)
    return None if item is None else item[1].get(key, None)


def get_sync(sess_params):
    """
    Return the name of the main sync device, or None if no sync is defined.

    Raises ValueError if more than one sync device is defined.
    """
    item = _get_sync_item(sess_params)
    return None if item is None else item[0]


def get_sync_collection(sess_params):
    """Return the 'collection' of the main sync device, or None if not defined."""
    return _get_sync_detail(sess_params, 'collection')


def get_sync_extension(sess_params):
    """Return the 'extension' of the main sync device, or None if not defined."""
    return _get_sync_detail(sess_params, 'extension')


def get_sync_namespace(sess_params):
    """Return the 'acquisition_software' of the main sync device, or None if not defined."""
    return _get_sync_detail(sess_params, 'acquisition_software')
def get_task_protocol(sess_params, task_collection):
    """
    Return the task protocol whose collection matches `task_collection`.

    Parameters
    ----------
    sess_params : dict
        The experiment description data.
    task_collection : str
        The collection to look up.

    Returns
    -------
    str, None
        The matching protocol name (a key of the 'tasks' entry). If several protocols share the
        collection the last one is returned. None if nothing matches or no tasks are listed.
    """
    tasks = sess_params.get('tasks', None)
    if not tasks:
        return None
    matching = [
        name for name, details in tasks.items()
        if details.get('collection') == task_collection
    ]
    return matching[-1] if matching else None
def get_task_collection(sess_params):
    """
    Return the collection of the session's task data.

    Parameters
    ----------
    sess_params : dict
        The experiment description data.

    Returns
    -------
    str, None
        None when no tasks are listed; 'raw_behavior_data' when several tasks are listed;
        otherwise the 'collection' of the single task.
    """
    tasks = sess_params.get('tasks', None)
    if not tasks:
        return None
    if len(tasks) > 1:
        return 'raw_behavior_data'
    (only_task,) = tasks.values()
    return only_task['collection']
def get_device_collection(sess_params, device):
    """
    Return the data collection of a device.

    Not implemented yet: always returns None, whatever the arguments.
    """
    # TODO implement the device collection lookup
    return
def get_video_compressed(sess_params):
    """
    Return whether the session videos are compressed.

    Parameters
    ----------
    sess_params : dict
        The experiment description data.

    Returns
    -------
    bool, None
        The 'compressed' flag of the first listed camera (False if unset), or None when no
        cameras are listed.
    """
    cameras = sess_params.get('devices', {}).get('cameras', None)
    if not cameras:
        return None
    # All or nothing: assumes either all videos are compressed or none are, so only the first
    # camera listed is inspected
    first_camera = next(iter(cameras.values()))
    return first_camera.get('compressed', False)
def prepare_experiment(session_path, acquisition_description=None, local=None, remote=None):
    """
    Copy acquisition description yaml to the server and local transfers folder.

    One yaml file '<label>.yaml' is written per top-level key of `acquisition_description`. Files
    go to '<remote>/<session_path>/_devices' and to '<local>/<session_path>'. A failure to write
    to the server only emits a warning; the local copy is still attempted.

    Parameters
    ----------
    session_path : str, pathlib.Path, pathlib.PurePath
        The RELATIVE session path, e.g. subject/2020-01-01/001.
    acquisition_description : dict, optional
        Map of device file label to the data to write in that file. If None or empty, nothing is
        written and a warning is logged.
    local : str, pathlib.Path, optional
        The local transfers folder, passed as `transfers_path` to
        `misc.create_basic_transfer_params`.
    remote : str, pathlib.Path, optional
        The remote data folder, passed as `remote_data_path` to
        `misc.create_basic_transfer_params`.
    """
    # Determine if user passed in arg for local/remote subject folder locations or pull in from
    # local param file or prompt user if missing
    params = misc.create_basic_transfer_params(transfers_path=local, remote_data_path=remote)

    if not acquisition_description:
        # Nothing to copy; without this guard a None description fails on `.items()` below
        _logger.warning('No acquisition description provided for %s; nothing to copy', session_path)
        return

    # First attempt to copy to server; a failure here must not prevent the local copy
    remote_device_path = Path(params['REMOTE_DATA_FOLDER_PATH']).joinpath(session_path, '_devices')
    try:
        for label, data in acquisition_description.items():
            write_yaml(remote_device_path.joinpath(f'{label}.yaml'), data)
    except Exception as ex:
        warnings.warn(f'Failed to write data to {remote_device_path}: {ex}')

    # Now copy to local directory
    local_device_path = Path(params['TRANSFERS_PATH']).joinpath(session_path)
    for label, data in acquisition_description.items():
        write_yaml(local_device_path.joinpath(f'{label}.yaml'), data)