Commit c5e0bf98 authored by Ronny Eichler

Added basic visualizer based on vispy example

parent ea0bbae6
@@ -3,6 +3,7 @@
#############
testing/
dataman/vis/data
#############
## Python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Mutable buffer/array that can be shared between multiple processes.
Inspired by https://github.com/belevtsoff/rdaclient.py
* `Buffer`: The buffer
* `datatypes`: supported datatypes
* `BufferHeader`: header structure containing metadata
* `BufferError`: error definitions
"""
from multiprocessing import Array
import ctypes as c
import logging
import numpy as np
class Buffer(object):
"""
Buffer with homogeneous elements, laid out as (nChannels x nSamples).
The buffer can be used simultaneously by multiple processes, because
both data and metadata are stored in a single sharedctypes byte array.
First, the buffer object is created and initialized in one of the
processes. Second, its raw array is shared with others. Third, those
processes create their own Buffer objects and initialize them so that
all point to the same shared raw array.
"""
def __init__(self):
self.logger = logging.getLogger("Buffer")
self.__initialized = False
def __str__(self):
return self.__buf[:self.bufSize].__str__() + '\n'
def __getattr__(self, item):
"""Overload to prevent access to the buffer attributes before
initialization is complete.
"""
if self.__initialized:
return object.__getattribute__(self, item)
else:
raise BufferError(1)
# -------------------------------------------------------------------------
# PROPERTIES
# read only attributes
is_initialized = property(lambda self: self.__initialized, None, None,
'Indicates whether the buffer is initialized, read only (bool)')
raw = property(lambda self: self.__raw, None, None,
'Raw buffer array, read only (sharedctypes, char)')
nChannels = property(lambda self: self.__nChannels, None, None,
'Dimensionality of array in channels, read only (int)')
nSamples = property(lambda self: self.__nSamples, None, None,
'Dimensionality of array in samples, read only (int)')
bufSize = property(lambda self: self.__bufSize, None, None,
'Buffer size in samples per channel, read only (int)')
nptype = property(lambda self: self.__nptype, None, None,
'The type of the data in the buffer, read only (string)')
# -------------------------------------------------------------------------
def initialize(self, nChannels, nSamples, nptype='float32'):
"""Initializes the buffer with a new array.
"""
# check parameters
if nChannels < 1 or nSamples < 1:
self.logger.error('nChannels and nSamples must be positive integers')
raise BufferError(1)
sizeBytes = c.sizeof(BufferHeader) + \
nSamples * nChannels * np.dtype(nptype).itemsize
raw = Array('c', sizeBytes)
hdr = BufferHeader.from_buffer(raw.get_obj())
hdr.bufSizeBytes = sizeBytes - c.sizeof(BufferHeader)
hdr.dataType = datatypes.get_code(nptype)
hdr.nChannels = nChannels
hdr.nSamples = nSamples
self.initialize_from_raw(raw.get_obj())
def initialize_from_raw(self, raw):
"""Initiates the buffer with the compatible external raw array.
All the metadata will be read from the header region of the array.
"""
self.__initialized = True
hdr = BufferHeader.from_buffer(raw)
# datatype
nptype = datatypes.get_type(hdr.dataType)
bufOffset = c.sizeof(hdr)
bufFlatSize = hdr.bufSizeBytes // np.dtype(nptype).itemsize
# create numpy view object pointing to the raw array
self.__raw = raw
self.__hdr = hdr
self.__buf = np.frombuffer(raw, nptype, bufFlatSize, bufOffset) \
.reshape((-1, hdr.nSamples))
# helper variables
self.__nChannels = hdr.nChannels
self.__nSamples = hdr.nSamples  # needed by the nSamples property
self.__bufSize = self.__buf.shape[1]  # samples per channel
self.__nptype = nptype
def __write_buffer(self, data, start, end=None):
"""Writes a (channels x samples) block into the buffer."""
if end is None:
end = start + data.shape[1]
self.__buf[:, start:end] = data
def __read_buffer(self, start, end):
"""Reads data from buffer, returning view into numpy array"""
av_error = self.check_availability(start, end)
if not av_error:
return self.__buf[:, start:end]
else:
raise BufferError(av_error)
def get_data(self, start, end, wprotect=True):
"""Returns a view into the buffer; write-protected unless wprotect=False."""
data = self.__read_buffer(start, end)
data.setflags(write=not wprotect)
return data
def put_data(self, data, start=0):
"""Writes a (channels x samples) block into the buffer at [start]."""
if data.ndim == 1:
# 1-D data is only valid for a single-channel buffer
if self.nChannels != 1:
raise BufferError(4)
data = data.reshape((1, -1))
elif data.shape[0] != self.nChannels:
raise BufferError(4)
end = start + data.shape[1]
self.__write_buffer(data, start, end)
def check_availability(self, start, end):
"""Checks whether the requested data samples are available.
Parameters
----------
start : int
first sample index (included)
end : int
last samples index (excluded)
Returns
-------
0
if the data is available and already in the buffer
1
if the data is available but needs to be read in
2
if data is partially unavailable
3
if data is completely unavailable
Note: only the trivial index check is implemented so far; the full
availability logic is sketched in the commented block below.
"""
if start < 0 or end <= start:
return 5
return 0
# if sampleStart < 0 or sampleEnd <= 0:
# return 5
# if sampleEnd > self.nSamplesWritten:
# return 3 # data is not ready
# if (self.nSamplesWritten - sampleStart) > self.bufSize:
# return 2 # data is already erased
#
# return 0
class datatypes():
"""A helper class to interpret the type code read from buffer header.
To add new supported data types, add them to the 'type' dictionary
"""
types = {0: 'float32',
1: 'int16'}
@classmethod
def get_code(cls, ndtype):
"""Gets buffer type code given numpy datatype
Parameters
----------
ndtype : string
numpy datatype (e.g. 'float32')
"""
idx = list(cls.types.values()).index(ndtype)
return list(cls.types.keys())[idx]
@classmethod
def get_type(cls, code):
"""Gets numpy data type given a buffer type code
Parameters
----------
code : int
type code (e.g. 0)
"""
return cls.types[code]
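# Usage sketch (codes as defined above):
# datatypes.get_code('float32') -> 0, datatypes.get_type(1) -> 'int16'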
class BufferHeader(c.Structure):
"""A ctypes structure describing the buffer header
Attributes
----------
bufSizeBytes : c_ulong
size of the data region in bytes, excluding this header
dataType : c_uint
typecode of the data stored in the buffer
nChannels : c_ulong
number of channels (rows)
nSamples : c_ulong
number of samples per channel (columns)
"""
_pack_ = 1
_fields_ = [('bufSizeBytes', c.c_ulong),
('dataType', c.c_uint),
('nChannels', c.c_ulong),
('nSamples', c.c_ulong)]
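# Note: with _pack_ = 1 the header occupies 3 * sizeof(c_ulong) +
# sizeof(c_uint) bytes, e.g. 28 bytes on LP64 Linux; c_ulong is only
# 4 bytes on Windows, so the header size is platform dependent.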
class BufferError(Exception):
"""Represents different types of buffer errors"""
def __init__(self, code):
"""Initializes a BufferError with given error code
Parameters
----------
code : int
error code
"""
self.code = code
def __str__(self):
"""Prints the error"""
if self.code == 1:
return 'buffer is not initialized (error %s)' % repr(self.code)
elif self.code in [2, 3]:
return 'unable to get indices (error %s)' % repr(self.code)
elif self.code == 4:
return 'writing incompatible data (error %s)' % repr(self.code)
elif self.code == 5:
return 'negative index (error %s)' % repr(self.code)
else:
return '(error %s)' % repr(self.code)
if __name__ == '__main__':
buf1 = Buffer()
buf2 = Buffer()
buf1.initialize(2, 15)
buf2.initialize_from_raw(buf1.raw)
buf1.put_data(np.array([[1, 2], [3, 4]]))
buf2.put_data(np.array([[5, 6], [7, 8]]), start=2)
print buf1
print buf2
dat = buf2.get_data(0, 4, wprotect=False)  # writable view into shared memory
dat[0, 0] = 100
print buf1
print buf2
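# A hedged sketch of cross-process use (assumes a fork start method so the
# shared array is inherited by the child; `consumer` is a hypothetical
# helper, not part of this module):
# def consumer(raw):
#     remote = Buffer()
#     remote.initialize_from_raw(raw)  # attach to the same shared memory
#     print remote.get_data(0, 4)
# Process(target=consumer, args=(buf1.raw,)).start()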
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Created on Sep 24, 2015 15:32
@author: <'Ronny Eichler'> ronny.eichler@gmail.com
Stream data to buffer
"""
from multiprocessing import Process, Queue
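# A minimal sketch of the intended producer loop (assumption only -- this
# module is still a stub; read_record/NUM_SAMPLES come from the reader
# module, Buffer from the buffer module, and `stream` is hypothetical):
# def stream(filename, buf):
#     # assumes a single-channel buffer sized to a multiple of NUM_SAMPLES
#     for k in range(buf.bufSize // NUM_SAMPLES):
#         buf.put_data(read_record(filename, offset=k, count=1),
#                      start=k * NUM_SAMPLES)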
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Created on Sep 23, 2015 18:53
@author: <'Ronny Eichler'> ronny.eichler@gmail.com
File reader
"""
import numpy as np
import re
SIZE_HEADER = 1024 # size of header in B
NUM_SAMPLES = 1024 # number of samples per record
SIZE_RECORD = 2070 # total size of record (1024 x 2 B samples + 22 B record header)
REC_MARKER = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 255], dtype=np.uint8)
# data type of .continuous open ephys 0.2x file format header
HEADER_DT = np.dtype([('Header', 'S%d' % SIZE_HEADER)])
# (2048 + 22) Byte = 2070 Byte total
# FIXME: The rec_mark comes after the samples. Currently data is read assuming full NUM_SAMPLE record!
DATA_DT = np.dtype([('timestamp', np.int64), # 8 Byte
('n_samples', np.uint16), # 2 Byte
('rec_num', np.uint16), # 2 Byte
('samples', ('>i2', NUM_SAMPLES)), # 2 Byte each x 1024 typ.
('rec_mark', (np.uint8, 10))]) # 10 Byte
def read_header(filename):
"""Return dict with .continuous file header content."""
# TODO: Compare headers, should be identical except for channel
# 1 kiB header string data type
header = read_segment(filename, offset=0, count=1, dtype=HEADER_DT)
# Stand back! I know regex!
# Annoyingly, there is a newline character missing in the header (version/header_bytes)
regex = r"header\.([\d\w\.\s]{1,}).=.\'*([^\;\']{1,})\'*"
header_str = str(header[0][0]).rstrip(' ')
header_dict = {group[0]: group[1] for group in re.compile(regex).findall(header_str)}
for key in ['bitVolts', 'sampleRate']:
header_dict[key] = float(header_dict[key])
for key in ['blockLength', 'bufferSize', 'header_bytes', 'channel']:
# channel values look like 'CH1'; strip the two-letter prefix before casting
header_dict[key] = int(header_dict[key] if key != 'channel' else header_dict[key][2:])
return header_dict
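# For illustration (format assumption): a header line such as
# "header.sampleRate = 30000.0;" ends up as header_dict['sampleRate'] == 30000.0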
def read_segment(filename, offset, count, dtype):
"""Read segment of a file from [offset] for [count]x[dtype]"""
with open(filename, 'rb') as fid:
fid.seek(offset)
segment = np.fromfile(fid, dtype=dtype, count=count)
return segment
def read_record(filename, offset=0, count=30, dtype=DATA_DT):
return read_segment(filename, offset=SIZE_HEADER+offset*SIZE_RECORD, count=count, dtype=dtype)['samples']\
.ravel()\
.astype(np.float32)/2**10
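# Example: record k starts at byte SIZE_HEADER + k*SIZE_RECORD, so
# read_record(fname, offset=3) returns records 3..32 (count=30) as one flat
# float32 array of 30*NUM_SAMPLES values, scaled by 1/2**10.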
# def reader(filename, buf):
# """
# Reader for a single .continuous file. Writes as many (complete, i.e. NUM_SAMPLES) records into given
# the buffer as can fit.
# :param filename: File name of the input .continuous file.
# :param buf: Designated column of a numpy array used for temporary storage/stacking of multiple channel data.
# :return: Dictionary of all headers read and stored in buffer.
# """
# # TODO: Allow sending new index to generator
# # TODO: Check record size for completion
# with open(filename, 'rb') as fid:
# yield np.fromfile(fid, HEADER_DT, 1)
# while True:
# data = np.fromfile(fid, DATA_DT, len(buf)/NUM_SAMPLES)
# buf[:len(data)*NUM_SAMPLES] = data['samples'].ravel()
# yield {idx: data[idx] for idx in data.dtype.names if idx != 'samples'} if len(data) else None
if __name__ == "__main__":
print read_header('data/2014-10-30_16-07-29/106_CH1.continuous')
print read_segment('data/2014-10-30_16-07-29/106_CH1.continuous', offset=SIZE_HEADER, count=1, dtype=DATA_DT)['samples'].ravel()[0]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vispy: gallery 2
# Copyright (c) 2015, Vispy Development Team.
# Distributed under the (new) BSD License. See LICENSE.txt for more info.
"""
Multiple real-time digital signals with GLSL-based clipping.
"""
from vispy import gloo
from vispy import app
import numpy as np
import math
# Number of cols and rows in the table.
nrows = 16
ncols = 20
# Number of signals.
m = nrows*ncols
# Number of samples per signal.
n = 1000
# Various signal amplitudes.
amplitudes = .1 + .2 * np.random.rand(m, 1).astype(np.float32)
# Generate the signals as a (m, n) array.
y = amplitudes * np.random.randn(m, n).astype(np.float32)
# Color of each vertex (TODO: make it more efficient by using a GLSL-based
# color map and the index).
color = np.repeat(np.random.uniform(size=(m, 3), low=.5, high=.9),
n, axis=0).astype(np.float32)
# Signal 2D index of each vertex (row and col) and x-index (sample index
# within each signal).
index = np.c_[np.repeat(np.repeat(np.arange(ncols), nrows), n),
np.repeat(np.tile(np.arange(nrows), ncols), n),
np.tile(np.arange(n), m)].astype(np.float32)
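# index has shape (m*n, 3): column 0 is the subplot column, column 1 the
# subplot row, column 2 the sample index within each signal.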
VERT_SHADER = """
#version 120
// y coordinate of the position.
attribute float a_position;
// row, col, and time index.
attribute vec3 a_index;
varying vec3 v_index;
// 2D scaling factor (zooming).
uniform vec2 u_scale;
// Size of the table.
uniform vec2 u_size;
// Number of samples per signal.
uniform float u_n;
// Color.
attribute vec3 a_color;
varying vec4 v_color;
// Varying variables used for clipping in the fragment shader.
varying vec2 v_position;
varying vec4 v_ab;
void main() {
float nrows = u_size.x;
float ncols = u_size.y;
// Compute the x coordinate from the time index.
float x = -1 + 2*a_index.z / (u_n-1);
vec2 position = vec2(x - (1 - 1 / u_scale.x), a_position);
// Find the affine transformation for the subplots.
vec2 a = vec2(1./ncols, 1./nrows)*.9;
vec2 b = vec2(-1 + 2*(a_index.x+.5) / ncols,
-1 + 2*(a_index.y+.5) / nrows);
// Apply the static subplot transformation + scaling.
gl_Position = vec4(a*u_scale*position+b, 0.0, 1.0);
v_color = vec4(a_color, 1.);
v_index = a_index;
// For clipping test in the fragment shader.
v_position = gl_Position.xy;
v_ab = vec4(a, b);
}
"""
FRAG_SHADER = """
#version 120
varying vec4 v_color;
varying vec3 v_index;
varying vec2 v_position;
varying vec4 v_ab;
void main() {
gl_FragColor = v_color;
// Discard the fragments between the signals (emulate glMultiDrawArrays).
if ((fract(v_index.x) > 0.) || (fract(v_index.y) > 0.))
discard;
// Clipping test.
vec2 test = abs((v_position.xy-v_ab.zw)/v_ab.xy);
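// test <= 1 exactly inside the subplot rectangle (centre b, half-size a).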
if ((test.x > 1) || (test.y > 1))
discard;
}
"""
class Canvas(app.Canvas):
def __init__(self):
app.Canvas.__init__(self, title='Use your wheel to zoom!',
keys='interactive')
self.program = gloo.Program(VERT_SHADER, FRAG_SHADER)
self.program['a_position'] = y.reshape(-1, 1)
self.program['a_color'] = color
self.program['a_index'] = index
self.program['u_scale'] = (1., 1.)
self.program['u_size'] = (nrows, ncols)
self.program['u_n'] = n
gloo.set_viewport(0, 0, *self.physical_size)
self._timer = app.Timer('auto', connect=self.on_timer, start=True)
gloo.set_state(clear_color='black', blend=True,
blend_func=('src_alpha', 'one_minus_src_alpha'))
self.show()
def on_resize(self, event):
gloo.set_viewport(0, 0, *event.physical_size)
def on_mouse_wheel(self, event):
dx = np.sign(event.delta[1]) * .05
scale_x, scale_y = self.program['u_scale']
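# Zoom horizontally only: exp(2.5*dx) scales x while exp(0.0*dx) == 1
# leaves y fixed; max(1, ...) below clamps at the unzoomed view.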
scale_x_new, scale_y_new = (scale_x * math.exp(2.5*dx),
scale_y * math.exp(0.0*dx))
self.program['u_scale'] = (max(1, scale_x_new), max(1, scale_y_new))
self.update()
def on_timer(self, event):
"""Add some data at the end of each signal (real-time signals)."""
k = 10
y[:, :-k] = y[:, k:]
y[:, -k:] = amplitudes * np.random.randn(m, k)
self.program['a_position'].set_data(y.ravel().astype(np.float32))
self.update()
def on_draw(self, event):
gloo.clear()
self.program.draw('line_strip')
if __name__ == '__main__':
c = Canvas()
app.run()
#version 120
varying vec4 v_color;
varying vec3 v_index;
varying vec2 v_position;
varying vec4 v_ab;
void main() {
gl_FragColor = v_color;
// Discard the fragments between the signals (emulate glMultiDrawArrays).
if ((fract(v_index.x) > 0.) || (fract(v_index.y) > 0.))
discard;
// Clipping test.
vec2 test = abs((v_position.xy-v_ab.zw)/v_ab.xy);
// Complete clipping test: if ((test.x > 1) || (test.y > 1))
if (test.x > 1)
discard;
}
\ No newline at end of file
#version 120
// y coordinate of the position.
attribute float a_position;
// row, col, and time index.
attribute vec3 a_index;
varying vec3 v_index;
// 2D scaling factor (zooming).
uniform vec2 u_scale;
// Size of the table.
uniform vec2 u_size;
// Number of samples per signal.
uniform float u_n;
// Color.
attribute vec3 a_color;
varying vec4 v_color;
// Varying variables used for clipping in the fragment shader.
varying vec2 v_position;
varying vec4 v_ab;
void main() {
float nrows = u_size.x;
float ncols = u_size.y;
// Compute the x coordinate from the time index.
float x = -1 + 2*a_index.z / (u_n-1);
vec2 position = vec2(x - (1 - 1 / u_scale.x), a_position);
// Find the affine transformation for the subplots.
vec2 a = vec2(1./ncols, 1./nrows)*.96;