from pathlib import Path
import json
import datetime
import logging
import re
import shutil
from requests import HTTPError
from pkg_resources import parse_version
from dateutil import parser as dateparser
from iblutil.io import hashfile
from one.alf.files import get_session_path, folder_parts
import one.alf.exceptions as alferr
from one.api import ONE
import ibllib
import ibllib.io.extractors.base
import ibllib.time
import ibllib.io.raw_data_loaders as raw
from ibllib.io import flags
_logger = logging.getLogger(__name__)
EXCLUDED_EXTENSIONS = ['.flag', '.error', '.avi']
REGISTRATION_GLOB_PATTERNS = ['alf/**/*.*',
'raw_behavior_data/**/_iblrig_*.*',
'raw_passive_data/**/_iblrig_*.*',
'raw_behavior_data/**/_iblmic_*.*',
'raw_video_data/**/_iblrig_*.*',
'raw_video_data/**/_ibl_*.*',
'raw_ephys_data/**/_iblrig_*.*',
'raw_ephys_data/**/_spikeglx_*.*',
'raw_ephys_data/**/_iblqc_*.*',
                              'spikesorters/**/_kilosort_*.*',
'raw_widefield_data/**/_ibl_*.*',
'raw_photometry_data/**/_neurophotometrics_*.*'
]
def _check_filename_for_registration(full_file, patterns):
    """Return True if the file name matches one of the registration patterns and its
    extension is not in EXCLUDED_EXTENSIONS."""
    if Path(full_file).suffix in EXCLUDED_EXTENSIONS:
        return False
    for pat in patterns:
        # convert the filename pattern into a regular expression, e.g. '_iblrig_*.*' -> '\_iblrig\_.*\..*'
        reg = pat.replace('.', r'\.').replace('_', r'\_').replace('*', r'.*')
        if re.match(reg, Path(full_file).name, re.IGNORECASE):
            return True
    return False
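
# Illustrative checks for the helper above (file names and pattern are hypothetical):
#   _check_filename_for_registration('_iblrig_taskData.raw.jsonable', ['_iblrig_*.*'])  # True
#   _check_filename_for_registration('transfer_me.flag', ['_iblrig_*.*'])  # False, '.flag' excluded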
def register_dataset(file_list, one=None, created_by=None, repository=None, server_only=False,
                     versions=None, default=True, dry=False, max_md5_size=None, exists=False):
"""
    Registers a set of files belonging to a single session in the Alyx database
    :param file_list: (list of pathlib.Path or pathlib.Path) the file(s) to register
:param one: optional (one.api.One), current one object, will create an instance if not provided
:param created_by: (string) name of user in Alyx (defaults to 'root')
:param repository: optional: (string) name of the repository in Alyx
:param server_only: optional: (bool) if True only creates on the Flatiron (defaults to False)
:param versions: optional (list of strings): versions tags (defaults to ibllib version)
:param default: optional (bool) whether to set as default dataset (defaults to True)
    :param dry: (bool) if True, returns without registering anything (defaults to False)
    :param max_md5_size: (int) maximum file size in bytes above which the md5 checksum
     computation is skipped (defaults to None: always compute)
    :param exists: (bool) whether the files already exist on the data repository
     (defaults to False)
    :return: Alyx response of the registered datasets
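
    Example (a minimal sketch; the path and repository name are illustrative and assume a
    connected ONE instance `one`):
    >>> from pathlib import Path
    >>> files = [Path('/data/Subjects/ZM_1085/2019-02-12/001/alf/spikes.times.npy')]
    >>> register_dataset(files, one=one, repository='flatiron_mainenlab', dry=True)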
"""
    # if the repository is specified, set server_only=True for the registration client so that
    # file records are not created on the lab's other repositories
    if repository and not server_only:
        server_only = True
    if one is None:
        one = ONE(cache_rest=None)
    if created_by is None:
        created_by = one.alyx.user
if file_list is None or file_list == '' or file_list == []:
return
elif not isinstance(file_list, list):
file_list = [Path(file_list)]
    assert len(set(get_session_path(f) for f in file_list)) == 1, 'files should belong to the same session'
    assert all(Path(f).exists() for f in file_list), 'some files do not exist'
if versions is None:
versions = ibllib.__version__
if isinstance(versions, str):
versions = [versions for _ in file_list]
assert isinstance(versions, list) and len(versions) == len(file_list)
# computing the md5 can be very long, so this is an option to skip if the file is bigger
# than a certain threshold
if max_md5_size:
hashes = [hashfile.md5(p) if
p.stat().st_size < max_md5_size else None for p in file_list]
else:
hashes = [hashfile.md5(p) for p in file_list]
session_path = get_session_path(file_list[0])
    # build the body of the registration REST request
r = {'created_by': created_by,
'path': session_path.relative_to((session_path.parents[2])).as_posix(),
'filenames': [p.relative_to(session_path).as_posix() for p in file_list],
'name': repository,
'server_only': server_only,
'hashes': hashes,
'filesizes': [p.stat().st_size for p in file_list],
'versions': versions,
'default': default,
'exists': exists,
'check_protected': True} # flag to see if any datasets are protected
if not dry:
try:
response = one.alyx.rest('register-file', 'create', data=r, no_cache=True)
for p in file_list:
_logger.info(f"ALYX REGISTERED DATA: {p}")
return response
except HTTPError as err:
err_message = json.loads(err.response.text)
if err_message['status_code'] == 403 and err_message['error'] == 'One or more datasets is protected':
response = err_message['details']
today_revision = datetime.datetime.today().strftime('%Y-%m-%d')
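                # When a protected dataset would be overwritten, the file is instead moved into
                # an ALF revision folder '#<revision>#' next to the original, e.g.
                # 'alf/probe00/#2022-10-31#/clusters.amp.npy'; if that revision is itself
                # protected, alphabetical subrevisions follow: '#2022-10-31a#', '#2022-10-31b#'...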
new_file_list = []
for fl, res in zip(file_list, response):
(name, prot_info), = res.items()
# Dataset has not yet been registered
                    if not prot_info:
new_file_list.append(fl)
continue
else:
# Check to see if the file path already has a revision in it
file_revision = folder_parts(fl, as_dict=True)['revision']
if file_revision:
# Find existing protected revisions
existing_revisions = [key for pr in prot_info for key, val in pr.items() if val]
# If the revision explicitly defined by the user doesn't exist or is not protected, register as is
if file_revision not in existing_revisions:
revision_path = fl.parent
else:
                            # starting from 'a', find the next alphabetical subrevision
                            # that isn't already a protected revision
                            i = ord('a')
                            new_revision = file_revision + chr(i)
                            while new_revision in existing_revisions:
                                i += 1
                                new_revision = file_revision + chr(i)
revision_path = fl.parent.parent.joinpath(f'#{new_revision}#')
if revision_path != fl.parent:
revision_path.mkdir(exist_ok=True)
shutil.move(fl, revision_path.joinpath(fl.name))
new_file_list.append(revision_path.joinpath(fl.name))
continue
else:
fl_path = fl.parent
assert name == fl_path.relative_to(session_path).joinpath(fl.name).as_posix()
                        # Find info about the latest revision; N.B. on the Django side, prot_info
                        # is sorted with the latest revision first
(latest_revision, protected), = prot_info[0].items()
# If the latest revision is the original and it is unprotected no need for revision
# e.g {'clusters.amp.npy': [{'': False}]}
if latest_revision == '' and not protected:
# Use original path
revision_path = fl_path
# If there already is a revision but it is unprotected, move into this revision folder
# e.g {'clusters.amp.npy': [{'2022-10-31': False}, {'2022-05-31': True}, {'': True}]}
elif not protected:
# Check that the latest_revision has the date naming convention we expect 'YYYY-MM-DD'
try:
_ = datetime.datetime.strptime(latest_revision[:10], '%Y-%m-%d')
revision_path = fl_path.joinpath(f'#{latest_revision}#')
# If it doesn't it probably has been made manually so we don't want to overwrite this and instead
# use today's date
except ValueError:
revision_path = fl_path.joinpath(f'#{today_revision}#')
# If protected and the latest protected revision is from today we need to make a subrevision
elif protected and today_revision in latest_revision:
if latest_revision == today_revision:
new_revision = today_revision + 'a'
else:
                                # bump the alphabetical suffix, e.g. '2022-10-31a' -> '2022-10-31b'
                                alpha = latest_revision[-1]
                                new_revision = today_revision + chr(ord(alpha) + 1)
revision_path = fl_path.joinpath(f'#{new_revision}#')
                        # In all other cases, move into a revision folder named after today's date
# e.g {'clusters.amp.npy': [{'': True}]}
# e.g {'clusters.amp.npy': [{'2022-10-31': True}, {'': True}]}
else:
revision_path = fl_path.joinpath(f'#{today_revision}#')
# Only move for the cases where a revision folder has been made
if revision_path != fl_path:
revision_path.mkdir(exist_ok=True)
shutil.move(fl, revision_path.joinpath(fl.name))
new_file_list.append(revision_path.joinpath(fl.name))
file_list = new_file_list
r = {'created_by': created_by,
'path': session_path.relative_to((session_path.parents[2])).as_posix(),
'filenames': [p.relative_to(session_path).as_posix() for p in file_list],
'name': repository,
'server_only': server_only,
'hashes': hashes,
'filesizes': [p.stat().st_size for p in file_list],
'versions': versions,
'default': default,
'exists': exists,
'check_protected': False}
response = one.alyx.rest('register-file', 'create', data=r, no_cache=True)
for p in file_list:
_logger.info(f"ALYX REGISTERED DATA: {p}")
return response
else:
raise err
def register_session_raw_data(session_path, one=None, overwrite=False, dry=False, **kwargs):
"""
Registers all files corresponding to raw data files to Alyx. It will select files that
match Alyx registration patterns.
    :param session_path: the local path of the session, e.g. .../Subjects/KS022/2019-12-10/001
    :param one: one instance to work with
    :param overwrite: (False) if set to True, will patch the datasets even if already registered;
     this may take a long time. If set to False (default) will skip all already registered data.
    :param dry: do not register files, only return the list of files to be registered
    :return: (list of pathlib.Path) files to register, (dict) Alyx response of registered files
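
    Example (a minimal sketch; the session path is illustrative, one is a connected instance):
    >>> files, response = register_session_raw_data(
    ...     '/mnt/s0/Data/Subjects/KS022/2019-12-10/001', one=one, dry=True)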
"""
    session_path = Path(session_path)
    if one is None:
        one = ONE(cache_rest=None)
    one.alyx.clear_rest_cache()  # Ensure data are from database
eid = one.path2eid(session_path, query_type='remote') # needs to make sure we're up to date
# query the database for existing datasets on the session and allowed dataset types
dsets = one.alyx.rest('datasets', 'list', session=eid)
already_registered = [
session_path.joinpath(Path(ds['collection'] or '').joinpath(ds['name'])) for ds in dsets]
dtypes = one.alyx.rest('dataset-types', 'list')
registration_patterns = [dt['filename_pattern'] for dt in dtypes if dt['filename_pattern']]
    # glob all the raw data files
glob_patterns = [pat for pat in REGISTRATION_GLOB_PATTERNS if pat.startswith('raw')]
files_2_register = []
for gp in glob_patterns:
f2r = list(session_path.glob(gp))
files_2_register.extend(f2r)
    # filter 1/2: filter out datasets that do not match any dataset-type filename pattern
files_2_register = list(filter(lambda f: _check_filename_for_registration(
f, registration_patterns), files_2_register))
# filter 2/2 unless overwrite is True, filter out the datasets that already exist
if not overwrite:
files_2_register = list(filter(lambda f: f not in already_registered, files_2_register))
data_repo = get_local_data_repository(one)
response = register_dataset(files_2_register, one=one, versions=None, dry=dry, repository=data_repo, **kwargs)
return files_2_register, response
class RegistrationClient:
"""
    Object that keeps the ONE instance and provides methods to create sessions and register data.
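
    Example (illustrative; assumes a connected ONE instance `one` and an existing session folder):
    >>> rc = RegistrationClient(one=one)
    >>> session = rc.create_session('/mnt/s0/Data/Subjects/KS022/2019-12-10/001')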
"""
def __init__(self, one=None):
self.one = one
if not one:
self.one = ONE(cache_rest=None)
self.dtypes = self.one.alyx.rest('dataset-types', 'list')
self.registration_patterns = [
dt['filename_pattern'] for dt in self.dtypes if dt['filename_pattern']]
self.file_extensions = [df['file_extension'] for df in
self.one.alyx.rest('data-formats', 'list', no_cache=True)]
    def create_sessions(self, root_data_folder, glob_pattern='**/create_me.flag', dry=False):
"""
Create sessions looking recursively for flag files
        :param root_data_folder: folder to look for create_me.flag
        :param dry: bool. Dry run if True
        :param glob_pattern: glob pattern, relative to root_data_folder, used to locate the flag files
        :return: list of pathlib.Path of the session folders for which a flag file was found
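
        Example (illustrative):
        >>> rc.create_sessions('/mnt/s0/Data/Subjects', dry=True)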
"""
        flag_files = list(Path(root_data_folder).glob(glob_pattern))
for flag_file in flag_files:
if dry:
print(flag_file)
continue
try:
_logger.info('creating session for ' + str(flag_file.parent))
                # create_session stops after session creation without registering files
self.create_session(flag_file.parent)
flag_file.unlink()
            except Exception as e:
_logger.error(f'Error creating session for {flag_file.parent}\n{e}')
_logger.warning(f'Skipping {flag_file.parent}')
continue
return [ff.parent for ff in flag_files]
    def create_session(self, session_path, **kwargs):
        """
        Create a session in Alyx without registering files; equivalent to
        register_session(session_path, file_list=False)
        """
return self.register_session(session_path, file_list=False, **kwargs)
    def register_sync(self, root_data_folder, dry=False):
"""
Register sessions looking recursively for flag files
:param root_data_folder: folder to look for register_me.flag
:param dry: bool. Dry run if True
        :return: None
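
        Example (illustrative):
        >>> rc.register_sync('/mnt/s0/Data/Subjects', dry=True)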
"""
flag_files = Path(root_data_folder).glob('**/register_me.flag')
for flag_file in flag_files:
if dry:
print(flag_file)
continue
file_list = flags.read_flag_file(flag_file)
_logger.info('registering ' + str(flag_file.parent))
self.register_session(flag_file.parent, file_list=file_list)
flags.write_flag_file(flag_file.parent.joinpath('flatiron.flag'), file_list=file_list)
flag_file.unlink()
if flag_file.parent.joinpath('create_me.flag').exists():
flag_file.parent.joinpath('create_me.flag').unlink()
_logger.info('registered' + '\n')
    def register_session(self, ses_path, file_list=True, projects=None, procedures=None):
"""
Register session in Alyx
:param ses_path: path to the session
        :param file_list: bool. If set to False, only creates the session and skips file registration
:param projects: list of strings corresponding to project names in the database. If set to
None, defaults to the subject projects. Here is how to get a list of current projects
>>> sorted([proj['name'] for proj in one.alyx.rest('projects', 'list')])
:param procedures: (None) list of session procedure to label the session with. They should
correspond to procedures in the database.
>>> sorted([proc['name'] for proc in one.alyx.rest('procedures', 'list')])
        :return: (dict) the session record registered in Alyx
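
        Example (illustrative; the path follows the subject/date/number layout):
        >>> rc.register_session('/mnt/s0/Data/Subjects/KS022/2019-12-10/001')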
"""
if isinstance(ses_path, str):
ses_path = Path(ses_path)
# read meta data from the rig for the session from the task settings file
settings_json_file = list(ses_path.glob(
'**/raw_behavior_data/_iblrig_taskSettings.raw*.json'))
        if not settings_json_file:
            settings_json_file = list(ses_path.glob('**/_iblrig_taskSettings.raw*.json'))
            if not settings_json_file:
                _logger.error('could not find _iblrig_taskSettings.raw.json. Abort.')
                raise ValueError(f'_iblrig_taskSettings.raw.json not found in {ses_path} Abort.')
            settings_json_file = settings_json_file[0]
            _logger.warning(f'Settings found in a strange place: {settings_json_file}')
        else:
            settings_json_file = settings_json_file[0]
md = _read_settings_json_compatibility_enforced(settings_json_file)
# query alyx endpoints for subject, error if not found
try:
subject = self.one.alyx.rest(
'subjects', 'list', nickname=md['SUBJECT_NAME'], no_cache=True)[0]
except IndexError:
_logger.error(f"Subject: {md['SUBJECT_NAME']} doesn't exist in Alyx. ABORT.")
raise alferr.AlyxSubjectNotFound(md['SUBJECT_NAME'])
# look for a session from the same subject, same number on the same day
session_id, session = self.one.search(subject=subject['nickname'],
date_range=md['SESSION_DATE'],
number=md['SESSION_NUMBER'],
details=True, query_type='remote')
try:
user = self.one.alyx.rest('users', 'read', id=md["PYBPOD_CREATOR"][0], no_cache=True)
except Exception as e:
_logger.error(f"User: {md['PYBPOD_CREATOR'][0]} doesn't exist in Alyx. ABORT")
raise e
username = user['username'] if user else subject['responsible_user']
# load the trials data to get information about session duration and performance
ses_data = raw.load_data(ses_path)
start_time, end_time = _get_session_times(ses_path, md, ses_data)
n_trials, n_correct_trials = _get_session_performance(md, ses_data)
# this is the generic relative path: subject/yyyy-mm-dd/NNN
gen_rel_path = Path(subject['nickname'], md['SESSION_DATE'],
'{0:03d}'.format(int(md['SESSION_NUMBER'])))
task_protocol = md['PYBPOD_PROTOCOL'] + md['IBLRIG_VERSION_TAG']
# unless specified label the session projects with subject projects
projects = subject['projects'] if projects is None else projects
# makes sure projects is a list
projects = [projects] if isinstance(projects, str) else projects
# unless specified label the session procedures with task protocol lookup
procedures = procedures if procedures else _alyx_procedure_from_task(task_protocol)
procedures = [procedures] if isinstance(procedures, str) else procedures
json_fields_names = ['IS_MOCK', 'POOP_COUNT', 'IBLRIG_VERSION']
json_field = {f: md[f] for f in json_fields_names if f in md}
if not session:
ses_ = {'subject': subject['nickname'],
'users': [username],
'location': md['PYBPOD_BOARD'],
'procedures': procedures,
'lab': subject['lab'],
'projects': projects,
'type': 'Experiment',
'task_protocol': task_protocol,
'number': md['SESSION_NUMBER'],
'start_time': ibllib.time.date2isostr(start_time),
'end_time': ibllib.time.date2isostr(end_time) if end_time else None,
'n_correct_trials': n_correct_trials,
'n_trials': n_trials,
'json': json_field,
}
session = self.one.alyx.rest('sessions', 'create', data=ses_)
if md['SUBJECT_WEIGHT']:
wei_ = {'subject': subject['nickname'],
'date_time': ibllib.time.date2isostr(start_time),
'weight': md['SUBJECT_WEIGHT'],
'user': username
}
self.one.alyx.rest('weighings', 'create', data=wei_)
else: # TODO: if session exists and no json partial_upgrade it
session = self.one.alyx.rest('sessions', 'read', id=session_id[0], no_cache=True)
_logger.info(session['url'] + ' ')
# create associated water administration if not found
if not session['wateradmin_session_related'] and ses_data:
wa_ = {
'subject': subject['nickname'],
'date_time': ibllib.time.date2isostr(end_time),
'water_administered': ses_data[-1]['water_delivered'] / 1000,
'water_type': md.get('REWARD_TYPE') or 'Water',
'user': username,
'session': session['url'][-36:],
'adlib': False}
self.one.alyx.rest('water-administrations', 'create', data=wa_)
# at this point the session has been created. If create only, exit
if not file_list:
return session
# register all files that match the Alyx patterns, warn user when files are encountered
rename_files_compatibility(ses_path, md['IBLRIG_VERSION_TAG'])
        F = []  # list of file paths relative to the session path
md5s = []
file_sizes = []
for fn in _glob_session(ses_path):
if fn.suffix in EXCLUDED_EXTENSIONS:
                _logger.debug('Excluded: ' + str(fn))
continue
if not _check_filename_for_registration(fn, self.registration_patterns):
_logger.warning('No matching dataset type for: ' + str(fn))
continue
if fn.suffix not in self.file_extensions:
                _logger.warning('No matching data format (i.e. file extension) for: ' + str(fn))
continue
if not _register_bool(fn.name, file_list):
_logger.debug('Not in filelist: ' + str(fn))
continue
try:
assert (str(gen_rel_path) in str(fn))
except AssertionError as e:
strerr = 'ALF folder mismatch: data is in wrong subject/date/number folder. \n'
strerr += ' Expected ' + str(gen_rel_path) + ' actual was ' + str(fn)
_logger.error(strerr)
raise e
# extract the relative path of the file
rel_path = Path(str(fn)[str(fn).find(str(gen_rel_path)):])
F.append(str(rel_path.relative_to(gen_rel_path).as_posix()))
file_sizes.append(fn.stat().st_size)
md5s.append(hashfile.md5(fn) if fn.stat().st_size < 1024 ** 3 else None)
_logger.info('Registering ' + str(fn))
r_ = {'created_by': username,
'path': str(gen_rel_path.as_posix()),
'filenames': F,
'hashes': md5s,
'filesizes': file_sizes,
'versions': [ibllib.__version__ for _ in F]
}
self.one.alyx.post('/register-file', data=r_)
return session
def _alyx_procedure_from_task(task_protocol):
task_type = ibllib.io.extractors.base.get_task_extractor_type(task_protocol)
procedure = _alyx_procedure_from_task_type(task_type)
return procedure or []
def _alyx_procedure_from_task_type(task_type):
lookup = {'biased': 'Behavior training/tasks',
'biased_opto': 'Behavior training/tasks',
'habituation': 'Behavior training/tasks',
'training': 'Behavior training/tasks',
'ephys': 'Ephys recording with acute probe(s)',
'ephys_biased_opto': 'Ephys recording with acute probe(s)',
'ephys_passive_opto': 'Ephys recording with acute probe(s)',
'ephys_replay': 'Ephys recording with acute probe(s)',
'ephys_training': 'Ephys recording with acute probe(s)',
'mock_ephys': 'Ephys recording with acute probe(s)',
'sync_ephys': 'Ephys recording with acute probe(s)'}
try:
        # look if there are tasks in the personal projects repo with procedures
import projects.base
custom_tasks = Path(projects.base.__file__).parent.joinpath('task_type_procedures.json')
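        # the JSON file is assumed to map task type to procedure name, e.g.
        # {"my_custom_task": "Behavior training/tasks"}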
with open(custom_tasks) as fp:
lookup.update(json.load(fp))
except (ModuleNotFoundError, FileNotFoundError):
pass
    return lookup.get(task_type)
def _register_bool(fn, file_list):
if isinstance(file_list, bool):
return file_list
if isinstance(file_list, str):
file_list = [file_list]
return any([str(fil) in fn for fil in file_list])
def _read_settings_json_compatibility_enforced(json_file):
with open(json_file) as js:
md = json.load(js)
if 'IS_MOCK' not in md.keys():
md['IS_MOCK'] = False
if 'IBLRIG_VERSION_TAG' not in md.keys():
md['IBLRIG_VERSION_TAG'] = '3.2.3'
if not md['IBLRIG_VERSION_TAG']:
_logger.warning("You appear to be on an untagged version...")
return md
# 2018-12-05 Version 3.2.3 fixes (permanent fixes in IBL_RIG from 3.2.4 on)
if parse_version(md['IBLRIG_VERSION_TAG']) <= parse_version('3.2.3'):
if 'LAST_TRIAL_DATA' in md.keys():
md.pop('LAST_TRIAL_DATA')
if 'weighings' in md['PYBPOD_SUBJECT_EXTRA'].keys():
md['PYBPOD_SUBJECT_EXTRA'].pop('weighings')
if 'water_administration' in md['PYBPOD_SUBJECT_EXTRA'].keys():
md['PYBPOD_SUBJECT_EXTRA'].pop('water_administration')
if 'IBLRIG_COMMIT_HASH' not in md.keys():
md['IBLRIG_COMMIT_HASH'] = 'f9d8905647dbafe1f9bdf78f73b286197ae2647b'
# parse the date format to Django supported ISO
dt = dateparser.parse(md['SESSION_DATETIME'])
md['SESSION_DATETIME'] = ibllib.time.date2isostr(dt)
    # add the weight key if it doesn't already exist
if 'SUBJECT_WEIGHT' not in md.keys():
md['SUBJECT_WEIGHT'] = None
return md
def rename_files_compatibility(ses_path, version_tag):
if not version_tag:
return
if parse_version(version_tag) <= parse_version('3.2.3'):
task_code = ses_path.glob('**/_ibl_trials.iti_duration.npy')
for fn in task_code:
fn.replace(fn.parent.joinpath('_ibl_trials.itiDuration.npy'))
task_code = ses_path.glob('**/_iblrig_taskCodeFiles.raw.zip')
for fn in task_code:
fn.replace(fn.parent.joinpath('_iblrig_codeFiles.raw.zip'))
def _get_session_times(fn, md, ses_data):
    """
    Get session start and end time from the Bpod data
    :param fn: session path, used for logging only
    :param md: the task settings metadata dictionary
    :param ses_data: the Bpod trials data
    :return: start_time (datetime), end_time (datetime or None)
    """
start_time = ibllib.time.isostr2date(md['SESSION_DATETIME'])
if not ses_data:
return start_time, None
c = 0
for sd in reversed(ses_data):
ses_duration_secs = (sd['behavior_data']['Trial end timestamp'] -
sd['behavior_data']['Bpod start timestamp'])
if ses_duration_secs < (6 * 3600):
break
c += 1
if c:
_logger.warning((f'Trial end timestamps of last {c} trials above 6 hours '
f'(most likely corrupt): ') + str(fn))
end_time = start_time + datetime.timedelta(seconds=ses_duration_secs)
return start_time, end_time
def _get_session_performance(md, ses_data):
"""Get performance about the session from bpod data"""
if not ses_data:
return None, None
n_trials = ses_data[-1]['trial_num']
    # check that the actual number of trials matches the labelled trial count
assert (len(ses_data) == n_trials)
# task specific logic
if 'habituationChoiceWorld' in md['PYBPOD_PROTOCOL']:
n_correct_trials = 0
else:
n_correct_trials = ses_data[-1]['ntrials_correct']
return n_trials, n_correct_trials
def _glob_session(ses_path):
"""
Glob for files to be registered on an IBL session
:param ses_path: pathlib.Path of the session
:return: a list of files to potentially be registered
"""
fl = []
for gp in REGISTRATION_GLOB_PATTERNS:
fl.extend(list(ses_path.glob(gp)))
return fl
def get_local_data_repository(one):
    """
    Get the local data repository name from the Globus client ID
    :param one: ONE instance with an Alyx client
    :return: (str) the data repository name in Alyx, or None if no Globus client is configured
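
    Example (illustrative; requires a Globus client-id file in the home directory):
    >>> repo_name = get_local_data_repository(one)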
"""
if one is None:
return
if not Path.home().joinpath(".globusonline/lta/client-id.txt").exists():
return
    with open(Path.home().joinpath(".globusonline/lta/client-id.txt"), 'r') as fid:
        globus_id = fid.read().strip()  # strip a possible trailing newline from the ID
data_repo = one.alyx.rest('data-repository', 'list', globus_endpoint_id=globus_id)
    if len(data_repo):
        return data_repo[0]['name']