import json
import numpy as np
from datetime import datetime, timedelta
import utils as utl # type: ignore
import re
def info_electrodes(data, version='Initial', side=0):
    '''
    Return information of the active electrodes from TIMELINE mode.

    side 0 - Left;
    side 1 - Right

    :param data: dict, organized session dictionary (see getData())
    :param version: str, default to 'Initial'. Key of data['Device']['Groups'] whose first group is read
    :param side: int, default to 0 ('Left'). Index used on the sensing-channel / program list
    :return: dict with 'Group_id', 'Frequency', 'Pulse', 'Amplitude', 'Stim_electrodes' and 'Type';
        'PSD', 'Sense_electrodes', 'Sense_frequency' and 'PSDdate' are added when a sensing
        frequency is available. None when no stimulation rate can be resolved.
    '''
    lookup_errors = (KeyError, IndexError, TypeError, AttributeError)

    group = data['Device']['Groups'][version][0]
    group_id = group["GroupId"]
    program_settings = group["ProgramSettings"]
    group_settings = group["GroupSettings"]

    # Stimulation rate: stored on the program settings, otherwise taken from the sensing channel.
    try:
        rate_in_hertz = program_settings["RateInHertz"]
    except KeyError:
        sensing = program_settings['SensingChannel']
        if isinstance(sensing, dict):
            rate_in_hertz = sensing["RateInHertz"]
        elif isinstance(sensing, list):
            rate_in_hertz = 0
        else:
            # Unknown layout: give up early instead of failing later on an undefined rate.
            return None

    try:
        channels = program_settings['SensingChannel']
        sensing_electrodes = channels[side]["Channel"].split('.')[-1]
        amplitude_in_milliamps = channels[side]['SuspendAmplitudeInMilliAmps']
        sensing_freq = channels[side]['SensingSetup']['FrequencyInHertz']
    except lookup_errors:  # when electrodes don't allow sensing
        # NOTE(review): the fallback always reads 'LeftHemisphere' and program 0 for the
        # amplitude, whatever `side` is - confirm this is intended.
        channels = program_settings['LeftHemisphere']['Programs']
        amplitude_in_milliamps = channels[0]["AmplitudeInMilliAmps"]
        sensing_electrodes = 'Sensing Disabled'
        sensing_freq = ''

    pulse_width_in_microseconds = channels[side]["PulseWidthInMicroSecond"]
    # Each entry becomes '<electrode>-<state>', e.g. the text after the last '.'/'_' of both fields.
    stim_electrodes = [
        re.split(r'\.|_', state['Electrode'])[-1] + '-' + state["ElectrodeStateResult"].split('.')[-1]
        for state in channels[side]["ElectrodeState"]
    ]

    cycling_enabled = group_settings['Cycling']['Enabled']
    if cycling_enabled == False:  # noqa: E712 - keep the original equality semantics
        type_stimulation = 'Continuous'
    else:
        type_stimulation = 'Cycling'

    result = {
        'Group_id': utl.after_point(group_id),
        'Frequency': rate_in_hertz,
        'Pulse': pulse_width_in_microseconds,
        'Amplitude': amplitude_in_milliamps,
        'Stim_electrodes': stim_electrodes,
        'Type': type_stimulation,
    }

    if not isinstance(sensing_freq, str):
        # Prefer the PSD stored in the first GroupHistory entry; start from an empty value
        # so that a missing history falls through to the most recent signal instead of crashing.
        channel_psd = {}
        date = None
        try:
            history_entry = data['Device']['GroupHistory'][0]
        except lookup_errors:
            history_entry = None
        if history_entry is not None:
            for g in range(4):  # only the first four history groups are probed
                try:
                    channel_psd = history_entry['Groups'][g]['ProgramSettings']['SensingChannel'][side]['SensingSetup']['ChannelSignalResult']
                    break
                except lookup_errors:
                    continue
            if channel_psd != {}:
                date = utl.parse_datetime(history_entry['SessionDate']).date()
        if channel_psd == {}:
            channel_psd = channels[side]['SensingSetup']['ChannelSignalResult']
            date = 'Most Recent Signal'
        result.update({
            'PSD': channel_psd,
            'Sense_electrodes': sensing_electrodes,
            'Sense_frequency': sensing_freq,
            'PSDdate': date,
        })
    return result
def add_utc_conversion_to_dates(data, offset):
    '''
    Recursively shift every string field whose key contains 'DateTime' by ``offset``.

    The structure is modified in place. Values that cannot be parsed or shifted are
    left untouched. Nested dicts and lists are walked recursively.

    :param data: dict or list (any other type is returned unchanged)
    :param offset: value subtracted from each parsed date (presumably a timedelta - confirm against caller)
    :return: the same ``data`` object that was passed in
    '''
    if isinstance(data, list):
        for element in data:
            add_utc_conversion_to_dates(element, offset)
        return data

    if not isinstance(data, dict):
        return data

    for field, content in data.items():
        if 'DateTime' in field and isinstance(content, str):
            try:
                shifted = utl.parse_datetime(content) - offset
                data[field] = utl.full_date2str(shifted)
            except Exception:
                # Best effort: keep the original text when it cannot be converted.
                pass
            continue
        if isinstance(content, (dict, list)):
            add_utc_conversion_to_dates(content, offset)
    return data
def getData(file_path):
    '''
    Creates organized dictionary with information in JSON file. Dictionary has 7 main keys, organized by the recording modes and device information.

    Data = {
    'Device': all information relative to device, stimulation, programming, session metadata, battery and electrode status,
    'Survey': recording data from BrainSense Survey (TD and PSD),
    'Setup': recording data from BrainSense Setup (Stimulation ON/OFF and PSD artifact checking),
    'Streaming': recording data from BrainSense Streaming (Stimulation ON/OFF),
    'Indefinite': recording data from BrainSense Indefinite Streaming,
    'Events': at-home marked events, recorded 30s PSD,
    'Timeline': at-home LFP recordings,
    }

    :param file_path: str, path to JSON file.
    :return: dict, or the string 'JSON corrupted.' when the file cannot be parsed
    '''
    # The context manager closes the handle on every path, including a parse failure.
    with open(file_path, 'r', encoding='utf-8') as file:
        try:
            raw = json.load(file)
        except Exception:
            return 'JSON corrupted.'

    offset = extract_time_offset(raw)
    raw_data = add_utc_conversion_to_dates(raw, offset)

    device_keys = ['AbnormalEnd', 'FullyReadForSession',
                   'FeatureInformationCode', 'SessionDate',
                   'SessionEndDate', 'ProgrammerTimezone',
                   'ProgrammerUtcOffset', 'ProgrammerLocale',
                   'ProgrammerVersion', 'PatientInformation',
                   'DeviceInformation', 'BatteryInformation',
                   'RechargeCount', 'NonRechargeBatteryStatus',
                   'GroupUsagePercentage', 'LeadConfiguration',
                   'Stimulation', 'Groups',
                   'BatteryReminder', 'Impedance',
                   'GroupHistory', 'PatientEvents']
    setup_keys = ['SenseChannelTests', 'CalibrationTests', 'MostRecentInSessionSignalCheck']
    survey_keys = ['LFPMontage', 'LfpMontageTimeDomain']
    streaming_keys = ['BrainSenseTimeDomain', 'BrainSenseLfp']
    indefinite_keys = ['IndefiniteStreaming']
    timeline_keys = ['LFPTrendLogs']
    event_diagnostic_keys = ['EventLogs', 'LfpFrequencySnapshotEvents']
    event_keys = ['EventSummary']

    def _pick(source, keys):
        # Keep only the requested keys that are actually present in the source.
        return {key: source[key] for key in keys if key in source}

    Device = _pick(raw_data, device_keys)          # DEVICE-only info
    Setup = _pick(raw_data, setup_keys)            # SETUP-only info
    Survey = _pick(raw_data, survey_keys)          # SURVEY-only info
    Streaming = _pick(raw_data, streaming_keys)    # STREAMING-only info
    Indefinite = _pick(raw_data, indefinite_keys)  # INDEFINITE-only info

    # Events and Timeline (chronic) recordings live under 'DiagnosticData';
    # the event summary is stored at the top level.
    Events = {}
    Timeline = {}
    if 'DiagnosticData' in raw_data:
        diagnostics = raw_data['DiagnosticData']
        Events = _pick(diagnostics, event_diagnostic_keys)
        Timeline = _pick(diagnostics, timeline_keys)
    Events.update(_pick(raw_data, event_keys))

    Data = {
        'Device': Device,
        'Survey': Survey,
        'Setup': Setup,
        'Streaming': Streaming,
        'Indefinite': Indefinite,
        'Events': Events,
        'Timeline': Timeline
    }
    return Data
def getModes(Data):
    '''
    Gets recording modes present in Data dictionary. Verifies if Data is empty or damaged (with recordings available).

    Modes:
    - Survey
    - Setup
    - Indefinite (Streaming)
    - Streaming
    - Timeline
    - Events

    When the session did not end normally, 'Damaged' is added to the modes and a
    warning is emitted; if nothing else is available the single entry
    'None. Recording is Damaged!' is returned.

    :param Data: dict, from DataHandling.getData()
    :return: list, sorted mode names
    '''
    import warnings  # local import so this block does not depend on the module import list

    # Only look up the previous session when device information exists, so the
    # "'Device' not in Data" branch below is actually reachable.
    past = getLastSessionDate(Data) if 'Device' in Data else None
    modes = []

    def add_mode(Data):
        for key, value in Data.items():
            if key == 'Events' and 'LfpFrequencySnapshotEvents' not in Data[key]:
                continue  # sensing capacities not yet setup
            if key == 'Timeline':
                if 'LFPTrendLogs' not in Data['Timeline']:
                    continue
                left_logs = Data['Timeline']['LFPTrendLogs']['HemisphereLocationDef.Left']
                first = list(left_logs.keys())[0]
                last_time = left_logs[first][0]['DateTime']
                # NOTE(review): both values are compared as text; when there is no previous
                # session `past` is the sentinel sentence returned by getLastSessionDate -
                # confirm that comparison gives the intended result.
                if past is not None and past > last_time:
                    continue
            if key != 'Device' and value:  # Ensure it's not empty
                only_signal_check = (
                    key == 'Setup'
                    and set(value.keys()) == {'MostRecentInSessionSignalCheck'}
                    and not value['MostRecentInSessionSignalCheck']
                )
                if only_signal_check:
                    continue  # Skip appending 'Setup'
                modes.append(key)

    # Check if recording was successfully terminated
    if 'Device' not in Data or Data['Device'].get('AbnormalEnd') is False:
        add_mode(Data)
    else:
        warnings.warn("Damaged with recordings!")
        modes.append('Damaged')
        add_mode(Data)
        if modes == ['Damaged']:
            modes = ['None. Recording is Damaged!']
    return sorted(modes)
def getStimStatus(Data):
    '''
    Gets the stimulation status stored for the beginning and the end of the recording session.

    Each status is the stored value passed through utl.after_point.

    :param Data: dict, from DataHandling.getData()
    :return: tuple, (initial, final)
    '''
    stimulation = Data['Device']['Stimulation']
    initial = utl.after_point(stimulation['InitialStimStatus'])
    final = utl.after_point(stimulation['FinalStimStatus'])
    return initial, final
def getSessionDate(Data):
    '''
    Returns the session date.

    :param Data: dict, from DataHandling.getData()
    :return: result of utl.parse_datetime on the stored 'SessionDate', or the
        string "No date stored!" when the stored value is empty
    '''
    session_date = Data['Device']['SessionDate']
    if not session_date:
        return "No date stored!"
    return utl.parse_datetime(session_date)
def getSessionDuration(Data):
    '''
    Gets session duration.

    :param Data: dict, from DataHandling.getData()
    :return: absolute difference between the parsed 'SessionEndDate' and 'SessionDate',
        or the string "No dates stored!" when either stored value is empty
    '''
    device = Data['Device']
    if not (device['SessionEndDate'] and device['SessionDate']):
        return "No dates stored!"
    end = utl.parse_datetime(device['SessionEndDate'])
    beginning = utl.parse_datetime(device['SessionDate'])
    return abs(end - beginning)
def getPatientInfo(Data):
    '''
    Returns the 'Initial' patient information entry.

    :param Data: dict, from DataHandling.getData().
    :return: value stored under Data['Device']['PatientInformation']['Initial']
    '''
    patient_information = Data['Device']['PatientInformation']
    return patient_information['Initial']
def getDeviceInfo(Data):
    '''
    Returns the 'Final' device information entry.

    :param Data: dict, from DataHandling.getData()
    :return: value stored under Data['Device']['DeviceInformation']['Final']
    '''
    device_information = Data['Device']['DeviceInformation']
    return device_information['Final']
def getLastSessionDate(Data):
    '''
    Returns date of previous session.

    :param Data: dict, from DataHandling.getData()
    :return: 'SessionDate' of the second 'GroupHistory' entry, or the string
        'This is First Session.' when the device has no 'GroupHistory'
    '''
    device = Data['Device']
    if 'GroupHistory' in device.keys():
        # Index 1 is read as the previous session.
        return device['GroupHistory'][1]['SessionDate']
    return 'This is First Session.'
def getBatteryInfo(Data):
    '''
    Returns battery information.

    :param Data: dict, from DataHandling.getData()
    :return: value stored under Data['Device']['BatteryInformation']
    '''
    device = Data['Device']
    return device['BatteryInformation']
def getRecordingDuration(mode, recording):
    '''
    Returns recording duration.

    :param mode: str, 'Streaming', 'Timeline', 'Indefinite' or anything else
        (treated as Survey / SenseChannel / CalibrationTest)
    :param recording: dict, one recording of the given mode
    :return: absolute value of the duration rounded to 4 decimals
    '''
    def _elapsed_from_ticks(rec):
        # Duration from tick stamps; with no ticks, fall back to packet sizes / sample rate.
        ticks = utl.string2numbers(rec['TicksInMses'])
        sample_rate = rec['SampleRateInHz']
        if len(ticks) == 0:
            packet_sizes = utl.string2numbers(rec['GlobalPacketSizes'][1:-1])
            return sum(packet_sizes)/sample_rate
        relative = (ticks - ticks[0])/1000
        return relative[-1]

    if mode == 'Streaming':
        ticks = utl.string2numbers(recording['TicksInMses'])
        temp_time = (ticks[0] - ticks[-1])/1000
    elif mode == 'Timeline':
        # NOTE(review): every day overwrites the previous value, so only the last day
        # contributes to the result - confirm this is intended.
        for day in recording.keys():
            oldest = utl.parse_datetime(recording[day][-1]['DateTime']).timestamp()
            newest = utl.parse_datetime(recording[day][0]['DateTime']).timestamp()
            temp_time = (newest - oldest)/1000
    elif mode == 'Indefinite':
        temp_time = _elapsed_from_ticks(recording)
    elif 'TicksInMses' in recording.keys():  # Survey - SenseChannel OR CalibrationTest
        temp_time = _elapsed_from_ticks(recording)
    # NOTE(review): temp_time is never assigned when a Timeline recording is empty or a
    # default-mode recording has no 'TicksInMses'; the line below then fails.
    return np.abs(np.round(temp_time, 4))
def getEventsElectrodes(data):
    '''
    Returns active electrodes at beginning and end, for at-home recordings.

    :param data: dict, from DataHandling.getData()
    :return: None when neither 'Events' nor 'Timeline' is an available mode; otherwise a
        tuple (initial, final) where each item is either a (left, right) tuple of channel
        names or the string 'At-home sensing disabled!'
    '''
    modes = getModes(data)
    # `('Events' or 'Timeline') not in modes` only ever tested 'Events'; check both modes.
    if 'Events' not in modes and 'Timeline' not in modes:
        return None

    def _sensing_channels(version):
        # Channel names (left, right) of the first group of the given configuration version.
        settings = data['Device']['Groups'][version][0]['ProgramSettings']
        if 'SensingChannel' not in settings:
            return 'At-home sensing disabled!'
        channels = settings['SensingChannel']
        left = utl.after_point(channels[0]['Channel'])
        right = utl.after_point(channels[1]['Channel'])
        return left, right

    initial = _sensing_channels('Initial')
    final = _sensing_channels('Final')
    return initial, final
def getEventLog(Data):
    '''
    Returns log of marked events.

    Entries that carry their own 'LfpFrequencySnapshotEvents' key are left out.

    :param Data: dict, from DataHandling.getData()
    :return: list of the remaining entries
    '''
    marked = []
    for entry in Data['Events']['LfpFrequencySnapshotEvents']:
        if 'LfpFrequencySnapshotEvents' in entry.keys():
            continue
        marked.append(entry)
    return marked
[docs]def getModeSignals(Data, mode):
'''
Returns signals of respective mode, from Data dictionary.
:param Data: dict, from DataHandling.getData()
:param mode: str
:return: list
'''
previous_session = utl.parse_datetime(getLastSessionDate(Data))
match mode:
case 'Streaming': #✅
recs = Data[mode]['BrainSenseTimeDomain']
stim = Data[mode]['BrainSenseLfp']
case 'Timeline': #✅dont need to check date, only hemisphere
recs = Data[mode]['LFPTrendLogs']
case 'Events': #✅
recs = []
temp = Data[mode]['LfpFrequencySnapshotEvents']
paths = utl.find_key_path(temp,'LfpFrequencySnapshotEvents') #events snapshots have this key 2x, rest is only event timestamp
for path in paths:
path = utl.parse_path(path)
event = utl.access_by_path(temp,path)
if previous_session>utl.parse_datetime(event['DateTime']): continue
recs.append(event)
'''
Usage:
recs = DH.getModeSignals(Data,'Events')
for rec in recs: print(rec['EventName'],':',rec['LfpFrequencySnapshotEvents']['HemisphereLocationDef.Right']['DateTime'])
'''
case 'Indefinite': #✅
recs = Data[mode]['IndefiniteStreaming']
case 'Setup':
recs = [Data[mode]['SenseChannelTests'],
Data[mode]['CalibrationTests'],
Data[mode]['MostRecentInSessionSignalCheck']]
case 'Survey': #✅ #só falta o LFPMontage
recs = Data[mode]['LfpMontageTimeDomain']
return recs
def getSignals(Data, mode):
    '''
    Returns signals in respective mode, properly handled into a clean dictionary.

    Time-domain recordings ('Streaming', 'Indefinite', 'Survey', and the first two
    'Setup' lists) become:
        {
        'Date': utl.parse_datetime(rec['FirstPacketDateTime']),
        'Channel': rec['Channel'],
        'Y' : raw,
        'X': time_array
        }
    'Setup' returns the tuple (sense, calibration, artifacts); 'Events' returns one
    dictionary per event (or {'Marked': ...} when there are no snapshot recordings);
    'Timeline' returns one dictionary per day with 'Left' and 'Right' series.

    :param Data: dict, from DataHandling.getData()
    :param mode: str
    :return: list, list of signal's dictionaries (see above for the other shapes).
    '''
    recs = getModeSignals(Data, mode)
    signals = []

    def build_time_domain(rec):
        # Pair the raw samples with a time axis (seconds) of matching length.
        sample_rate = rec['SampleRateInHz']
        duration = getRecordingDuration(mode, rec)
        seconds = np.arange(0, duration, 1/sample_rate)
        samples = rec['TimeDomainData']
        # Until missing data is handled properly, trim whichever side is longer.
        if len(seconds) < len(samples):
            samples = samples[:len(seconds)]
        else:
            seconds = seconds[:len(samples)]
        return {
            'Date': utl.parse_datetime(rec['FirstPacketDateTime']),
            'Channel': rec['Channel'],
            'Y' : samples,
            'X': seconds
        }

    def empty_day():
        return {'Left': {'X': [], 'Y': [], 'Stim': []}, 'Right': {'X': [], 'Y': [], 'Stim': []}}

    if mode == 'Setup':
        # Stimulation OFF = Sense
        # Stimulation ON = Calibration
        sense = [build_time_domain(rec) for rec in recs[0]]
        calibration = [build_time_domain(rec) for rec in recs[1]]
        artifacts = [
            {
                'Channel': utl.after_point(rec['Channel']),
                'Artifact': utl.after_point(rec['ArtifactStatus']),
                'X': rec['SignalFrequencies'],
                'Y': rec['SignalPsdValues']
            }
            for rec in recs[2]
        ]
        signals = sense, calibration, artifacts

    if mode in ['Streaming', 'Indefinite', 'Survey']:
        for rec in recs:
            signals.append(build_time_domain(rec))
    elif mode == 'Events':
        signals = []
        for rec in recs:
            left_side = rec['LfpFrequencySnapshotEvents']['HemisphereLocationDef.Left']
            right_side = rec['LfpFrequencySnapshotEvents']['HemisphereLocationDef.Right']
            left = {
                'Channel': left_side['SenseID'],
                'X': left_side['Frequency'],
                'Y': left_side['FFTBinData']
            }
            right = {
                'Channel': right_side['SenseID'],
                'X': right_side['Frequency'],
                'Y': right_side['FFTBinData']
            }
            signals.append({
                'EventName': rec['EventName'],
                'EventID': rec['EventID'],
                'DateInitial': rec['DateTime'],
                'DateFinal': rec['LfpFrequencySnapshotEvents']['HemisphereLocationDef.Left']['DateTime'],
                'Left': left,
                'Right': right
            })
        # No snapshot recordings: hand back the marked events themselves.
        if recs == [] and 'LfpFrequencySnapshotEvents' in Data[mode].keys():
            signals = {'Marked': Data[mode]['LfpFrequencySnapshotEvents']}
    elif mode == 'Timeline':  # the channel still has to be added
        per_day = {}
        for side in recs:
            for day_key, logs in recs[side].items():
                day_label = str(utl.parse_datetime(day_key).date())
                if day_label not in per_day:
                    per_day[day_label] = empty_day()
                # Logs are walked in reverse of their stored order.
                ordered = logs[::-1]
                lfp_values = [log['LFP'] for log in ordered]
                stamps = [log['DateTime'] for log in ordered]
                amplitudes = [log['AmplitudeInMilliAmps'] for log in ordered]
                hemisphere = 'Left' if utl.after_point(side) == 'Left' else 'Right'
                for position, stamp in enumerate(stamps):
                    # Each sample is filed under the day of its own timestamp.
                    sample_day = str(utl.parse_datetime(stamp).date())
                    if sample_day not in per_day:
                        per_day[sample_day] = empty_day()
                    target = per_day[sample_day][hemisphere]
                    target['X'].append(stamp)
                    target['Y'].append(lfp_values[position])
                    target['Stim'].append(amplitudes[position])
        signals = [
            {'Date': day_label, 'Left': series['Left'], 'Right': series['Right']}
            for day_label, series in per_day.items()
        ]
    return signals
def getStreamingStimulationValues(data):
    '''
    Returns streaming stimulation amplitude.

    Every dictionary is keyed by the text form of each recording's parsed
    'FirstPacketDateTime'.

    :param data: dict, from DataHandling.getData()
    :return: None when 'Streaming' is absent; otherwise the tuple
        (right, left, time): dict (right amplitude), dict (left amplitude),
        dict (seconds elapsed since the first sample of each recording)
    '''
    if 'Streaming' not in data.keys():
        return None

    right, left, time = {}, {}, {}
    for packet in data['Streaming']['BrainSenseLfp']:
        stamp = str(utl.parse_datetime(packet['FirstPacketDateTime']))
        right[stamp] = []
        left[stamp] = []
        time[stamp] = []
        for sample in packet['LfpData']:
            time[stamp].append(sample['TicksInMs'])
            right[stamp].append(sample['Right']['mA'])
            left[stamp].append(sample['Left']['mA'])

    # from ms to s, starting at 0
    time = {
        stamp: [(tick - ticks[0])/1000 for tick in ticks]
        for stamp, ticks in time.items()
    }
    return right, left, time
def getStreamingStimulation(Data,date):
    '''
    Returns active stimulation group.

    :param Data: dict, from DataHandling.getData()
    :param date: str
    :return: list, ['Stimulation OFF'] when no streaming recording matches the date;
        otherwise ['Stimulation ON: Group <group>', right, left, time] with the
        dictionaries from getStreamingStimulationValues()
    '''
    records = Data['Streaming']['BrainSenseLfp']
    target = utl.extract_date(date)

    group = None
    for rec in records:
        recorded_on = utl.extract_date(utl.parse_datetime(rec['FirstPacketDateTime']))
        if recorded_on == target:
            # The last matching recording wins.
            active = rec['TherapySnapshot']['ActiveGroup']
            group = utl.after_underscore(utl.after_point(active))

    if group is None:
        return ['Stimulation OFF']
    right, left, time = getStreamingStimulationValues(Data)
    return [f'Stimulation ON: Group {group}', right, left, time]
def getStimulation(Data,side='Right'):
    '''
    Returns at-home stimulation amplitude.

    :param Data: dict, from DataHandling.getData()
    :param side: str, default to 'Right'. Hemisphere whose timeline logs are read
    :return: dict, {'X': timestamps, 'Y': amplitudes in mA, 'Info': one summary dict per day}
    '''
    timeline = getModeSignals(Data,'Timeline')[f'HemisphereLocationDef.{side}']
    # NOTE(review): info_electrodes is called with its default side (0), whatever `side`
    # is requested here - confirm this is intended.
    info = info_electrodes(Data,version='Final')
    electrodes = info['Stim_electrodes']
    stimulation = {'X': [], 'Y': [], 'Info': []}
    final_date = getSessionDate(Data)
    for _, stream in timeline.items():
        # Each day's logs are reversed before being appended to the series.
        x = list(reversed([s['DateTime'] for s in stream]))
        y = list(reversed([s['AmplitudeInMilliAmps'] for s in stream]))
        for xx in x: stimulation['X'].append(xx)
        for yy in y: stimulation['Y'].append(yy)
        # text = f"{_[:10]} | Initial: {y[0]} mA | Final: {y[-1]} mA \n Electrodes: {electrodes}"
        # Electrode list rendered as text, with the polarity shortened to (+) / (-).
        new = ""
        for e in electrodes: new = new + str(e) + ", "
        new = new.replace('-Positive','(+)')
        new = new.replace('-Negative','(-)')
        text = {'Date': _[:10], 'Initial (mA)': y[0], 'Final (mA)':y[-1], 'Electrodes': new}
        stimulation['Info'].append(text)
    # NOTE(review): the original indentation was lost; this block is assumed to run once,
    # after the loop, reusing the values left over from the last iterated day (it fails
    # if the timeline is empty) - confirm against the upstream source.
    if final_date.date() > utl.parse_datetime(x[-1]).date():
        if getStimStatus(Data)[1] == 'ON':
            # Extend the series up to the session date with the final programmed amplitude.
            stimulation['X'].append(utl.full_date2str(final_date))
            stimulation['Y'].append(info['Amplitude'])
            # text = f"{_[:10]} | Initial: {y[-1]} mA | Final: {info['Amplitude']} mA \n Electrodes: {electrodes}"
            text = {'Date': _[:10], 'Initial (mA)': y[0], 'Final (mA)':y[-1], 'Electrodes': electrodes}
            stimulation['Info'].append(text)
    return stimulation
