import unittest
from functools import partial
from pathlib import Path
import numpy as np
from iblutil.util import Bunch
from one.api import ONE
from ibllib.tests import TEST_DB
from ibllib.qc import task_metrics as qcmetrics
from brainbox.behavior.wheel import cm_to_rad
[docs]class TestAggregateOutcome(unittest.TestCase):
[docs] def test_outcome_from_dict_default(self):
# For a task that has no costume thresholds, default is 0.99 PASS and 0.9 WARNING and 0 FAIL,
# np.nan and None return not set
qc_dict = {'gnap': .99, 'gnop': np.nan, 'gnip': None, 'gnep': 0.9, 'gnup': 0.89}
expect = {'gnap': 'PASS', 'gnop': 'NOT_SET', 'gnip': 'NOT_SET', 'gnep': 'WARNING', 'gnup': 'FAIL'}
outcome, outcome_dict = qcmetrics.TaskQC.compute_session_status_from_dict(qc_dict)
self.assertEqual(outcome, 'FAIL')
self.assertEqual(expect, outcome_dict)
[docs] def test_outcome_from_dict_stimFreeze_delays(self):
# For '_task_stimFreeze_delays' the threshold are 0.99 PASS and 0 WARNING
qc_dict = {'gnap': .99, 'gnop': np.nan, '_task_stimFreeze_delays': .1}
expect = {'gnap': 'PASS', 'gnop': 'NOT_SET', '_task_stimFreeze_delays': 'WARNING'}
outcome, outcome_dict = qcmetrics.TaskQC.compute_session_status_from_dict(qc_dict)
self.assertEqual(outcome, 'WARNING')
self.assertEqual(expect, outcome_dict)
[docs] def test_outcome_from_dict_iti_delays(self):
# For '_task_iti_delays' the threshold is 0 NOT_SET
qc_dict = {'gnap': .99, 'gnop': np.nan, '_task_iti_delays': .1}
expect = {'gnap': 'PASS', 'gnop': 'NOT_SET', '_task_iti_delays': 'NOT_SET'}
outcome, outcome_dict = qcmetrics.TaskQC.compute_session_status_from_dict(qc_dict)
self.assertEqual(outcome, 'PASS')
self.assertEqual(expect, outcome_dict)
[docs] def test_out_of_bounds(self):
# When qc values are below 0 or above 1, give error
qc_dict = {'gnap': 1.01, 'gnop': 0, 'gnip': 0.99}
with self.assertRaises(ValueError) as e:
outcome, outcome_dict = qcmetrics.TaskQC.compute_session_status_from_dict(qc_dict)
self.assertTrue(e.exception.args[0] == 'Values out of bound')
[docs]class TestTaskMetrics(unittest.TestCase):
[docs] def setUp(self):
self.data = self.load_fake_bpod_data()
self.wheel_gain = 4
wheel_data = self.load_fake_wheel_data(self.data, wheel_gain=self.wheel_gain)
self.data.update(wheel_data)
[docs] @staticmethod
def load_fake_bpod_data(n=5):
"""Create fake extractor output of bpodqc.load_data
:param n: the number of trials
:return: a dict of simulated trial data
"""
trigg_delay = 1e-4 # an ideal delay between triggers and measured times
resp_feeback_delay = 1e-3 # delay between feedback and response
stimOff_itiIn_delay = 5e-3 # delay between stimOff and itiIn
N = partial(np.random.normal, (n,)) # Convenience function for norm dist sampling
choice = np.ones((n,), dtype=int)
choice[[1, 3]] = -1 # a couple of incorrect trials
choice[0] = 0 # a nogo trial
# One trial of each type incorrect
correct = choice != 0
correct[np.argmax(choice == 1)] = 0
correct[np.argmax(choice == -1)] = 0
quiescence_length = 0.2 + np.random.standard_exponential(size=(n,))
iti_length = 0.5 # inter-trial interval
# trial lengths include quiescence period, a couple small trigger delays and iti
trial_lengths = quiescence_length + resp_feeback_delay + (trigg_delay * 4) + iti_length
# add on 60s for nogos + feedback time (1 or 2s) + ~0.5s for other responses
trial_lengths += (choice == 0) * 60 + (~correct + 1) + (choice != 0) * N(0.5)
start_times = np.concatenate(([0], np.cumsum(trial_lengths)[:-1]))
end_times = np.cumsum(trial_lengths) - 1e-2
data = {
"phase": np.random.uniform(low=0, high=2 * np.pi, size=(n,)),
"quiescence": quiescence_length,
"choice": choice,
"correct": correct,
"intervals": np.c_[start_times, end_times],
"itiIn_times": end_times - iti_length + stimOff_itiIn_delay,
"position": np.ones_like(choice) * 35
}
data["stimOnTrigger_times"] = start_times + data["quiescence"] + 1e-4
data["stimOn_times"] = data["stimOnTrigger_times"] + 1e-1
data["goCueTrigger_times"] = data["stimOn_times"] + 1e-3
data["goCue_times"] = data["goCueTrigger_times"] + trigg_delay
data["response_times"] = end_times - (
resp_feeback_delay + iti_length + (~correct + 1)
)
data["feedback_times"] = data["response_times"] + resp_feeback_delay
data["stimFreeze_times"] = data["response_times"] + 1e-2
data["stimFreezeTrigger_times"] = data["stimFreeze_times"] - trigg_delay
data["feedbackType"] = np.vectorize(lambda x: -1 if x == 0 else x)(data["correct"])
outcome = data["feedbackType"].copy()
outcome[data["choice"] == 0] = 0
data["outcome"] = outcome
# Delay of 1 second if correct, 2 seconds if incorrect
data["stimOffTrigger_times"] = data["feedback_times"] + (~correct + 1)
data["stimOff_times"] = data["stimOffTrigger_times"] + trigg_delay
# Error tone times nan on incorrect trials
outcome_times = np.vectorize(lambda x, y: x + 1e-2 if y else np.nan)
data["errorCueTrigger_times"] = outcome_times(data["feedback_times"], ~data["correct"])
data["errorCue_times"] = data["errorCueTrigger_times"] + trigg_delay
data["valveOpen_times"] = outcome_times(data["feedback_times"], data["correct"])
data["rewardVolume"] = ~np.isnan(data["valveOpen_times"]) * 3.0
return data
[docs] @staticmethod
def load_fake_wheel_data(trial_data, wheel_gain=4):
# Load a wheel fragment: a numpy array of the form [timestamps, positions], for a wheel
# movement during one trial. Wheel is X1 bpod RE in radians.
wh_path = Path(__file__).parent.joinpath('..', 'fixtures', 'qc').resolve()
wheel_frag = np.load(wh_path.joinpath('wheel.npy'))
resolution = np.mean(np.abs(np.diff(wheel_frag[:, 1]))) # pos diff between samples
# abs displacement, s, in mm required to move 35 visual degrees
POS_THRESH = 35
s_mm = np.abs(POS_THRESH / wheel_gain) # don't care about direction
# convert abs displacement to radians (wheel pos is in rad)
pos_thresh = cm_to_rad(s_mm * 1e-1)
# index of threshold cross
pos_thresh_idx = np.argmax(np.abs(wheel_frag[:, 1]) > pos_thresh)
def qt_wheel_fill(start, end, t_step=0.001, p_step=None):
if p_step is None:
p_step = 2 * np.pi / 1024
t = np.arange(start, end, t_step)
p = np.random.randint(-1, 2, len(t))
t = t[p != 0]
p = p[p != 0].cumsum() * p_step
return t, p
wheel_data = [] # List generated of wheel data fragments
movement_times = [] # List of generated first movement times
def add_frag(t, p):
"""Add wheel data fragments to list, adjusting positions to be within one sample of
one another"""
last_samp = getattr(add_frag, 'last_samp', (0, 0))
p += last_samp[1]
if np.abs(p[0] - last_samp[1]) == 0:
p += resolution
wheel_data.append((t, p))
add_frag.last_samp = (t[-1], p[-1])
for i in np.arange(len(trial_data['choice'])):
# Iterate over trials generating wheel samples for the necessary periods
# trial start to stim on; should be below quiescence threshold
stimOn_trig = trial_data['stimOnTrigger_times'][i]
trial_start = trial_data['intervals'][i, 0]
t, p = qt_wheel_fill(trial_start, stimOn_trig, .5, resolution)
if len(t) > 0: # Possible for no movement during quiescence
add_frag(t, p)
# stim on to trial end
trial_end = trial_data['intervals'][i, 1]
if trial_data['choice'][i] == 0:
# Add random wheel movements for duration of trial
goCue = trial_data['goCue_times'][i]
t, p = qt_wheel_fill(goCue, trial_end, .1, resolution)
add_frag(t, p)
movement_times.append(t[0])
else:
# Align wheel fragment with response time
response_time = trial_data['response_times'][i]
t = wheel_frag[:, 0] + response_time - wheel_frag[pos_thresh_idx, 0]
p = np.abs(wheel_frag[:, 1]) * trial_data['choice'][i]
assert t[0] > add_frag.last_samp[0]
movement_times.append(t[1])
add_frag(t, p)
# Fill in random movements between end of response and trial end
t, p = qt_wheel_fill(t[-1] + 0.01, trial_end, p_step=resolution)
add_frag(t, p)
# Stitch wheel fragments and assert no skips
wheel_data = np.concatenate(list(map(np.column_stack, wheel_data)))
assert np.all(np.diff(wheel_data[:, 0]) > 0), "timestamps don't strictly increase"
np.testing.assert_allclose(np.abs(np.diff(wheel_data[:, 1])), resolution)
assert len(movement_times) == trial_data['intervals'].shape[0]
return {
'wheel_timestamps': wheel_data[:, 0],
'wheel_position': wheel_data[:, 1],
'firstMovement_times': np.array(movement_times)
}
[docs] def test_check_stimOn_goCue_delays(self):
metric, passed = qcmetrics.check_stimOn_goCue_delays(self.data)
self.assertTrue(np.allclose(metric, 0.0011), "failed to return correct metric")
# Set incorrect timestamp (goCue occurs before stimOn)
self.data["goCue_times"][-1] = self.data["stimOn_times"][-1] - 1e-4
metric, passed = qcmetrics.check_stimOn_goCue_delays(self.data)
n = len(self.data["stimOn_times"])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_response_feedback_delays(self):
metric, passed = qcmetrics.check_response_feedback_delays(self.data)
self.assertTrue(np.allclose(metric, 0.001), "failed to return correct metric")
# Set incorrect timestamp (feedback occurs before response)
self.data["feedback_times"][-1] = self.data["response_times"][-1] - 1e-4
metric, passed = qcmetrics.check_response_feedback_delays(self.data)
n = len(self.data["feedback_times"])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_response_stimFreeze_delays(self):
metric, passed = qcmetrics.check_response_stimFreeze_delays(self.data)
self.assertTrue(np.allclose(metric, 1e-2), "failed to return correct metric")
# Set incorrect timestamp (stimFreeze occurs before response)
self.data["stimFreeze_times"][-1] = self.data["response_times"][-1] - 1e-4
metric, passed = qcmetrics.check_response_stimFreeze_delays(self.data)
n = len(self.data["feedback_times"]) - np.sum(self.data["choice"] == 0)
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_positive_feedback_stimOff_delays(self):
metric, passed = qcmetrics.check_positive_feedback_stimOff_delays(self.data)
self.assertTrue(
np.allclose(metric[self.data["correct"]], 1e-4), "failed to return correct metric"
)
# Set incorrect timestamp (stimOff occurs just after response)
id = np.argmax(self.data["correct"])
self.data["stimOff_times"][id] = self.data["response_times"][id] + 1e-2
metric, passed = qcmetrics.check_positive_feedback_stimOff_delays(self.data)
expected = (self.data["correct"].sum() - 1) / self.data["correct"].sum()
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_negative_feedback_stimOff_delays(self):
err_trial = ~self.data["correct"]
metric, passed = qcmetrics.check_negative_feedback_stimOff_delays(self.data)
values = np.abs(metric[err_trial])
self.assertTrue(np.allclose(values, 1e-2), "failed to return correct metric")
# Set incorrect timestamp (stimOff occurs 1s after response)
id = np.argmax(err_trial)
self.data["stimOff_times"][id] = self.data["response_times"][id] + 1
metric, passed = qcmetrics.check_negative_feedback_stimOff_delays(self.data)
expected = (err_trial.sum() - 1) / err_trial.sum()
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_error_trial_event_sequence(self):
metric, passed = qcmetrics.check_error_trial_event_sequence(self.data)
self.assertTrue(np.all(metric == ~self.data['correct']), "failed to return correct metric")
self.assertTrue(np.all(passed))
# Set incorrect timestamp (itiIn occurs before errorCue)
err_trial = ~self.data["correct"]
(id,) = np.where(err_trial)
self.data["intervals"][id[0], 0] = np.inf
self.data["errorCue_times"][id[1]] = 0
metric, passed = qcmetrics.check_error_trial_event_sequence(self.data)
expected = (err_trial.sum() - 2) / err_trial.sum()
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_correct_trial_event_sequence(self):
metric, passed = qcmetrics.check_correct_trial_event_sequence(self.data)
self.assertTrue(np.all(metric == self.data['correct']), "failed to return correct metric")
self.assertTrue(np.all(passed))
# Set incorrect timestamp
correct = self.data["correct"]
id = np.argmax(correct)
self.data["intervals"][id, 0] = np.inf
metric, passed = qcmetrics.check_correct_trial_event_sequence(self.data)
expected = (correct.sum() - 1) / correct.sum()
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_trial_length(self):
metric, passed = qcmetrics.check_trial_length(self.data)
self.assertTrue(np.all(metric), "failed to return correct metric")
# Set incorrect timestamp
self.data["goCue_times"][-1] = 0
metric, passed = qcmetrics.check_trial_length(self.data)
n = len(self.data["goCue_times"])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_goCue_delays(self):
metric, passed = qcmetrics.check_goCue_delays(self.data)
self.assertTrue(np.allclose(metric, 1e-4), "failed to return correct metric")
# Set incorrect timestamp
self.data["goCue_times"][1] = self.data["goCueTrigger_times"][1] + 0.1
metric, passed = qcmetrics.check_goCue_delays(self.data)
n = len(self.data["goCue_times"])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_errorCue_delays(self):
metric, passed = qcmetrics.check_errorCue_delays(self.data)
err_trial = ~self.data["correct"]
self.assertTrue(np.allclose(metric[err_trial], 1e-4), "failed to return correct metric")
# Set incorrect timestamp
id = np.argmax(err_trial)
self.data["errorCue_times"][id] = self.data["errorCueTrigger_times"][id] + 0.1
metric, passed = qcmetrics.check_errorCue_delays(self.data)
n = err_trial.sum()
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_stimOn_delays(self):
metric, passed = qcmetrics.check_stimOn_delays(self.data)
self.assertTrue(np.allclose(metric, 1e-1), "failed to return correct metric")
# Set incorrect timestamp
self.data["stimOn_times"][-1] = self.data["stimOnTrigger_times"][-1] + 0.2
metric, passed = qcmetrics.check_stimOn_delays(self.data)
n = len(self.data["stimOn_times"])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_stimOff_delays(self):
metric, passed = qcmetrics.check_stimOff_delays(self.data)
self.assertTrue(np.allclose(metric, 1e-4), "failed to return correct metric")
# Set incorrect timestamp
self.data["stimOff_times"][-1] = self.data["stimOffTrigger_times"][-1] + 0.2
metric, passed = qcmetrics.check_stimOff_delays(self.data)
n = len(self.data["stimOff_times"])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_stimFreeze_delays(self):
metric, passed = qcmetrics.check_stimFreeze_delays(self.data)
self.assertTrue(np.allclose(metric, 1e-4), "failed to return correct metric")
# Set incorrect timestamp
self.data["stimFreeze_times"][-1] = self.data["stimFreezeTrigger_times"][-1] + 0.2
metric, passed = qcmetrics.check_stimFreeze_delays(self.data)
n = len(self.data["stimFreeze_times"])
expected = (n - 1) / n
self.assertEqual(np.nanmean(passed), expected, "failed to detect dodgy timestamp")
[docs] def test_check_reward_volumes(self):
metric, passed = qcmetrics.check_reward_volumes(self.data)
self.assertTrue(all(x in {0.0, 3.0} for x in metric), "failed to return correct metric")
self.assertTrue(np.all(passed))
# Set incorrect volume
id = np.array([np.argmax(self.data["correct"]), np.argmax(~self.data["correct"])])
self.data["rewardVolume"][id] = self.data["rewardVolume"][id] + 1
metric, passed = qcmetrics.check_reward_volumes(self.data)
self.assertTrue(np.mean(passed) == 0.6, "failed to detect incorrect reward volumes")
[docs] def test_check_reward_volume_set(self):
metric, passed = qcmetrics.check_reward_volume_set(self.data)
self.assertTrue(all(x in {0.0, 3.0} for x in metric), "failed to return correct metric")
self.assertTrue(passed)
# Add a new volume to the set
id = np.argmax(self.data["correct"])
self.data["rewardVolume"][id] = 2.3
metric, passed = qcmetrics.check_reward_volume_set(self.data)
self.assertFalse(passed, "failed to detect incorrect reward volume set")
# Set 0 volumes to new value; set length still 2 but should fail anyway
self.data["rewardVolume"][~self.data["correct"]] = 2.3
metric, passed = qcmetrics.check_reward_volume_set(self.data)
self.assertFalse(passed, "failed to detect incorrect reward volume set")
[docs] def test_check_audio_pre_trial(self):
# Create Sound sync fake data that is OK
BNC2_OK = {
"times": self.data["goCue_times"] + 1e-1,
"polarities": np.array([1, -1, 1, -1, 1]),
}
# Create Sound sync fake data that is NOT OK
BNC2_NOK = {
"times": self.data["goCue_times"] - 1e-1,
"polarities": np.array([1, -1, 1, -1, 1]),
}
metric, passed = qcmetrics.check_audio_pre_trial(self.data, audio=BNC2_OK)
self.assertTrue(~np.all(metric))
self.assertTrue(np.all(passed))
metric, passed = qcmetrics.check_audio_pre_trial(self.data, audio=BNC2_NOK)
self.assertTrue(np.all(metric))
self.assertTrue(~np.all(passed))
[docs] def test_check_wheel_freeze_during_quiescence(self):
metric, passed = qcmetrics.check_wheel_freeze_during_quiescence(self.data)
self.assertTrue(np.all(passed))
# Make one trial move more
n = 1 # Index of trial to manipulate
t1 = self.data['intervals'][n, 0]
t2 = self.data['stimOnTrigger_times'][n]
ts, pos = (self.data['wheel_timestamps'], self.data['wheel_position'])
wh_idx = np.argmax(ts > t1)
if ts[wh_idx] > self.data['stimOnTrigger_times'][n]:
# No sample during quiescence; insert one
self.data['wheel_timestamps'] = np.insert(ts, wh_idx, t2 - .001)
self.data['wheel_position'] = np.insert(pos, wh_idx, np.inf)
else: # Otherwise make one sample infinite
self.data['wheel_position'][wh_idx] = np.inf
metric, passed = qcmetrics.check_wheel_freeze_during_quiescence(self.data)
self.assertFalse(passed[n])
self.assertTrue(metric[n] > 2)
[docs] def test_check_wheel_move_before_feedback(self):
metric, passed = qcmetrics.check_wheel_move_before_feedback(self.data)
nogo = self.data['choice'] == 0
self.assertTrue(np.all(passed[~nogo]))
self.assertTrue(np.isnan(metric[nogo]).all())
self.assertTrue(np.isnan(passed[nogo]).all())
# Remove wheel data around feedback for choice trial
assert self.data['choice'].any(), 'no choice trials in test data'
n = np.argmax(self.data['choice'] != 0) # Index of choice trial
mask = np.logical_xor(self.data['wheel_timestamps'] > self.data['feedback_times'][n] - 1,
self.data['wheel_timestamps'] < self.data['feedback_times'][n] + 1)
self.data['wheel_timestamps'] = self.data['wheel_timestamps'][mask]
self.data['wheel_position'] = self.data['wheel_position'][mask]
metric, passed = qcmetrics.check_wheel_move_before_feedback(self.data)
self.assertFalse(passed[n] or metric[n] != 0)
[docs] def test_check_wheel_move_during_closed_loop(self):
gain = self.wheel_gain or 4
metric, passed = qcmetrics.check_wheel_move_during_closed_loop(self.data, gain)
nogo = self.data['choice'] == 0
self.assertTrue(np.all(passed[~nogo]))
self.assertTrue(np.isnan(metric[nogo]).all())
self.assertTrue(np.isnan(passed[nogo]).all())
# Remove wheel data for choice trial
assert self.data['choice'].any(), 'no choice trials in test data'
n = np.argmax(self.data['choice'] != 0) # Index of choice trial
mask = np.logical_xor(self.data['wheel_timestamps'] < self.data['goCue_times'][n],
self.data['wheel_timestamps'] > self.data['response_times'][n])
self.data['wheel_timestamps'] = self.data['wheel_timestamps'][mask]
self.data['wheel_position'] = self.data['wheel_position'][mask]
metric, passed = qcmetrics.check_wheel_move_during_closed_loop(self.data, gain)
self.assertFalse(passed[n])
[docs] def test_check_wheel_integrity(self):
metric, passed = qcmetrics.check_wheel_integrity(self.data, re_encoding='X1')
self.assertTrue(np.all(passed))
# Insert some violations and verify that they're caught
idx = np.random.randint(self.data['wheel_timestamps'].size, size=2)
self.data['wheel_timestamps'][idx[0] + 1] -= 1
self.data['wheel_position'][idx[1]] -= 1
metric, passed = qcmetrics.check_wheel_integrity(self.data, re_encoding='X1')
self.assertFalse(passed[idx].any())
[docs] def test_check_n_trial_events(self):
metric, passed = qcmetrics.check_n_trial_events(self.data)
self.assertTrue(np.all(passed == 1.) and np.all(metric))
# Change errorCueTriggers
id = np.argmax(self.data['correct'])
self.data['errorCueTrigger_times'][id] = self.data['intervals'][id, 0] + np.random.rand()
_, passed = qcmetrics.check_n_trial_events(self.data)
self.assertFalse(passed[id])
# Change another event
id = id - 1 if id > 0 else id + 1
self.data['goCue_times'][id] = self.data['intervals'][id, 1] + np.random.rand()
_, passed = qcmetrics.check_n_trial_events(self.data)
self.assertFalse(passed[id])
[docs] def test_check_detected_wheel_moves(self):
metric, passed = qcmetrics.check_detected_wheel_moves(self.data)
self.assertTrue(np.all(self.data['firstMovement_times'] == metric))
self.assertTrue(np.all(passed))
# Change a movement time
id = np.argmax(self.data['choice'] != 0)
self.data['firstMovement_times'][id] = self.data['goCue_times'][id] - 0.3
_, passed = qcmetrics.check_detected_wheel_moves(self.data)
self.assertEqual(0.75, np.nanmean(passed))
# Change the min_qt
_, passed = qcmetrics.check_detected_wheel_moves(self.data, min_qt=0.3)
self.assertTrue(np.all(passed))
[docs] @unittest.skip("not implemented")
def test_check_stimulus_move_before_goCue(self):
pass # TODO Nicco?
[docs] def test_check_stimOff_itiIn_delays(self):
metric, passed = qcmetrics.check_stimOff_itiIn_delays(self.data)
self.assertTrue(np.nanmean(passed))
# No go should be NaN
id = np.argmax(self.data['choice'] == 0)
self.assertTrue(np.isnan(passed[id]), 'No go trials should be excluded')
# Change a trial
id = np.argmax(self.data['choice'] != 0)
self.data['stimOff_times'][id] = self.data['itiIn_times'][id] + 1e-4
_, passed = qcmetrics.check_stimOff_itiIn_delays(self.data) # recompute
self.assertEqual(0.75, np.nanmean(passed))
[docs] def test_check_iti_delays(self):
metric, passed = qcmetrics.check_iti_delays(self.data)
# We want the metric to return positive values that are close to 0.1, given the test data
self.assertTrue(np.allclose(metric[:-1], 1e-2, atol=0.001),
"failed to return correct metric")
self.assertTrue(np.isnan(metric[-1]), "last trial should be NaN")
self.assertTrue(np.all(passed))
# Mess up a trial
id = 2
self.data["intervals"][id + 1, 0] += 0.5 # Next trial starts 0.5 sec later
metric, passed = qcmetrics.check_iti_delays(self.data)
n_trials = len(self.data["stimOff_times"]) - 1 # Last trial NaN here
expected = (n_trials - 1) / n_trials
self.assertTrue(expected, np.nanmean(passed))
[docs] @unittest.skip("not implemented")
def test_check_frame_frequency(self):
pass # TODO Miles
[docs] @unittest.skip("not implemented")
def test_check_frame_updates(self):
pass # TODO Nicco?
[docs]class TestHabituationQC(unittest.TestCase):
"""Test HabituationQC class
NB: For complete coverage this should be run along slide the integration tests
"""
[docs] def setUp(self):
eid = '8dd0fcb0-1151-4c97-ae35-2e2421695ad7'
one = ONE(**TEST_DB)
self.qc = qcmetrics.HabituationQC(eid, one=one)
self.qc.extractor = Bunch({'data': self.load_fake_bpod_data()}) # Dummy extractor obj
[docs] @staticmethod
def load_fake_bpod_data(n=5):
"""Create fake extractor output of bpodqc.load_data
:param n: the number of trials
:return: a dict of simulated trial data
"""
trigg_delay = 1e-4 # an ideal delay between triggers and measured times
iti_length = 0.5 # the so-called 'inter-trial interval'
blank_length = 1. # the time between trial start and stim on
stimCenter_length = 1. # the length of time the stimulus is in the center
# the lengths of time between stim on and stim center
stimOn_length = np.random.normal(size=(n,)) + 10
# trial lengths include couple small trigger delays and iti
trial_lengths = blank_length + stimOn_length + 1e-1 + stimCenter_length
start_times = np.concatenate(([0], np.cumsum(trial_lengths)[:-1]))
end_times = np.cumsum(trial_lengths) - 1e-2
data = {
"phase": np.random.uniform(low=0, high=2 * np.pi, size=(n,)),
"stimOnTrigger_times": start_times + blank_length,
"intervals": np.c_[start_times, end_times],
"itiIn_times": end_times - iti_length,
"position": np.random.choice([-1, 1], n, replace=True) * 35,
"feedbackType": np.ones(n),
"feedback_times": end_times - 0.5,
"rewardVolume": np.ones(n) * 3.,
"stimOff_times": end_times + trigg_delay,
"stimOffTrigger_times": end_times
}
data["stimOn_times"] = data["stimOnTrigger_times"] + trigg_delay
data["goCueTrigger_times"] = data["stimOnTrigger_times"]
data["goCue_times"] = data["goCueTrigger_times"] + trigg_delay
data["stimCenter_times"] = data["feedback_times"] - 0.5
data["stimCenterTrigger_times"] = data["stimCenter_times"] - trigg_delay
data["valveOpen_times"] = data["feedback_times"]
return data
[docs] def test_compute(self):
# All should pass except one NOT_SET
self.qc.compute()
self.assertIsNotNone(self.qc.metrics)
_, _, outcomes = self.qc.compute_session_status()
if self.qc.passed['_task_habituation_time'] is None:
self.assertEqual(outcomes['_task_habituation_time'], 'NOT_SET')
if __name__ == "__main__":
unittest.main(exit=False, verbosity=2)