Commit 377b1d4f authored by Ronny Eichler's avatar Ronny Eichler
Browse files

Config handling

parent 1cf31b9a
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import os
import xml.etree.ElementTree as ET
from .tools import fext, dir_content
from .tools import fext, dir_content, fmt_time
import numpy as np
import re
import math
import logging
SIZE_HEADER = 1024 # size of header in B
......@@ -17,6 +18,8 @@ REC_MARKER = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 255], dtype=np.uint8)
# data type of .continuous open ephys 0.2x file format header
HEADER_DT = np.dtype([('Header', 'S%d' % SIZE_HEADER)])
logger = logging.getLogger(__name__)
# (2048 + 22) Byte = 2070 Byte total
# FIXME: The rec_mark comes after the samples. Currently data is read assuming full NUM_SAMPLE record!
DATA_DT = np.dtype([('timestamp', np.int64), # 8 Byte
......@@ -55,16 +58,17 @@ def read_segment(filename, offset, count, dtype):
def read_record(filename, offset=0, count=30, dtype=DATA_DT):
# FIXME: Stupid undocumented magic division of return value...
return read_segment(filename, offset=SIZE_HEADER+offset*SIZE_RECORD, count=count, dtype=dtype)['samples']\
.ravel()\
.astype(np.float32)/2**10
def detect(base=None, dirs=None, files=None):
def detect(base_dir=None, dirs=None, files=None):
"""Checks for existence of an open ephys formatted data set in the root directory.
Args:
base: Directory to search in.
base_dir: Directory to search in.
dirs: list of subdirectories in root. Will be scanned if not provided.
files: List of files in the root directory. Will be scanned if not provided.
......@@ -73,43 +77,90 @@ def detect(base=None, dirs=None, files=None):
"""
# TODO: Make all three optional and work with either
if dirs is None or files is None:
_, dirs, files = dir_content(base)
_, dirs, files = dir_content(base_dir)
# FIXME: Do once for files
for f in files:
if fext(f) in ['.continuous']:
# settings_xml = find_settings_xml(root)
# if settings_xml is None:
# fv = None
# else:
fv = config(base)['INFO']['VERSION']
fv = config_xml(base_dir)['INFO']['VERSION']
return "OE_v{}".format(fv if fv else '???')
else:
return None
def _fpga_node(path):
chain = config(path)['SIGNALCHAIN']
nodes = [p['attrib']['NodeId'] for p in chain if p['type']=='PROCESSOR' and 'FPGA' in p['attrib']['name']]
def find_settings_xml(base_dir):
"""Search for the settings.xml file in the base directory.
:param base_dir: Base directory of data set
:param dirs: List of directories from globbing
:param files: List of files from globbing
:return: Path to settings.xml relative to base_dir
"""
_, dirs, files = dir_content(base_dir)
if "settings.xml" in files:
return os.path.join(base_dir, 'settings.xml')
else:
return None
def _fpga_node(chain_dict):
"""Find the FPGA node in the signal chain. Assuming this one was used for recording, will help
finding the proper .continuous files.
Args:
base_dir: Root directory of data set.
Returns:
string of NodeID (e.g. '106')
"""
# chain = config_xml(base_dir)['SIGNALCHAIN']
nodes = [p['attrib']['NodeId'] for p in chain_dict if p['type']=='PROCESSOR' and 'FPGA' in p['attrib']['name']]
logger.info('Found FPGA node(s): {}'.format(nodes))
if len(nodes) == 1:
return nodes[0]
if len(nodes) > 1:
raise BaseException('Multiple FPGA nodes found. (Good on you!) {}'.format(nodes))
else:
raise BaseException('Node ID not found in xml file {}'.format(path))
raise BaseException('Node ID not found in xml dict {}'.format(chain_dict))
# def format_version(path):
# settings = config(path)
# return settings
#
def config(base_dir):
"""Get recording/data set configuration from open ephys GUI settings.xml file and the header
of files from the data set
def find_settings_xml(base, dirs=None, files=None):
if dirs is None or files is None:
_, dirs, files = dir_content(base)
if "settings.xml" in files:
return os.path.join(base, 'settings.xml')
else:
return None
Args:
base_dir: path to the data set
Returns:
Dictionary with configuration entries. (INFO, SIGNALCHAIN, HEADER, AUDIO, FPGA_NODE)"""
cfg = config_xml(base_dir)
cfg['HEADER'] = config_header(base_dir)
cfg['FPGA_NODE'] = _fpga_node(cfg['SIGNALCHAIN'])
return cfg
def config_header(base_dir):
"""Reads header information from open ephys .continuous files in target path.
This returns some "reliable" information about the sampling rate."""
# Data file header (reliable sampling rate information)
# FIXME: Make sure all headers agree...
file_name = os.path.join(base_dir, '106_CH1.continuous')
header = read_header(file_name)
fs = header['sampleRate']
n_blocks = (os.path.getsize(file_name)-SIZE_HEADER)/SIZE_RECORD
assert (os.path.getsize(file_name)-SIZE_HEADER)%SIZE_RECORD == 0
n_samples = int(n_blocks*NUM_SAMPLES)
logger.info('Fs = {:.2f}Hz, {} blocks, {} samples, {}'
.format(fs, n_blocks, n_samples, fmt_time(n_samples/fs)))
return dict(n_blocks=int(n_blocks),
block_size=NUM_SAMPLES,
n_samples=int(n_samples),
sampling_rate=fs)
def config(path):
def config_xml(base_dir):
"""Reads Open Ephys XML settings file and returns dictionary with relevant information.
- Info field
Dict(GUI version, date, OS, machine),
......@@ -123,43 +174,31 @@ def config(path):
Dict(data from a single file header, i.e. sampling rate, blocks, etc.)
Args:
path: Path to settings.xml file
base_dir: Path to settings.xml file
Returns:
Dict{INFO, SIGNALCHAIN, AUDIO, HEADER}
"""
# Settings.xml file
xml_path = find_settings_xml(path)
xml_path = find_settings_xml(base_dir)
root = ET.parse(xml_path).getroot()
# Recording system information
info = dict(
VERSION = root.find('INFO/VERSION').text,
DATE = root.find('INFO/DATE').text,
OS = root.find('INFO/OS').text,
MACHINE = root.find('INFO/VERSION').text
)
# Signal chain/processing nodes
sc = root.find('SIGNALCHAIN')
chain = [dict(type=e.tag, attrib=e.attrib) for e in sc.getchildren()]
audio = root.find('AUDIO').attrib
# Data file header (reliable sampling rate information)
# FIXME: Make sure all headers agree...
file_name = os.path.join(path, '106_CH1.continuous')
header = read_header(file_name)
fs = header['sampleRate']
n_blocks = (os.path.getsize(file_name)-1024)/2070
n_samples = n_blocks*1024
# self.logger.info('Fs = {}kHz, {} blocks, {:.0f} samples, {:02.0f}min:{:02.0f}s'
# .format(fs/1e3, n_blocks, n_samples,
# math.floor(n_samples/fs/60),
# math.floor(n_samples/fs%60)))
header = dict(n_blocks=n_blocks,
block_size=NUM_SAMPLES,
n_samples=n_samples,
sampling_rate=fs)
# Audio settings
audio = root.find('AUDIO').attrib
return dict(INFO=info,
SIGNALCHAIN=chain,
AUDIO=audio,
HEADER=header)
AUDIO=audio)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import os
import re
from termcolor import colored
......
......@@ -8,7 +8,6 @@
Multiple real-time digital signals with GLSL-based clipping.
"""
from __future__ import division, print_function
import logging
import os
import sys
......@@ -32,9 +31,9 @@ with open(os.path.join(SHADER_PATH, 'vis.frag')) as fs:
class Vis(app.Canvas):
def __init__(self, target_dir, n_cols=1, n_channels=64, max_samples_visible=30000, channels=None):
def __init__(self, target_dir, n_cols=1, n_channels=64, max_samples_visible=30000, channels=None, bad_channels=None):
app.Canvas.__init__(self, title='Use your wheel to zoom!', keys='interactive', size=(1920, 1080),
position=(0, 0))
position=(0, 0), app='pyqt5')
self.logger = logging.getLogger("Vis")
# running traces, looks cool, but useless for the most part
self.running = False
......@@ -42,17 +41,20 @@ class Vis(app.Canvas):
self.drag_offset = 0
# Target configuration (format, sampling rate, sizes...)
cfg = self._get_target_config(target_dir)
self.cfg = self._get_target_config(target_dir)
self.target_dir = target_dir
self.fs = cfg['HEADER']['sampling_rate']
self.n_blocks = int(cfg['HEADER']['n_blocks'])
self.block_size = int(cfg['HEADER']['block_size'])
self.n_samples_total = int(cfg['HEADER']['n_samples'])
self.fs = self.cfg['HEADER']['sampling_rate']
self.node_id = self.cfg['FPGA_NODE']
self.n_blocks = int(self.cfg['HEADER']['n_blocks'])
self.block_size = int(self.cfg['HEADER']['block_size'])
self.n_samples_total = int(self.cfg['HEADER']['n_samples'])
self.max_samples_visible = int(max_samples_visible)
self.duration_total = tools.fmt_time(self.n_samples_total/self.fs)
self.duration_total = tools.fmt_time(self.n_samples_total / self.fs)
self.logger.debug(self.cfg)
# FIXME: Get maximum number of channels from list of valid file names
if not(n_channels) or n_channels < 1:
if not (n_channels) or n_channels < 1:
self.n_channels = int(64)
else:
self.n_channels = int(n_channels)
......@@ -74,11 +76,13 @@ class Vis(app.Canvas):
# Color of each vertex
# TODO: make it more efficient by using a GLSL-based color map and the index.
color = np.repeat(np.random.uniform(size=(self.n_rows//4, 3),
color = np.repeat(np.random.uniform(size=(self.n_rows // 4, 3),
low=.1, high=.9),
self.max_samples_visible * self.n_cols*4, axis=0).astype(np.float32)
cmap_path = os.path.join(os.path.join(os.path.dirname(__file__), 'shaders'), '4x4x8_half_vega20c_cmap.csv')
cmap = np.loadtxt(cmap_path, delimiter=',')
self.max_samples_visible * self.n_cols * 4, axis=0).astype(np.float32)
# Load a nice color map instead of the random colors
# cmap_path = os.path.join(os.path.join(os.path.dirname(__file__), 'shaders'), '4x4x8_half_vega20c_cmap.csv')
# cmap = np.loadtxt(cmap_path, delimiter=',')
# colors = np.repeat(cmap[:self.n_channels])
# print(color.shape, cmap.shape)
......@@ -87,14 +91,14 @@ class Vis(app.Canvas):
# FIXME: Build from lists for readability
index = np.c_[np.repeat(np.repeat(np.arange(self.n_cols), self.n_rows), self.max_samples_visible),
np.repeat(np.tile(np.arange(self.n_rows), self.n_cols), self.max_samples_visible),
np.tile(np.arange(self.max_samples_visible), self.n_channels)]\
np.tile(np.arange(self.max_samples_visible), self.n_channels)] \
.astype(np.float32)
# Most of the magic happens in the vertex shader, moving the samples into "position" using
# an affine transform based on number of columns and rows for the plot, scaling, etc.
self.program = gloo.Program(VERT_SHADER, FRAG_SHADER)
# FIXME: Reshaping not necessary?
self.program['a_position'] = self.buf.reshape(-1, 1)
self.program['a_position'] = self.buf # .reshape(-1, 1)
self.program['a_color'] = color
self.program['a_index'] = index
self.program['u_scale'] = (1., 1.)
......@@ -115,7 +119,7 @@ class Vis(app.Canvas):
# Check if we actually have data in there...
data_format = open_ephys.detect(target_dir)
if not(data_format) or not('OE_' in data_format):
if not (data_format) or not ('OE_' in data_format):
self.logger.error('No valid open ephys .continuous data found at {}'.format(target_dir))
sys.exit(1)
self.logger.debug('Target found: {}'.format(data_format))
......@@ -132,19 +136,18 @@ class Vis(app.Canvas):
def set_offset(self, relative=0, absolute=0):
""" Offset in blocks of 1024 samples """
self.offset = absolute or self.offset
self.offset += relative
self.offset = int(absolute or self.offset)
self.offset += int(relative)
if self.offset < 0:
self.offset = 0
elif self.offset >= (self.n_samples_total-self.max_samples_visible)//self.block_size:
self.offset = (self.n_samples_total-self.max_samples_visible)//self.block_size
# print(self.offset)
elif self.offset >= (self.n_samples_total - self.max_samples_visible) // self.block_size:
self.offset = (self.n_samples_total - self.max_samples_visible) // self.block_size
self.logger.debug('Block offset: {}, @ {}'.format(self.offset, tools.fmt_time(self.offset*1024/self.fs)))
def on_resize(self, event):
gloo.set_viewport(0, 0, *event.physical_size)
def on_key_press(self, event):
# print event.key
if event.key == 'Space':
self.running = not self.running
elif event.key == 'Q':
......@@ -170,21 +173,20 @@ class Vis(app.Canvas):
"""Handle mouse drag and hover"""
if event.is_dragging:
trail = event.trail()
width = self.size[0]/self.n_cols
height = self.size[1]/self.n_rows
dx = trail[-1][0]-trail[0][0]
dy = trail[-1][1]-trail[0][1]
width = self.size[0] / self.n_cols
height = self.size[1] / self.n_rows
dx = trail[-1][0] - trail[0][0]
dy = trail[-1][1] - trail[0][1]
if event.button == 1:
shift_signal = dx/width
shift_signal = dx / width
shift_samples = shift_signal * self.max_samples_visible
shift_offset = int(shift_samples/1024)
# print(self.drag_offset-shift_offset)
self.set_offset(absolute=self.drag_offset-shift_offset)
shift_offset = int(shift_samples / 1024)
self.set_offset(absolute=self.drag_offset - shift_offset)
if event.button == 2:
self.set_scale(scale_x=1.0*math.exp(dx/width),
scale_y=1.0*math.exp(dy/height))
self.set_scale(scale_x=1.0 * math.exp(dx / width),
scale_y=1.0 * math.exp(dy / height))
def on_mouse_press(self, event):
self.drag_offset = self.offset
......@@ -197,27 +199,28 @@ class Vis(app.Canvas):
Ctrl+MW: y-axis scale (amplitude)
"""
if not len(event.modifiers):
dx = -np.sign(event.delta[1])*int(event.delta[1]**2)
dx = -np.sign(event.delta[1]) * int(event.delta[1] ** 2)
self.set_offset(relative=dx)
else:
delta = np.sign(event.delta[1]) * .05
if util.keys.SHIFT in event.modifiers:
self.set_scale(factor_x=math.exp(2.5*delta))
self.set_scale(factor_x=math.exp(2.5 * delta))
elif util.keys.CONTROL in event.modifiers:
self.set_scale(factor_y=math.exp(2.5*delta))
self.set_scale(factor_y=math.exp(2.5 * delta))
self.update()
def on_mouse_double_click(self, event):
x, y = event.pos
x_r = x/self.size[0]
y_r = y/self.size[1]
# print(event.pos, self.size, x_r, y_r)
t_r = x_r*self.n_cols - math.floor(x_r*self.n_cols)
t_sample = (t_r * self.max_samples_visible + self.offset*self.block_size)
t_sec = t_sample/self.fs
print('Sample {} @ {}'.format(int(t_sample), tools.fmt_time(t_sec)))
x_r = x / self.size[0]
y_r = y / self.size[1]
# TODO: use y-coordinate to guesstimate the channel id + amplitude at point
t_r = x_r * self.n_cols - math.floor(x_r * self.n_cols)
t_sample = (t_r * self.max_samples_visible + self.offset * self.block_size)
t_sec = t_sample / self.fs
self.logger.info('Sample {} @ {}'.format(int(t_sample), tools.fmt_time(t_sec)))
def on_timer(self, event):
"""Frame update callback."""
......@@ -231,8 +234,10 @@ class Vis(app.Canvas):
for i in range(self.n_channels):
chan_id = i if self.channel_order is None else self.channel_order[i]
self.buf[i, :self.max_samples_visible] = \
read_record(os.path.join(self.target_dir, '106_CH{}.continuous'.format(chan_id + 1)),
offset=self.offset)[:self.max_samples_visible]
read_record(os.path.join(self.target_dir, '{node_id}_CH{channel}.continuous'.format(
node_id=self.node_id,
channel=chan_id + 1)),
offset=self.offset)[:self.max_samples_visible]
self.program['a_position'].set_data(self.buf)
if self.running:
......@@ -256,14 +261,20 @@ def run(*args, **kwargs):
cli_args = parser.parse_args(*args)
if 'layout' in cli_args and cli_args.layout is not None:
channels = oio_util.flat_channel_list(cli_args.layout)[:cli_args.channels]
print(channels)
layout = oio_util.run_prb(cli_args.layout)
channels, bad_channels = oio_util.flat_channel_list(layout)[:cli_args.channels]
else:
channels = None
bad_channels = None
vis = Vis(cli_args.path, n_cols=cli_args.cols, n_channels=cli_args.channels, channels=channels)
vis = Vis(cli_args.path,
n_cols=cli_args.cols,
n_channels=cli_args.channels,
channels=channels,
bad_channels=bad_channels)
app.run()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment