Commit 5618492c authored by Ronny Eichler
Browse files

Loading target information/variable layout display

parent b4dd293f
......@@ -5,6 +5,7 @@ from __future__ import print_function
import sys
import cmd
import logging
import argparse
from .lib.constants import LOG_LEVEL_VERBOSE
......@@ -12,7 +13,7 @@ __version__ = 0.01
NO_EXIT_CONFIRMATION = True
LOG_LEVEL = logging.INFO
log_level = LOG_LEVEL
class DataMan(cmd.Cmd):
"""Command line tool for quick data documentation."""
......@@ -20,35 +21,37 @@ class DataMan(cmd.Cmd):
prompt = "dm> "
intro = "Data Manager\n"
log = logging.getLogger(__name__)
def preloop(self):
self.log = logging.getLogger(__name__)
self.log.debug("starting DataMan CLI")
# process command line arguments etc.
def do_greet(self, user):
    """greet [user name]
    Simple user greeting. When used in combination with a parameter, will
    respond with personalized greeting. Yay."""
    if not user:
        print("hi there!")
    else:
        print("hello ", user)
def do_ls(self, path):
if not len(path):
path = '.'
self.log.debug("Starting DataMan CLI")
def do_ls(self, line):
parser = argparse.ArgumentParser('Recording statistics', prefix_chars='+/')
parser.add_argument('path', help='Relative or absolute path to directory',
default='.', nargs='?')
cli_args = parser.parse_args(line.split(' '))
path = cli_args.path
import dataman.lib.dirstats as ds
ds.print_table(ds.gather(path))
def do_stats(self, path):
if not len(path):
path = '.'
def do_stats(self, line):
parser = argparse.ArgumentParser('Recording statistics', prefix_chars='+/')
parser.add_argument('path', help='Relative or absolute path to directory',
default='.', nargs='?')
cli_args = parser.parse_args(line.split(' '))
path = cli_args.path
import dataman.lib.dirstats as ds
ds.print_table(ds.gather(path))
def do_vis(self, path):
def do_vis(self, line):
from dataman.vis import vis
vis.run(target=path)
vis.run(line.split(' '))
def do_proc(self, line):
print(sys.argv)
def do_exit(self, line):
"Exit"
......@@ -64,7 +67,6 @@ class DataMan(cmd.Cmd):
def main():
# Command line parsing
import argparse
parser = argparse.ArgumentParser(prog="DataMan")
parser.add_argument('-d', '--debug', action='store_true',
help='Debug mode -- verbose output, no confirmations.')
......@@ -73,48 +75,39 @@ def main():
# sub-parsers
subparsers = parser.add_subparsers(help='sub commands', dest='command')
# CLI
parser_cli = subparsers.add_parser('cli', help='Interactive CLI session')
parser_stats = subparsers.add_parser('stats', help='Recording stats (number channels, duration, sampling rate...')
parser_ls = subparsers.add_parser('ls', help='Directory listing with basic information (e.g. size)')
parser_vis = subparsers.add_parser('vis', help='Launch simple visualizer on data')
parser_proc = subparsers.add_parser('proc', help='(Pre-)processing [NI}')
parser_doc = subparsers.add_parser('doc', help='Documentation for prosperity [NI}')
parser_check = subparsers.add_parser('check', help='Check/verify data and documentation integrity [NI}')
# STATS
parser_stats = subparsers.add_parser('stats', help='Dataset stats (number channels, duration, sampling rate...')
parser_stats.add_argument('path', help='Relative or absolute path to directory',
default='.', nargs='?')
cli_args, cmd_args = parser.parse_known_args()
# LS
parser_ls = subparsers.add_parser('ls', help='Directory listing with basic information (e.g. size)')
parser_ls .add_argument('path', help='Relative or absolute path to directory',
default='.', nargs='?')
# VIS
parser_vis = subparsers.add_parser('vis', help='Launch simple visualizer on dataset')
parser_vis.add_argument('path', help='Relative or absolute path to directory',
default='.', nargs='?')
# PROC, DOC, CHECK
parser_proc = subparsers.add_parser('proc', help='Data processing')
parser_doc = subparsers.add_parser('doc', help='Data documentation')
parser_check = subparsers.add_parser('check', help='Check/verify data and documentation integrity')
cli_args = None
if len(sys.argv) > 1:
cli_args = parser.parse_args()
if cli_args.debug:
NO_EXIT_CONFIRMATION = True
log_level = LOG_LEVEL_VERBOSE if cli_args is not None and cli_args.debug else LOG_LEVEL
logging.addLevelName(LOG_LEVEL_VERBOSE, "VERBOSE")
if cli_args.debug:
NO_EXIT_CONFIRMATION = True
logging.addLevelName(LOG_LEVEL_VERBOSE, 'VERBOSE')
log_level = LOG_LEVEL_VERBOSE if cli_args.debug else LOG_LEVEL
logging.basicConfig(level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
if cli_args is None or cli_args.command == 'cli':
log.debug('CLI_ARGS: {}'.format(cli_args))
log.debug('CMD_ARGS: {}'.format(cmd_args))
# start cli
if cli_args.command in [None, 'cli']:
try:
dm = DataMan().cmdloop()
except KeyboardInterrupt:
pass
# some other command was given
else:
DataMan().onecmd(' '.join(sys.argv[1:]))
print('{} {:}'.format(cli_args.command, ' '.join(cmd_args)))
DataMan().onecmd('{} {}'.format(cli_args.command, ' '.join(cmd_args)))
if __name__ == "__main__":
......
......@@ -5,37 +5,89 @@ from __future__ import print_function
import os
import xml.etree.ElementTree as ET
from .tools import fext, dir_content
import numpy as np
import re
def detect(root=None, dirs=None, files=None):
SIZE_HEADER = 1024 # size of header in B
NUM_SAMPLES = 1024 # number of samples per record
SIZE_RECORD = 2070 # total size of record (2x1024 B samples + record header)
REC_MARKER = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 255], dtype=np.uint8)
# data type of .continuous open ephys 0.2x file format header
HEADER_DT = np.dtype([('Header', 'S%d' % SIZE_HEADER)])
# (2048 + 22) Byte = 2070 Byte total
# FIXME: The rec_mark comes after the samples. Currently data is read assuming full NUM_SAMPLE record!
DATA_DT = np.dtype([('timestamp', np.int64), # 8 Byte
('n_samples', np.uint16), # 2 Byte
('rec_num', np.uint16), # 2 Byte
('samples', ('>i2', NUM_SAMPLES)), # 2 Byte each x 1024 typ.
('rec_mark', (np.uint8, 10))]) # 10 Byte
def read_header(filename):
    """Return dict with .continuous file header content.

    Reads the 1 kiB ASCII header of an open ephys .continuous file and
    parses the ``header.key = value`` assignments into a dict, converting
    the known numeric fields to float/int.

    Args:
        filename: Path to the .continuous file.

    Returns:
        Dict of header fields (e.g. sampleRate, bitVolts, channel, ...).
    """
    # TODO: Compare headers, should be identical except for channel
    # 1 kiB header string data type
    header = read_segment(filename, offset=0, count=1, dtype=HEADER_DT)

    # Stand back! I know regex!
    # Annoyingly, there is a newline character missing in the header (version/header_bytes),
    # hence matching assignments with a regex instead of splitting on lines.
    # Raw string avoids invalid-escape-sequence warnings on modern Python.
    regex = r"header\.([\d\w\.\s]{1,}).=.\'*([^\;\']{1,})\'*"
    header_str = str(header[0][0]).rstrip(' ')
    header_dict = {group[0]: group[1] for group in re.compile(regex).findall(header_str)}

    # Convert known numeric fields; the parsed 'channel' value carries a
    # two-character prefix that is stripped before the int() conversion.
    for key in ['bitVolts', 'sampleRate']:
        header_dict[key] = float(header_dict[key])
    for key in ['blockLength', 'bufferSize', 'header_bytes', 'channel']:
        header_dict[key] = int(header_dict[key] if not key == 'channel' else header_dict[key][2:])
    return header_dict