def getEventSummary(Data):
    '''
    Returns event summary (number of occurrences of each event).

    :param Data: dict, from DataHandling.getData()
    :return: dict, the stored 'EventSummary' or an empty dict when there is none
    '''
    events = Data['Events']
    if 'EventSummary' not in events.keys():
        return {}
    return events['EventSummary']
def getImpedance(Data):
    '''
    Returns the impedance test results of the first stored impedance entry, per hemisphere.

    :param Data: dict, from DataHandling.getData()
    :return: tuple, (status (str), Left (dict), Right (dict)), or the string
        'No Impedance data in file!' when the impedance list is empty
    '''
    impedance = Data['Device']['Impedance']
    if len(impedance) == 0:
        return 'No Impedance data in file!'

    info = impedance[0]
    status = utl.after_point(info['ImpedanceStatus'])
    left_results, right_results = {}, {}
    # The test names of hemisphere 0 drive the lookup for both hemispheres.
    for test_name in info['Hemisphere'][0]['SessionImpedance'].keys():
        left_results[test_name] = info['Hemisphere'][0]['SessionImpedance'][test_name]
        right_results[test_name] = info['Hemisphere'][1]['SessionImpedance'][test_name]
    return status, left_results, right_results
def MissingTimeline(data,side,days=None,filtered=None):
    '''
    Checks missing points in timeline (a gap is detected when the difference between
    consecutive points is more than 660 seconds, i.e. 11 minutes) and returns a new
    corrected timeline dict, filling missing LFP values with the mean of the two
    neighbouring values.

    new_dic = {
    'X': corrected timeline,
    'Y': corrected LFP,
    'X_missing': missing timeline points,
    'Y_missing': missing LFP values}

    :param data: dict, from DataHandling.getData()
    :param side: str, 'Right' selects the right hemisphere; any other value selects the left
    :param days: optional, days to process; defaults to every day stored for the hemisphere
    :param filtered: optional dict; when given, filtered[<side>]['X'] / ['Y'] are used
        instead of the raw logs (presumably already datetime / numeric values - confirm against caller)
    :return: dict
    '''
    if side == 'Right': side = 'HemisphereLocationDef.Right'
    else: side = 'HemisphereLocationDef.Left'
    if days: days_hemisphere_right=np.array(days).flatten().tolist()
    else: days_hemisphere_right=list(data['Timeline']['LFPTrendLogs'][side].keys())
    days_of_search=days_hemisphere_right
    # Accumulators over all processed days.
    time_format_somado=[]
    LFP_somado=[]
    red_LFP=[]
    red_time=[]
    if isinstance(days_of_search, list): #this for cycle needs to be changed so it is a search for key
        for chosen_day in days_of_search:
            # Both lists are the same object, so this resolves each day to itself.
            for s in days_hemisphere_right:
                if chosen_day in s:
                    key_chosen_day = s
                    break
            if filtered: info_chosen_day=filtered[utl.after_point(side)]
            else: info_chosen_day= data['Timeline']['LFPTrendLogs'][side][key_chosen_day]
            size_chosen_day = len(info_chosen_day)
            time_format=[0]*size_chosen_day
            LFP=[0]*size_chosen_day
            i=size_chosen_day
            if filtered:
                # NOTE(review): these lists belong to the caller and are modified in place below.
                LFP = info_chosen_day['Y']
                time_format = info_chosen_day['X']
            else:
                # Fill from the end so the stored order is reversed.
                for t in info_chosen_day:
                    LFP[i-1]=int(t['LFP'])
                    time_format[i-1]=datetime.fromisoformat(t['DateTime'].rstrip('Z'))
                    i=i-1
            #adding the missing points
            # Flat list of pairs: [position, number of extra points, position, ...].
            index_n_pontos_extra=[]
            for s in range(len(time_format)-1):
                dif=time_format[s+1]-time_format[s]
                # NOTE(review): .seconds ignores the days part of the difference.
                dif=dif.seconds
                if dif > 660:
                    n_pontos_extra=(dif//600) - 1
                    index_n_pontos_extra.append(s)
                    index_n_pontos_extra.append(n_pontos_extra)
                    #change this to a dictionary so it is [{'pos':s, 'n_extra': n_pontos_extra}]
            n_saltos=[]
            n_saltos=len(index_n_pontos_extra)//2
            LFP_acrescentar=[]
            # NOTE(review): positions were recorded before any insertion; points inserted for
            # an earlier gap shift the later positions - verify the result for days with
            # several gaps.
            for i in range(n_saltos):
                index_ponto_acrescentar=index_n_pontos_extra[i*2]
                numero_pontos_acrescentar=index_n_pontos_extra[i*2+1]
                #adding points in y-axis
                min_ponto_acrescentar=LFP[index_ponto_acrescentar]
                max_ponto_acrescentar=LFP[index_ponto_acrescentar+1]
                media_ponto_acrescentar=(min_ponto_acrescentar+max_ponto_acrescentar)/2
                LFP_acrescentar=[media_ponto_acrescentar]*numero_pontos_acrescentar
                # Empty-slice assignment inserts the filler values right after the gap start.
                LFP[index_ponto_acrescentar+1:index_ponto_acrescentar+1]=LFP_acrescentar
                red_LFP.extend(LFP_acrescentar)
                #adding points in x-axis
                tempo=time_format[index_ponto_acrescentar]
                for j in range(0,numero_pontos_acrescentar+1):
                    # Step 10 minutes at a time, inserting while the next point is still far away.
                    tempo=tempo + timedelta(minutes=10)
                    next_t = time_format[index_ponto_acrescentar+j+1]
                    diff = next_t-tempo
                    if diff>timedelta(minutes=10):
                        time_format.insert(index_ponto_acrescentar+j+1,tempo)
                        red_time.append(tempo)
            time_format_somado.extend(time_format)
            LFP_somado.extend(LFP)
    new_dic = {'X': time_format_somado, 'Y':LFP_somado,'X_missing':red_time,'Y_missing':red_LFP}
    return new_dic
def correctMissingTimeline(Data, Timeline, axis='X',days=None,filtered=None):
    '''
    Corrects timeline recordings for both hemispheres.

    For every hemisphere present in the first Timeline entry, the output of
    MissingTimeline() is grouped by day and written into the matching entries
    (modified in place) under ``axis`` and its companion axis ('Y', or
    'Y_missing' when the text after the underscore of ``axis`` is 'missing').

    :param Data: dict, from DataHandling.getData()
    :param Timeline: dict or list of dicts, each with a 'Date' and per-hemisphere series
    :param axis: str
    :param days: forwarded to MissingTimeline()
    :param filtered: forwarded to MissingTimeline()
    :return: list, corrected Timeline entries (a single dict is wrapped in a list)
    '''
    suffix = utl.after_underscore(axis)
    other_axis = 'Y' + '_' + suffix if suffix == 'missing' else 'Y'

    for side in ('Left', 'Right'):
        if isinstance(Timeline, dict):
            Timeline = [Timeline]
        if side not in Timeline[0].keys():
            continue

        added = MissingTimeline(Data, side, days=days, filtered=filtered)
        points = added[axis]
        values = added[other_axis]

        # Group the corrected points (and their companion values) by day.
        unique_days = np.unique([utl.extract_date(point) for point in points])
        timeline_by_day = {d: [] for d in unique_days}
        lfp_by_day = {d: [] for d in unique_days}
        for position, point in enumerate(points):
            bucket = utl.extract_date(point)
            timeline_by_day[bucket].append(point)
            lfp_by_day[bucket].append(values[position])

        for entry in Timeline:
            date = entry['Date']
            if isinstance(date, str):
                try:
                    date = utl.parse_datetime(date)
                    date = utl.extract_date(date)
                except Exception:
                    # Keep whatever value was reached when the date cannot be normalised.
                    pass
            series = entry[side]
            if axis not in series.keys():
                series[axis] = []
                series[other_axis] = []
            if date in timeline_by_day:
                series[axis] = timeline_by_day[date]
                series[other_axis] = lfp_by_day[date]
    return Timeline