Commit d712192b authored by Ronny Eichler
Browse files

Slight project restructure and dataset format detection

parent be124ef2
......@@ -4,9 +4,10 @@
from __future__ import print_function
import sys
import cmd
import tools
import logging
from constants import LOG_LEVEL_VERBOSE
from lib.constants import LOG_LEVEL_VERBOSE
import lib.tools
from dataman_cli import DataMan
__version__ = 0.01
......@@ -25,9 +26,15 @@ if __name__ == "__main__":
subparsers = parser.add_subparsers(help='sub commands', dest='command')
parser_cli = subparsers.add_parser('cli', help='Interactive CLI session')
parser_stats = subparsers.add_parser('stats', help='Directory statistics')
parser_stats = subparsers.add_parser('stats', help='Dataset statistics.')
parser_stats.add_argument('path', help='Relative or absolute path to directory',
default='.', nargs='?')
# Sub-command `ls`: directory listing; the path argument is optional and
# defaults to the current directory. Kept in a plain `parser_ls` variable
# like the sibling sub-parsers instead of being attached to `parser` as an
# attribute.
parser_ls = subparsers.add_parser('ls', help='Directory listing with basic stats (e.g. size)')
parser_ls.add_argument('path', help='Relative or absolute path to directory',
                       default='.', nargs='?')
parser_proc = subparsers.add_parser('proc', help='Data processing')
parser_doc = subparsers.add_parser('doc', help='Data documentation')
parser_check = subparsers.add_parser('check', help='Check/verify data and documentation integrity')
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import sys
import logging
import tools
import cmd
from constants import LOG_LEVEL_VERBOSE
import lib.tools
from lib.constants import LOG_LEVEL_VERBOSE
class DataMan(cmd.Cmd):
"""Command line tool for quick data documentation."""
......@@ -27,11 +28,17 @@ class DataMan(cmd.Cmd):
else:
print("hi there!")
def do_ls(self, path):
    """Print the directory table for `path`.

    An empty argument falls back to the current directory ('.'). The
    details are collected with lib.dirstats.gather() and handed to
    lib.dirstats.print_table().
    """
    import lib.dirstats as ds
    target = path if len(path) else '.'
    rows = ds.gather(target)
    ds.print_table(rows)
def do_stats(self, path):
    """Print the directory table for `path`.

    An empty argument falls back to the current directory ('.'). Only
    lib.dirstats is used; the earlier `folderstats` import and call have
    been dropped so the table is gathered and printed exactly once.
    """
    if not path:
        path = '.'
    import lib.dirstats as ds
    ds.print_table(ds.gather(path))
def do_exit(self, line):
"Exit"
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import tools
from tools import fext, dir_content
import os
import sys
from termcolor import colored
import open_ephys
import kwik
EXT_VIDEO = ['.avi', '.mp4', '.mkv', '.wmv']
EXT_SOUND = ['.wav', '.mp3', '.snd', '.wma']
......@@ -16,54 +20,52 @@ table_hdr = "{0:^28}{sep}{1:^6}{sep}{2:>3}{sep}{3:>3}{sep}{4:>3}{sep}{5:>3}{sep}
_row = "{0:<28}{1}{2:>4}{3:>4}{4:>4}{5:>4}{6:>10}"
def check_format(*targets):
def contains_dataset(root, dirs=None, files=None):
"""Check if directory or list of files contains a dataset of known format (OE, Kwik, etc.)"""
if len(targets) == 1 and os.path.isdir(targets[0]):
root, dirs, files = next(os.walk(targets[0]))
else:
# for t in targets:
# TODO assert(os.path.exists(t))
files = targets
for f in files:
if fext(f) in ['.continuous']:
return "OpenEphys"
elif fext(f) in ['.kwx', '.kwd', '.kwik']:
return "Kwik"
if None in [dirs, files]:
_, dirs, files = dir_content(root)
formats = [open_ephys, kwik]
for fmt in formats:
detected = fmt.detect(root, dirs, files)
if detected:
return detected
else:
return None
def fext(fname):
return os.path.splitext(fname)[1]
def dir_details(path):
    """Collect summary statistics for a single directory.

    The file counts only consider the first level of `path`; the size is
    whatever tools.dir_size() reports for it.

    Args:
        path: Relative or absolute path to a directory.

    Returns:
        Dictionary with the keys 'fname', 'size', 'num_files', 'num_vid',
        'num_img', 'num_snd', 'num_doc' and 'data_fmt'. 'data_fmt' holds
        the result of contains_dataset() for the directory.
    """
    root, dirs, files = dir_content(path)

    # normpath() drops a trailing separator, which would otherwise make
    # basename() return an empty name for paths such as "data/".
    name = os.path.basename(os.path.normpath(path))
    size = tools.dir_size(path)

    # Resolve every extension once instead of once per category.
    extensions = [fext(f) for f in files]

    def count(known_extensions):
        return sum(1 for ext in extensions if ext in known_extensions)

    # Hand over the listing we already have so the directory is not
    # scanned a second time for the format detection.
    data_fmt = contains_dataset(root, dirs, files)

    return dict(fname=name, size=size, num_files=len(files),
                num_vid=count(EXT_VIDEO), num_img=count(EXT_IMAGE),
                num_snd=count(EXT_SOUND), num_doc=count(EXT_DOC),
                data_fmt=data_fmt)