def read_segment(filename, offset, count, dtype):
    """Read [count] items of [dtype] from [filename], starting at byte [offset]."""
    with open(filename, 'rb') as data_file:
        data_file.seek(int(offset))
        return np.fromfile(data_file, dtype=dtype, count=count)
def read_record(filename, offset=0, count=30, dtype=DATA_DT):
    """Read [count] records, starting [offset] records past the file header,
    and return their samples as a flat float32 array scaled by 1/2**10."""
    start_byte = SIZE_HEADER + offset * SIZE_RECORD
    records = read_segment(filename, offset=start_byte, count=count, dtype=dtype)
    return records['samples'].ravel().astype(np.float32) / 2 ** 10
def detect(base=None, dirs=None, files=None):
"""Checks for existence of an open ephys formatted data set in the root directory.
Args:
root: Directory to search in.
base: Directory to search in.
dirs: list of subdirectories in root. Will be scanned if not provided.
files: List of files in the root directory. Will be scanned if not provided.
Returns:
None if no data set found, else a string with data set format name and version.
None if no data set found, else a dict of configuration data from settings.xml
"""
# TODO: Make all three optional and work with either
if dirs is None or files is None:
_, dirs, files = dir_content(root)
_, dirs, files = dir_content(base)
for f in files:
if fext(f) in ['.continuous']:
settings_xml = _find_settings_xml(root)
if settings_xml is None:
fv = None
else:
fv = format_version(settings_xml)
# print(_fpga_node(settings_xml))
# settings_xml = find_settings_xml(root)
# if settings_xml is None:
# fv = None
# else:
fv = config(base)['INFO']['VERSION']
return "OE_v{}".format(fv if fv else '???')
else:
return False
return None
def _fpga_node(path):
chain = _config(path)['SIGNALCHAIN']
chain = config(path)['SIGNALCHAIN']
nodes = [p['attrib']['NodeId'] for p in chain if p['type']=='PROCESSOR' and 'FPGA' in p['attrib']['name']]
if len(nodes) == 1:
return nodes[0]
......@@ -43,12 +95,12 @@ def _fpga_node(path):
raise BaseException('Node ID not found in xml file {}'.format(path))
def format_version(path):
settings = _config(path)
return settings['INFO']['VERSION']
# def format_version(path):
# settings = config(path)
# return settings
#
def _find_settings_xml(base, dirs=None, files=None):
def find_settings_xml(base, dirs=None, files=None):
if dirs is None or files is None:
_, dirs, files = dir_content(base)
if "settings.xml" in files:
......@@ -57,7 +109,7 @@ def _find_settings_xml(base, dirs=None, files=None):
return None
def _config(path):
def config(path):
"""Reads Open Ephys XML settings file and returns dictionary with relevant information.
- Info field
Dict(GUI version, date, OS, machine),
......@@ -67,25 +119,47 @@ def _config(path):
but the order of assembling the chain.
- Audio
Int bufferSize
- Header
Dict(data from a single file header, i.e. sampling rate, blocks, etc.)
Args:
path: Path to settings.xml file
Returns:
Dict{INFO, SIGNALCHAIN, AUDIO}
Dict{INFO, SIGNALCHAIN, AUDIO, HEADER}
"""
root = ET.parse(path).getroot()
# Settings.xml file
xml_path = find_settings_xml(path)
root = ET.parse(xml_path).getroot()
info = dict(
VERSION = root.find('INFO/VERSION').text,
DATE = root.find('INFO/DATE').text,
OS = root.find('INFO/OS').text,
MACHINE = root.find('INFO/VERSION').text
)
sc = root.find('SIGNALCHAIN')
chain = [dict(type=e.tag, attrib=e.attrib) for e in sc.getchildren()]
audio = root.find('AUDIO').attrib
# Data file header (reliable sampling rate information)
# FIXME: Make sure all headers agree...
file_name = os.path.join(path, '106_CH1.continuous')
header = read_header(file_name)
fs = header['sampleRate']
n_blocks = (os.path.getsize(file_name)-1024)/2070
n_samples = n_blocks*1024
# self.logger.info('Fs = {}kHz, {} blocks, {:.0f} samples, {:02.0f}min:{:02.0f}s'
# .format(fs/1e3, n_blocks, n_samples,
# math.floor(n_samples/fs/60),
# math.floor(n_samples/fs%60)))
header = dict(n_blocks=n_blocks,
block_size=NUM_SAMPLES,
n_samples=n_samples,
sampling_rate=fs)
return dict(INFO=info,
SIGNALCHAIN=chain,
AUDIO=audio)
AUDIO=audio,
HEADER=header)
......@@ -25,6 +25,27 @@ def fmt_size(num, unit='B', si=True, sep=' ', col=False, pad=0):
num /= divisor
def fmt_time(s, minimal=True):
    """Format a duration given in seconds as e.g. ``99h 59min 59.900s``.

    Args:
        s: Time in seconds (float for fractional seconds).
        minimal: If True, leading components that are zero are left out
            (e.g. times under a minute return only the seconds part).

    Returns:
        String formatted ``[99h ][59min ]59.900s``.
    """
    ms = s - int(s)
    s = int(s)

    if s < 60 and minimal:
        # 06.3f zero-pads the integer part to two digits (e.g. "05.500s");
        # the previous 02.3f width was too small to ever pad.
        return "{s:06.3f}s".format(s=s + ms)

    m, s = divmod(s, 60)
    if m < 60 and minimal:
        return "{m:02d}min {s:06.3f}s".format(m=m, s=s + ms)

    h, m = divmod(m, 60)
    return "{h:02d}h {m:02d}min {s:06.3f}s".format(h=h, m=m, s=s + ms)
def fext(fname):
"""Grabs the file extension of a file.
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Created on Sep 23, 2015 18:53
@author: <'Ronny Eichler'> ronny.eichler@gmail.com
File reader
"""
from __future__ import print_function
import numpy as np
import re
# Layout constants of the open ephys 0.2x .continuous file format.
SIZE_HEADER = 1024  # size of header in B
NUM_SAMPLES = 1024  # number of samples per record
SIZE_RECORD = 2070  # total size of record (2x1024 B samples + record header)
# Expected value of the 10-byte 'rec_mark' field that terminates each record.
REC_MARKER = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 255], dtype=np.uint8)

# data type of .continuous open ephys 0.2x file format header
HEADER_DT = np.dtype([('Header', 'S%d' % SIZE_HEADER)])

# data type of a single record: (2048 + 22) Byte = 2070 Byte total
# FIXME: The rec_mark comes after the samples. Currently data is read assuming full NUM_SAMPLE record!
DATA_DT = np.dtype([('timestamp', np.int64),            # 8 Byte
                    ('n_samples', np.uint16),           # 2 Byte
                    ('rec_num', np.uint16),             # 2 Byte
                    ('samples', ('>i2', NUM_SAMPLES)),  # 2 Byte each x 1024 typ. (big-endian int16)
                    ('rec_mark', (np.uint8, 10))])      # 10 Byte
def read_header(filename):
    """Return dict with .continuous file header content.

    Reads the 1 kiB ASCII header of an open ephys .continuous file and
    parses the ``header.key = value`` assignments into a dict, converting
    the known numeric fields to float/int.

    Args:
        filename: Path to the .continuous file.

    Returns:
        Dict of header fields (e.g. sampleRate, bitVolts, channel, ...).
    """
    # TODO: Compare headers, should be identical except for channel
    # 1 kiB header string data type
    header = read_segment(filename, offset=0, count=1, dtype=HEADER_DT)

    # Stand back! I know regex!
    # Annoyingly, there is a newline character missing in the header (version/header_bytes),
    # hence matching assignments with a regex instead of splitting on lines.
    # Raw string avoids invalid-escape-sequence warnings on modern Python.
    regex = r"header\.([\d\w\.\s]{1,}).=.\'*([^\;\']{1,})\'*"
    header_str = str(header[0][0]).rstrip(' ')
    header_dict = {group[0]: group[1] for group in re.compile(regex).findall(header_str)}

    # Convert known numeric fields; the parsed 'channel' value carries a
    # two-character prefix that is stripped before the int() conversion.
    for key in ['bitVolts', 'sampleRate']:
        header_dict[key] = float(header_dict[key])
    for key in ['blockLength', 'bufferSize', 'header_bytes', 'channel']:
        header_dict[key] = int(header_dict[key] if not key == 'channel' else header_dict[key][2:])
    return header_dict
def read_segment(filename, offset, count, dtype):
    """Read [count] items of [dtype] from [filename], starting at byte [offset]."""
    with open(filename, 'rb') as data_file:
        data_file.seek(int(offset))
        return np.fromfile(data_file, dtype=dtype, count=count)
def read_record(filename, offset=0, count=30, dtype=DATA_DT):
    """Read [count] records, starting [offset] records past the file header,
    and return their samples as a flat float32 array scaled by 1/2**10."""
    start_byte = SIZE_HEADER + offset * SIZE_RECORD
    records = read_segment(filename, offset=start_byte, count=count, dtype=dtype)
    return records['samples'].ravel().astype(np.float32) / 2 ** 10
# def reader(filename, buf):
# """
# Reader for a single .continuous file. Writes as many (complete, i.e. NUM_SAMPLES) records into given
# the buffer as can fit.
# :param filename: File name of the input .continuous file.
# :param buf: Designated column of a numpy array used for temporary storage/stacking of multiple channel data.
# :return: Dictionary of all headers read and stored in buffer.
# """
# # TODO: Allow sending new index to generator
# # TODO: Check record size for completion
# with open(filename, 'rb') as fid:
# yield np.fromfile(fid, HEADER_DT, 1)
# while True:
# data = np.fromfile(fid, DATA_DT, len(buf)/NUM_SAMPLES)
# buf[:len(data)*NUM_SAMPLES] = data['samples'].ravel()
# yield {idx: data[idx] for idx in data.dtype.names if idx != 'samples'} if len(data) else None
if __name__ == "__main__":
pass
# print(read_header('data/2014-10-30_16-07-29/106_CH1.continuous'))
# print(read_segment('data/2014-10-30_16-07-29/106_CH1.continuous', offset=SIZE_HEADER)['samples'].ravel()[0])
......@@ -11,22 +11,17 @@ Multiple real-time digital signals with GLSL-based clipping.
from __future__ import division, print_function
import logging
import os
import sys
from vispy import gloo
from vispy import app
from vispy import util
import numpy as np
import math
from .reader import read_record, read_header
from ..lib.open_ephys import read_record
from ..lib import open_ephys, tools
# Dimensions of plot segment/signals
n_rows = 16
n_cols = 4
n_signals = n_rows*n_cols
n_samples = int(3e4)
# Buffer to store all the pre-loaded signals
buf = np.zeros((n_signals, n_samples), dtype=np.float32)
from oio import util
# Load vertex and fragment shaders
SHADER_PATH = os.path.join(os.path.dirname(__file__), 'shaders')
......@@ -35,40 +30,75 @@ with open(os.path.join(SHADER_PATH, 'vis.vert')) as vs:
with open(os.path.join(SHADER_PATH, 'vis.frag')) as fs:
FRAG_SHADER = fs.read()
# Color of each vertex
# TODO: make it more efficient by using a GLSL-based color map and the index.
color = np.repeat(np.random.uniform(size=(n_rows, 3), low=.1, high=.9),
n_samples*n_cols, axis=0).astype(np.float32)
# Signal 2D index of each vertex (row and col) and x-index (sample index
# within each signal).
index = np.c_[np.repeat(np.repeat(np.arange(n_cols), n_rows), n_samples),
np.repeat(np.tile(np.arange(n_rows), n_cols), n_samples),
np.tile(np.arange(n_samples), n_signals)].astype(np.float32)
class Vis(app.Canvas):
def __init__(self, target):
app.Canvas.__init__(self, title='Use your wheel to zoom!',
keys='interactive')
def __init__(self, target_dir, n_cols=1, n_channels=64, max_samples_visible=30000, channels=None):
app.Canvas.__init__(self, title='Use your wheel to zoom!', keys='interactive')
self.logger = logging.getLogger("Vis")
# running traces, looks cool, but useless for the most part
self.running = False
self.offset = 0
self.drag_offset = 0
self.target = target
self.test_target()
# Target configuration (format, sampling rate, sizes...)
cfg = self._get_target_config(target_dir)
self.target_dir = target_dir
self.fs = cfg['HEADER']['sampling_rate']
self.n_blocks = int(cfg['HEADER']['n_blocks'])
self.block_size = int(cfg['HEADER']['block_size'])
self.n_samples_total = int(cfg['HEADER']['n_samples'])
self.max_samples_visible = int(max_samples_visible)
self.duration_total = tools.fmt_time(self.n_samples_total/self.fs)
# FIXME: Get maximum number of channels from list of valid file names
if not(n_channels) or n_channels < 1:
self.n_channels = int(64)
else:
self.n_channels = int(n_channels)
self.channel_order = channels # if None: no particular order
self.n_cols = int(n_cols)
self.n_rows = int(math.ceil(self.n_channels / self.n_cols))
self.logger.info('n_channels: {}, col/row: {}, buffer_size: {}, '
' total_samples: {}, total_duration: {}'.format(self.n_channels,
(self.n_cols, self.n_rows),
self.max_samples_visible,
self.n_samples_total,
self.duration_total))
# Buffer to store all the pre-loaded signals
self.buf = np.zeros((self.n_channels, self.max_samples_visible), dtype=np.float32)
# Color of each vertex
# TODO: make it more efficient by using a GLSL-based color map and the index.
color = np.repeat(np.random.uniform(size=(self.n_rows, 3),
low=.1, high=.9),
self.max_samples_visible * self.n_cols, axis=0).astype(np.float32)
# Signal 2D index of each vertex (row and col) and x-index (sample index
# within each signal).
# FIXME: Build from lists for readability
index = np.c_[np.repeat(np.repeat(np.arange(self.n_cols), self.n_rows), self.max_samples_visible),
np.repeat(np.tile(np.arange(self.n_rows), self.n_cols), self.max_samples_visible),
np.tile(np.arange(self.max_samples_visible), self.n_channels)]\
.astype(np.float32)
# Most of the magic happens in the vertex shader, moving the samples into "position" using
# an affine transform based on number of columns and rows for the plot, scaling, etc.
self.program = gloo.Program(VERT_SHADER, FRAG_SHADER)
self.program['a_position'] = buf.reshape(-1, 1)
# FIXME: Reshaping not necessary?
self.program['a_position'] = self.buf.reshape(-1, 1)
self.program['a_color'] = color
self.program['a_index'] = index
self.program['u_scale'] = (1., 1.)
self.program['u_size'] = (n_rows, n_cols)
self.program['u_n'] = n_samples
self.program['u_size'] = (self.n_rows, self.n_cols)
self.program['u_n'] = self.max_samples_visible
gloo.set_viewport(0, 0, *self.physical_size)
# sys.exit(0)
self._timer = app.Timer('auto', connect=self.on_timer, start=True)
gloo.set_state(clear_color='black', blend=True,
......@@ -76,17 +106,16 @@ class Vis(app.Canvas):
self.show()
def test_target(self):
fname = os.path.join(self.target, '106_CH1.continuous')
self.logger.info("Reading file header of {}".format(fname))
hdr = read_header(fname)
fs = hdr['sampleRate']
n_blocks = (os.path.getsize(fname)-1024)/2070
n_samples = n_blocks*1024
self.logger.info('Fs = {}kHz, {} blocks, {:.0f} samples, {:02.0f}min:{:02.0f}s'
.format(fs/1e3, n_blocks, n_samples,
math.floor(n_samples/fs/60),
math.floor(n_samples/fs%60)))
def _get_target_config(self, target_dir):
# Check if we actually have data in there...
data_format = open_ephys.detect(target_dir)
if not(data_format) or not('OE_' in data_format):
self.logger.error('No valid open ephys .continuous data found at {}'.format(target_dir))
sys.exit(1)
self.logger.debug('Target found: {}'.format(data_format))
return open_ephys.config(target_dir)
def set_scale(self, factor_x=1.0, factor_y=1.0, scale_x=None, scale_y=None):
scale_x_old, scale_y_old = self.program['u_scale']
......@@ -97,12 +126,14 @@ class Vis(app.Canvas):
self.program['u_scale'] = (max(1, scale_x_new), max(.05, scale_y_new))
def set_offset(self, relative=0, absolute=0):
""" Offset in blocks of 1024 samples """
self.offset = absolute or self.offset
self.offset += relative
if self.offset < 0:
self.offset = 0