def gather(path):
    """Gather details on the path and its direct subdirectories.

    Args:
        path: Relative or absolute path to a directory.

    Returns:
        List of dictionaries as produced by dir_details(). The first
        element describes `path` itself, followed by one element for each
        directory found directly inside it.
    """
    root, dirs, _ = dir_content(path)
    details = [dir_details(root)]
    details.extend(dir_details(os.path.join(root, d)) for d in dirs)
    return details
def prettify(element, color=None, align='>', width=0, sepl='', sepr=''):
......@@ -93,29 +95,31 @@ def mk_row(row, colorized=True, cols=['fname', 'size', 'num_files',
row_str += prettify(tools.fmt_size(row[c], unit='', sep='', col=True, pad=7),
sepr=sepr, align='>', width='')
elif c == 'num_files':
row_str += prettify(row[c],
color='red' if row[c]==0 and colored else None,
sepr=sepr, align='>', width=4)
elif c == 'num_vid':
row_str += prettify(row[c],
color='green' if row[c]>0 and colored else None,
sepr=sepr, align='>', width=4)
elif c == 'num_img':
row_str += prettify(row[c],
color='green' if row[c]>0 and colored else None,
sepr=sepr, align='>', width=4)
elif c == 'num_snd':
row_str += prettify(row[c],
color='green' if row[c]>0 and colored else None,
sepr=sepr, align='>', width=4)
elif c == 'data_fmt':
if row[c] == 'OpenEphys':
color = 'yellow'
elif row[c] == 'Kwik':
color = 'green'
elif c in ['num_vid', 'num_img', 'num_snd', 'num_doc']:
if row[c] > 0:
color='green' if colored else None
val = row[c]
else:
val = ''
color = None
row_str += prettify(val, color=color, sepr=sepr, align='>', width=4)
elif c == 'data_fmt':
if row[c] is None:
color = None
else:
if 'OE' in row[c]:
color = 'yellow'
elif 'Kw' in row[c]:
color = 'green'
else:
color = None
row_str += prettify(row[c] if row[c] is not None else '',
color=color if colored else None,
sepr=sepr, align='>', width=10)
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import os
import xml.etree.ElementTree as etree
from tools import fext, dir_content
def detect(root, dirs=None, files=None):
    """Check for a Kwik formatted data set in the root directory.

    Args:
        root: Directory to search in.
        dirs: List of subdirectories in root. Will be scanned if not provided.
        files: List of files in the root directory. Will be scanned if not
            provided.

    Returns:
        False if no file with a Kwik extension was found, else a string of
        the form "Kw_v<version>". The version reads "???" when
        format_version() cannot determine it.
    """
    # Same fallback as the Open Ephys detector: list the directory
    # ourselves when the caller did not pass its content along.
    if dirs is None or files is None:
        _, dirs, files = dir_content(root)

    kwik_extensions = ('.kwx', '.kwd', '.kwik')
    # Every file has to be looked at; one non-matching file must not end
    # the search early.
    if not any(fext(f) in kwik_extensions for f in files):
        return False

    fv = format_version(root, dirs, files)
    return "Kw_v{}".format(fv if fv else '???')
def format_version(root, dirs=None, files=None):
    """Read the format version stored in the data set's settings.xml.

    Args:
        root: Directory of the data set.
        dirs: List of subdirectories in root. Will be scanned if not provided.
        files: List of files in the root directory. Will be scanned if not
            provided.

    Returns:
        Text of the first INFO/VERSION element of settings.xml, or None if
        there is no settings.xml in `files`, the file cannot be read or
        parsed, or it has no such element.
    """
    if dirs is None or files is None:
        _, dirs, files = dir_content(root)

    if "settings.xml" not in files:
        return None

    # The caller already copes with an unknown version, so a corrupt or
    # unreadable settings file is reported as "unknown" instead of
    # aborting the whole scan.
    try:
        tree = etree.parse(os.path.join(root, 'settings.xml'))
    except (IOError, OSError, etree.ParseError):
        return None

    version = tree.find("INFO/VERSION")
    return None if version is None else version.text
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import os
import xml.etree.ElementTree as etree
from tools import fext, dir_content
def detect(root=None, dirs=None, files=None):
    """Check for an Open Ephys formatted data set in the root directory.

    Args:
        root: Directory to search in.
        dirs: List of subdirectories in root. Will be scanned if not provided.
        files: List of files in the root directory. Will be scanned if not
            provided.

    Returns:
        False if no '.continuous' file was found, else a string of the form
        "OE_v<version>". The version reads "???" when format_version()
        cannot determine it.
    """
    # TODO: Make all three optional and work with either
    if dirs is None or files is None:
        _, dirs, files = dir_content(root)

    # Every file has to be looked at; one non-matching file must not end
    # the search early.
    if not any(fext(f) == '.continuous' for f in files):
        return False

    fv = format_version(root, dirs, files)
    return "OE_v{}".format(fv if fv else '???')
def format_version(root, dirs=None, files=None):
    """Read the format version stored in the data set's settings.xml.

    Args:
        root: Directory of the data set.
        dirs: List of subdirectories in root. Will be scanned if not provided.
        files: List of files in the root directory. Will be scanned if not
            provided.

    Returns:
        Text of the first INFO/VERSION element of settings.xml, or None if
        there is no settings.xml in `files`, the file cannot be read or
        parsed, or it has no such element.
    """
    if dirs is None or files is None:
        _, dirs, files = dir_content(root)

    if "settings.xml" not in files:
        return None

    # The caller already copes with an unknown version, so a corrupt or
    # unreadable settings file is reported as "unknown" instead of
    # aborting the whole scan.
    try:
        tree = etree.parse(os.path.join(root, 'settings.xml'))
    except (IOError, OSError, etree.ParseError):
        return None

    version = tree.find("INFO/VERSION")
    return None if version is None else version.text
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import os
from os.path import join, getsize
from termcolor import colored
......@@ -23,10 +25,43 @@ def fmt_size(num, unit='B', si=True, sep=' ', col=False, pad=0):
return "{:5.0f}{}{}{} ".format(num, sep, prefix, unit, pad=pad-6)
num /= divisor
def directory_content(path):
def fext(fname):
    """Grab the file extension of a file name.

    This is a pure string operation; the file system is not accessed, so
    the file does not have to exist.

    Args:
        fname: File name or path.

    Returns:
        String with the last extension including the leading dot (e.g.
        '.gz' for 'a.tar.gz'). Empty string if the name has no extension.
    """
    _, extension = os.path.splitext(fname)
    return extension
def dir_content(path):
    """Gather root and first level content of a directory.

    Args:
        path: Relative or absolute path to a directory.

    Returns:
        A tuple containing the root directory, the directories and the files
        contained in the root directory.
        (dirpath, dirnames, filenames)

    Raises:
        OSError: If `path` cannot be listed as a directory.
    """
    # os.walk() ignores listing errors by default and then simply yields
    # nothing; without the explicit default a bare StopIteration would
    # escape from here for a missing or non-directory path.
    content = next(os.walk(path), None)
    if content is None:
        raise OSError("Not a readable directory: {!r}".format(path))
    return content
def dir_size(path):
"""Calculate size of directory including all subdirectories and files
Args:
path: Relative or absolute path.
Returns:
Integer value of size in Bytes.
"""
total_size = 0
for root, dirs, files in os.walk(path):
for f in files:
......@@ -34,26 +69,28 @@ def dir_size(path):
try:
total_size += os.path.getsize(fp)
except OSError:
# symbolic links cause issues
pass
return total_size
def stats(path):
    """Print a short size summary of a directory.

    Reports the combined size of the files directly inside `path`, then the
    size of each direct subdirectory, then the list of file names.

    Args:
        path: Relative or absolute path to a directory.
    """
    print("Got path:", path)
    root, dirs, files = dir_content(path)

    files_size = sum(getsize(join(root, name)) for name in files)
    print(root, "consumes", fmt_size(files_size),
          "in", len(files), "non-directory files")

    print("Directories:\n")
    for d in dirs:
        # Directory names are relative to root, not to the working directory.
        print(d, fmt_size(dir_size(join(root, d))))

    print("Files:")
    print(files)
def terminal_size():
    """Get the size of the currently used terminal via `stty size`.

    In many cases the reported size is inaccurate.

    Returns:
        List of the integers printed by `stty size`, i.e. [height, width]
        (rows first, then columns). The list is empty when the command
        prints nothing on stdout.
        NOTE(review): presumably that is what happens when not run from a
        terminal -- confirm on the target platforms.
    """
    pipe = os.popen('stty size', 'r')
    try:
        output = pipe.read()
    finally:
        # Always release the pipe, even if reading fails.
        pipe.close()
    # A real list (not a lazy map object) so the result can be indexed and
    # unpacked on Python 3 as well.
    return [int(field) for field in output.split()]
def _find_getch():
"""Helper to wait for a single character press, instead of having to use raw_input() requiring Enter
to be pressed. Should work on all OS.
Returns:
Function that works as blocking single character input without prompt.
"""
try:
import termios
except ImportError:
......@@ -77,9 +114,15 @@ def _find_getch():
# CSI escape sequence: ESC '[', optional parameter bytes (0x30-0x3F),
# optional intermediate bytes (0x20-0x2F) and exactly one final byte
# (0x40-0x7E). Matching the final byte explicitly keeps sequences that do
# not end in 'm' (e.g. clear screen) from swallowing the text after them.
ansi_escape = re.compile(r'\x1b\[[0-?]*[ -/]*[@-~]')


def strip_ansi(string):
    """Remove the ANSI codes (e.g. color and additional formatting) from a string.

    Args:
        string: A string potentially containing ANSI escape codes.

    Returns:
        String with ANSI escape codes removed.
    """
    return ansi_escape.sub('', string)
if __name__ == "__main__":
stats('.')
print fmt_size(dir_size("."))
pass
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment