Merge pull request #1 from echemdata/master

get the latest
Authored by Tim on 2020-10-25 05:48:16 -07:00, committed by GitHub
12 changed files with 553 additions and 348 deletions

.flake8 (new file)

@@ -0,0 +1,2 @@
# This file will be ignored - see http://flake8.pycqa.org/en/2.6.0/config.html#per-project
# Edit the [flake8] section in tox.ini instead

.travis.yml

@@ -3,10 +3,13 @@ language: python
 cache:
   directories:
     - .tox
+    - .pytest_cache
     - tests/testdata
 python:
-  - "2.7"
   - "3.5"
+  - "3.6"
+  - "3.7"
+  - "3.8"
 install:
   - pip install tox-travis
   - sh get_testdata.sh

README.md

@@ -7,6 +7,14 @@ Read proprietary file formats from electrochemical test stations
 Use the `MPRfile` class from BioLogic.py (exported in the main package)
+
+````
+from galvani import BioLogic
+import pandas as pd
+
+mpr_file = BioLogic.MPRfile('test.mpr')
+df = pd.DataFrame(mpr_file.data)
+````
 
 ## Arbin .res files ##
 Use the res2sqlite.py script to convert the .res file to a sqlite3 database
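The Arbin converter is also reorganized in this changeset from a standalone script into an importable module with a functional interface (see galvani/res2sqlite.py below). A minimal sketch of that interface, assuming mdbtools (`mdb-export`) is installed and using hypothetical file names:

````
import sqlite3

from galvani import res2sqlite

# Convert the Arbin .res file into a sqlite3 database (needs mdb-export on the PATH).
res2sqlite.convert_arbin_to_sqlite('example.res', 'example.s3db')

# Channel_Normal_Table holds the point-by-point data.
with sqlite3.connect('example.s3db') as conn:
    rows = conn.execute('SELECT Test_Time, Voltage, Current '
                        'FROM Channel_Normal_Table LIMIT 5;').fetchall()
print(rows)
````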

galvani/BioLogic.py

@@ -3,25 +3,16 @@
 __all__ = ['MPTfileCSV', 'MPTfile']
 
-import sys
 import re
 import csv
 from os import SEEK_SET
 import time
 from datetime import date, datetime, timedelta
-from collections import OrderedDict
+from collections import defaultdict, OrderedDict
 
 import numpy as np
 
-if sys.version_info.major <= 2:
-    str3 = str
-    from string import maketrans
-else:
-    str3 = lambda b: str(b, encoding='ascii')
-    maketrans = bytes.maketrans
-
 
 def fieldname_to_dtype(fieldname):
     """Converts a column header from the MPT file into a tuple of
     canonical name and appropriate numpy dtype"""
@@ -48,13 +39,13 @@ def fieldname_to_dtype(fieldname):
raise ValueError("Invalid column header: %s" % fieldname) raise ValueError("Invalid column header: %s" % fieldname)
def comma_converter(float_string): def comma_converter(float_text):
"""Convert numbers to floats whether the decimal point is '.' or ','""" """Convert text to float whether the decimal point is '.' or ','"""
trans_table = maketrans(b',', b'.') trans_table = bytes.maketrans(b',', b'.')
return float(float_string.translate(trans_table)) return float(float_text.translate(trans_table))
def MPTfile(file_or_path): def MPTfile(file_or_path, encoding='ascii'):
"""Opens .mpt files as numpy record arrays """Opens .mpt files as numpy record arrays
Checks for the correct headings, skips any comments and returns a Checks for the correct headings, skips any comments and returns a
@@ -70,19 +61,20 @@ def MPTfile(file_or_path):
     if magic != b'EC-Lab ASCII FILE\r\n':
         raise ValueError("Bad first line for EC-Lab file: '%s'" % magic)
 
-    nb_headers_match = re.match(b'Nb header lines : (\d+)\s*$', next(mpt_file))
+    nb_headers_match = re.match(rb'Nb header lines : (\d+)\s*$',
+                                next(mpt_file))
     nb_headers = int(nb_headers_match.group(1))
     if nb_headers < 3:
         raise ValueError("Too few header lines: %d" % nb_headers)
 
-    ## The 'magic number' line, the 'Nb headers' line and the column headers
-    ## make three lines. Every additional line is a comment line.
+    # The 'magic number' line, the 'Nb headers' line and the column headers
+    # make three lines. Every additional line is a comment line.
     comments = [next(mpt_file) for i in range(nb_headers - 3)]
 
-    fieldnames = str3(next(mpt_file)).strip().split('\t')
+    fieldnames = next(mpt_file).decode(encoding).strip().split('\t')
     record_type = np.dtype(list(map(fieldname_to_dtype, fieldnames)))
 
-    ## Must be able to parse files where commas are used for decimal points
+    # Must be able to parse files where commas are used for decimal points
     converter_dict = dict(((i, comma_converter)
                            for i in range(len(fieldnames))))
     mpt_array = np.loadtxt(mpt_file, dtype=record_type,
@@ -107,13 +99,13 @@ def MPTfileCSV(file_or_path):
     if magic.rstrip() != 'EC-Lab ASCII FILE':
         raise ValueError("Bad first line for EC-Lab file: '%s'" % magic)
 
-    nb_headers_match = re.match('Nb header lines : (\d+)\s*$', next(mpt_file))
+    nb_headers_match = re.match(r'Nb header lines : (\d+)\s*$', next(mpt_file))
     nb_headers = int(nb_headers_match.group(1))
     if nb_headers < 3:
         raise ValueError("Too few header lines: %d" % nb_headers)
 
-    ## The 'magic number' line, the 'Nb headers' line and the column headers
-    ## make three lines. Every additional line is a comment line.
+    # The 'magic number' line, the 'Nb headers' line and the column headers
+    # make three lines. Every additional line is a comment line.
     comments = [next(mpt_file) for i in range(nb_headers - 3)]
 
     mpt_csv = csv.DictReader(mpt_file, dialect='excel-tab')
@@ -143,86 +135,123 @@ VMPmodule_hdr = np.dtype([('shortname', 'S10'),
                            ('version', '<u4'),
                            ('date', 'S8')])
 
+# Maps from colID to a tuple defining a numpy dtype
+VMPdata_colID_dtype_map = {
+    4: ('time/s', '<f8'),
+    5: ('control/V/mA', '<f4'),
+    6: ('Ewe/V', '<f4'),
+    7: ('dQ/mA.h', '<f8'),
+    8: ('I/mA', '<f4'),  # 8 is either I or <I> ??
+    9: ('Ece/V', '<f4'),
+    11: ('I/mA', '<f8'),
+    13: ('(Q-Qo)/mA.h', '<f8'),
+    19: ('control/V', '<f4'),
+    20: ('control/mA', '<f4'),
+    23: ('dQ/mA.h', '<f8'),  # Same as 7?
+    24: ('cycle number', '<f8'),
+    32: ('freq/Hz', '<f4'),
+    33: ('|Ewe|/V', '<f4'),
+    34: ('|I|/A', '<f4'),
+    35: ('Phase(Z)/deg', '<f4'),
+    36: ('|Z|/Ohm', '<f4'),
+    37: ('Re(Z)/Ohm', '<f4'),
+    38: ('-Im(Z)/Ohm', '<f4'),
+    39: ('I Range', '<u2'),
+    70: ('P/W', '<f4'),
+    74: ('Energy/W.h', '<f8'),
+    76: ('<I>/mA', '<f4'),
+    77: ('<Ewe>/V', '<f4'),
+    96: ('|Ece|/V', '<f4'),
+    98: ('Phase(Zce)/deg', '<f4'),
+    99: ('|Zce|/Ohm', '<f4'),
+    100: ('Re(Zce)/Ohm', '<f4'),
+    101: ('-Im(Zce)/Ohm', '<f4'),
+    123: ('Energy charge/W.h', '<f8'),
+    124: ('Energy discharge/W.h', '<f8'),
+    125: ('Capacitance charge/µF', '<f8'),
+    126: ('Capacitance discharge/µF', '<f8'),
+    131: ('Ns', '<u2'),
+    168: ('Rcmp/Ohm', '<f4'),
+    169: ('Cs/µF', '<f4'),
+    172: ('Cp/µF', '<f4'),
+    430: ('Phase(Zwe-ce)/deg', '<f4'),
+    431: ('|Zwe-ce|/Ohm', '<f4'),
+    432: ('Re(Zwe-ce)/Ohm', '<f4'),
+    433: ('-Im(Zwe-ce)/Ohm', '<f4'),
+    434: ('(Q-Qo)/C', '<f4'),
+    435: ('dQ/C', '<f4'),
+    441: ('<Ecv>/V', '<f4'),
+    462: ('Temperature/°C', '<f4'),
+    467: ('Q charge/discharge/mA.h', '<f8'),
+    468: ('half cycle', '<u4'),
+    469: ('z cycle', '<u4'),
+    471: ('<Ece>/V', '<f4'),
+    473: ('THD Ewe/%', '<f4'),
+    474: ('THD I/%', '<f4'),
+    476: ('NSD Ewe/%', '<f4'),
+    477: ('NSD I/%', '<f4'),
+    479: ('NSR Ewe/%', '<f4'),
+    480: ('NSR I/%', '<f4'),
+}
+
+# These column IDs define flags which are all stored packed in a single byte
+# The values in the map are (name, bitmask, dtype)
+VMPdata_colID_flag_map = {
+    1: ('mode', 0x03, np.uint8),
+    2: ('ox/red', 0x04, np.bool_),
+    3: ('error', 0x08, np.bool_),
+    21: ('control changes', 0x10, np.bool_),
+    31: ('Ns changes', 0x20, np.bool_),
+    65: ('counter inc.', 0x80, np.bool_),
+}
+
+
 def VMPdata_dtype_from_colIDs(colIDs):
-    dtype_dict = OrderedDict()
+    """Get a numpy record type from a list of column ID numbers.
+
+    The binary layout of the data in the MPR file is described by the sequence
+    of column ID numbers in the file header. This function converts that
+    sequence into a numpy dtype which can then be used to load data from the
+    file with np.frombuffer().
+
+    Some column IDs refer to small values which are packed into a single byte.
+    The second return value is a dict describing the bit masks with which to
+    extract these columns from the flags byte.
+    """
+    type_list = []
+    field_name_counts = defaultdict(int)
     flags_dict = OrderedDict()
-    flags2_dict = OrderedDict()
     for colID in colIDs:
-        if colID in (1, 2, 3, 21, 31, 65):
-            dtype_dict['flags'] = 'u1'
-            if colID == 1:
-                flags_dict['mode'] = (np.uint8(0x03), np.uint8)
-            elif colID == 2:
-                flags_dict['ox/red'] = (np.uint8(0x04), np.bool_)
-            elif colID == 3:
-                flags_dict['error'] = (np.uint8(0x08), np.bool_)
-            elif colID == 21:
-                flags_dict['control changes'] = (np.uint8(0x10), np.bool_)
-            elif colID == 31:
-                flags_dict['Ns changes'] = (np.uint8(0x20), np.bool_)
-            elif colID == 65:
-                flags_dict['counter inc.'] = (np.uint8(0x80), np.bool_)
-            else:
-                raise NotImplementedError("flag %d not implemented" % colID)
-        elif colID == 4:
-            dtype_dict['time/s'] = '<f8'
-        elif colID == 5:
-            dtype_dict['control/V/mA'] = '<f4'
-        # 6 is Ewe, 77 is <Ewe>, I don't see the difference
-        elif colID in (6, 77):
-            dtype_dict['Ewe/V'] = '<f4'
-        # Can't see any difference between 7 and 23
-        elif colID in (7, 23):
-            dtype_dict['dQ/mA.h'] = '<f8'
-        # 76 is <I>, 8 is either I or <I> ??
-        elif colID in (8, 76):
-            dtype_dict['I/mA'] = '<f4'
-        elif colID == 9:
-            dtype_dict['Ece/V'] = '<f4'
-        elif colID == 11:
-            dtype_dict['I/mA'] = '<f8'
-        elif colID == 13:
-            dtype_dict['(Q-Qo)/mA.h'] = '<f8'
-        elif colID == 19:
-            dtype_dict['control/V'] = '<f4'
-        elif colID == 24:
-            dtype_dict['cycle number'] = '<f8'
-        elif colID == 32:
-            dtype_dict['freq/Hz'] = '<f4'
-        elif colID == 33:
-            dtype_dict['|Ewe|/V'] = '<f4'
-        elif colID == 34:
-            dtype_dict['|I|/A'] = '<f4'
-        elif colID == 35:
-            dtype_dict['Phase(Z)/deg'] = '<f4'
-        elif colID == 36:
-            dtype_dict['|Z|/Ohm'] = '<f4'
-        elif colID == 37:
-            dtype_dict['Re(Z)/Ohm'] = '<f4'
-        elif colID == 38:
-            dtype_dict['-Im(Z)/Ohm'] = '<f4'
-        elif colID == 39:
-            dtype_dict['I Range'] = '<u2'
-        elif colID == 70:
-            dtype_dict['P/W'] = '<f4'
-        elif colID == 125:
-            dtype_dict['Capacitance charge/µF'] = '<f8'
-        elif colID == 126:
-            dtype_dict['Capacitance discharge/µF'] = '<f8'
-        elif colID == 131:
-            dtype_dict['Ns'] = '<u2'
-        elif colID == 434:
-            dtype_dict['(Q-Qo)/C'] = '<f4'
-        elif colID == 435:
-            dtype_dict['dQ/C'] = '<f4'
-        elif colID == 467:
-            dtype_dict['Q charge/discharge/mA.h'] = '<f8'
-        elif colID == 468:
-            dtype_dict['half cycle'] = '<u4'
+        if colID in VMPdata_colID_flag_map:
+            # Some column IDs represent boolean flags or small integers
+            # These are all packed into a single 'flags' byte whose position
+            # in the overall record is determined by the position of the first
+            # column ID of flag type. If there are several flags present,
+            # there is still only one 'flags' int
+            if 'flags' not in field_name_counts:
+                type_list.append(('flags', 'u1'))
+                field_name_counts['flags'] = 1
+            flag_name, flag_mask, flag_type = VMPdata_colID_flag_map[colID]
+            # TODO what happens if a flag colID has already been seen
+            # i.e. if flag_name is already present in flags_dict?
+            # Does it create a second 'flags' byte in the record?
+            flags_dict[flag_name] = (np.uint8(flag_mask), flag_type)
+        elif colID in VMPdata_colID_dtype_map:
+            field_name, field_type = VMPdata_colID_dtype_map[colID]
+            field_name_counts[field_name] += 1
+            count = field_name_counts[field_name]
+            if count > 1:
+                unique_field_name = '%s %d' % (field_name, count)
+            else:
+                unique_field_name = field_name
+            type_list.append((unique_field_name, field_type))
         else:
-            raise NotImplementedError("column type %d not implemented" % colID)
-    return np.dtype(list(dtype_dict.items())), flags_dict, flags2_dict
+            raise NotImplementedError("Column ID {cid} after column {prev} "
+                                      "is unknown"
+                                      .format(cid=colID,
+                                              prev=type_list[-1][0]))
+    return np.dtype(type_list), flags_dict
def read_VMP_modules(fileobj, read_module_data=True): def read_VMP_modules(fileobj, read_module_data=True):
@@ -234,15 +263,16 @@ def read_VMP_modules(fileobj, read_module_data=True):
     while True:
         module_magic = fileobj.read(len(b'MODULE'))
         if len(module_magic) == 0:  # end of file
-            raise StopIteration
+            break
         elif module_magic != b'MODULE':
-            raise ValueError("Found %r, expecting start of new VMP MODULE" % module_magic)
+            raise ValueError("Found %r, expecting start of new VMP MODULE"
+                             % module_magic)
 
         hdr_bytes = fileobj.read(VMPmodule_hdr.itemsize)
         if len(hdr_bytes) < VMPmodule_hdr.itemsize:
             raise IOError("Unexpected end of file while reading module header")
 
-        hdr = np.fromstring(hdr_bytes, dtype=VMPmodule_hdr, count=1)
+        hdr = np.frombuffer(hdr_bytes, dtype=VMPmodule_hdr, count=1)
         hdr_dict = dict(((n, hdr[n][0]) for n in VMPmodule_hdr.names))
         hdr_dict['offset'] = fileobj.tell()
         if read_module_data:
@@ -260,6 +290,9 @@ def read_VMP_modules(fileobj, read_module_data=True):
             fileobj.seek(hdr_dict['offset'] + hdr_dict['length'], SEEK_SET)
 
 
+MPR_MAGIC = b'BIO-LOGIC MODULAR FILE\x1a'.ljust(48) + b'\x00\x00\x00\x00'
+
+
 class MPRfile:
     """Bio-Logic .mpr file
@@ -277,79 +310,101 @@ class MPRfile:
""" """
def __init__(self, file_or_path): def __init__(self, file_or_path):
self.loop_index = None
if isinstance(file_or_path, str): if isinstance(file_or_path, str):
mpr_file = open(file_or_path, 'rb') mpr_file = open(file_or_path, 'rb')
else: else:
mpr_file = file_or_path mpr_file = file_or_path
magic = mpr_file.read(len(MPR_MAGIC))
mpr_magic = b'BIO-LOGIC MODULAR FILE\x1a \x00\x00\x00\x00' if magic != MPR_MAGIC:
magic = mpr_file.read(len(mpr_magic))
if magic != mpr_magic:
raise ValueError('Invalid magic for .mpr file: %s' % magic) raise ValueError('Invalid magic for .mpr file: %s' % magic)
modules = list(read_VMP_modules(mpr_file)) modules = list(read_VMP_modules(mpr_file))
self.modules = modules self.modules = modules
settings_mod, = (m for m in modules if m['shortname'] == b'VMP Set ') settings_mod, = (m for m in modules if m['shortname'] == b'VMP Set ')
data_module, = (m for m in modules if m['shortname'] == b'VMP data ') data_module, = (m for m in modules if m['shortname'] == b'VMP data ')
maybe_loop_module = [m for m in modules if m['shortname'] == b'VMP loop ']
maybe_log_module = [m for m in modules if m['shortname'] == b'VMP LOG '] maybe_log_module = [m for m in modules if m['shortname'] == b'VMP LOG ']
n_data_points = np.fromstring(data_module['data'][:4], dtype='<u4') n_data_points = np.frombuffer(data_module['data'][:4], dtype='<u4')
n_columns = np.fromstring(data_module['data'][4:5], dtype='u1') n_columns = np.frombuffer(data_module['data'][4:5], dtype='u1').item()
n_columns = np.asscalar(n_columns) # Compatibility with recent numpy
if data_module['version'] == 0: if data_module['version'] == 0:
column_types = np.fromstring(data_module['data'][5:], dtype='u1', column_types = np.frombuffer(data_module['data'][5:], dtype='u1',
count=n_columns) count=n_columns)
remaining_headers = data_module['data'][5 + n_columns:100] remaining_headers = data_module['data'][5 + n_columns:100]
main_data = data_module['data'][100:] main_data = data_module['data'][100:]
elif data_module['version'] == 2: elif data_module['version'] in [2, 3]:
column_types = np.fromstring(data_module['data'][5:], dtype='<u2', column_types = np.frombuffer(data_module['data'][5:], dtype='<u2',
count=n_columns) count=n_columns)
## There is 405 bytes of data before the main array starts # There are bytes of data before the main array starts
if data_module['version'] == 3:
num_bytes_before = 406 # version 3 added `\x01` to the start
else:
num_bytes_before = 405
remaining_headers = data_module['data'][5 + 2 * n_columns:405] remaining_headers = data_module['data'][5 + 2 * n_columns:405]
main_data = data_module['data'][405:] main_data = data_module['data'][num_bytes_before:]
else: else:
raise ValueError("Unrecognised version for data module: %d" % raise ValueError("Unrecognised version for data module: %d" %
data_module['version']) data_module['version'])
if sys.version_info.major <= 2:
assert(all((b == '\x00' for b in remaining_headers)))
else:
assert(not any(remaining_headers)) assert(not any(remaining_headers))
self.dtype, self.flags_dict, self.flags2_dict = VMPdata_dtype_from_colIDs(column_types) self.dtype, self.flags_dict = VMPdata_dtype_from_colIDs(column_types)
self.data = np.fromstring(main_data, dtype=self.dtype) self.data = np.frombuffer(main_data, dtype=self.dtype)
assert(self.data.shape[0] == n_data_points) assert(self.data.shape[0] == n_data_points)
## No idea what these 'column types' mean or even if they are actually # No idea what these 'column types' mean or even if they are actually
## column types at all # column types at all
self.version = int(data_module['version']) self.version = int(data_module['version'])
self.cols = column_types self.cols = column_types
self.npts = n_data_points self.npts = n_data_points
tm = time.strptime(str3(settings_mod['date']), '%m/%d/%y') try:
tm = time.strptime(settings_mod['date'].decode('ascii'), '%m/%d/%y')
except ValueError:
tm = time.strptime(settings_mod['date'].decode('ascii'), '%m-%d-%y')
self.startdate = date(tm.tm_year, tm.tm_mon, tm.tm_mday) self.startdate = date(tm.tm_year, tm.tm_mon, tm.tm_mday)
if maybe_loop_module:
loop_module, = maybe_loop_module
if loop_module['version'] == 0:
self.loop_index = np.fromstring(loop_module['data'][4:],
dtype='<u4')
self.loop_index = np.trim_zeros(self.loop_index, 'b')
else:
raise ValueError("Unrecognised version for data module: %d" %
data_module['version'])
if maybe_log_module: if maybe_log_module:
log_module, = maybe_log_module log_module, = maybe_log_module
tm = time.strptime(str3(log_module['date']), '%m/%d/%y') try:
tm = time.strptime(log_module['date'].decode('ascii'), '%m/%d/%y')
except ValueError:
tm = time.strptime(log_module['date'].decode('ascii'), '%m-%d-%y')
self.enddate = date(tm.tm_year, tm.tm_mon, tm.tm_mday) self.enddate = date(tm.tm_year, tm.tm_mon, tm.tm_mday)
## There is a timestamp at either 465 or 469 bytes # There is a timestamp at either 465 or 469 bytes
## I can't find any reason why it is one or the other in any # I can't find any reason why it is one or the other in any
## given file # given file
ole_timestamp1 = np.fromstring(log_module['data'][465:], ole_timestamp1 = np.frombuffer(log_module['data'][465:],
dtype='<f8', count=1) dtype='<f8', count=1)
ole_timestamp2 = np.fromstring(log_module['data'][469:], ole_timestamp2 = np.frombuffer(log_module['data'][469:],
dtype='<f8', count=1) dtype='<f8', count=1)
ole_timestamp3 = np.fromstring(log_module['data'][473:], ole_timestamp3 = np.frombuffer(log_module['data'][473:],
dtype='<f8', count=1) dtype='<f8', count=1)
ole_timestamp4 = np.frombuffer(log_module['data'][585:],
dtype='<f8', count=1)
if ole_timestamp1 > 40000 and ole_timestamp1 < 50000: if ole_timestamp1 > 40000 and ole_timestamp1 < 50000:
ole_timestamp = ole_timestamp1 ole_timestamp = ole_timestamp1
elif ole_timestamp2 > 40000 and ole_timestamp2 < 50000: elif ole_timestamp2 > 40000 and ole_timestamp2 < 50000:
ole_timestamp = ole_timestamp2 ole_timestamp = ole_timestamp2
elif ole_timestamp3 > 40000 and ole_timestamp3 < 50000: elif ole_timestamp3 > 40000 and ole_timestamp3 < 50000:
ole_timestamp = ole_timestamp3 ole_timestamp = ole_timestamp3
elif ole_timestamp4 > 40000 and ole_timestamp4 < 50000:
ole_timestamp = ole_timestamp4
else: else:
raise ValueError("Could not find timestamp in the LOG module") raise ValueError("Could not find timestamp in the LOG module")
@@ -357,17 +412,14 @@ class MPRfile:
             ole_timedelta = timedelta(days=ole_timestamp[0])
             self.timestamp = ole_base + ole_timedelta
             if self.startdate != self.timestamp.date():
-                raise ValueError("""Date mismatch:
-    Start date: %s
-    End date: %s
-    Timestamp: %s""" % (self.startdate, self.enddate, self.timestamp))
+                raise ValueError("Date mismatch:\n"
+                                 + " Start date: %s\n" % self.startdate
+                                 + " End date: %s\n" % self.enddate
+                                 + " Timestamp: %s\n" % self.timestamp)
 
     def get_flag(self, flagname):
         if flagname in self.flags_dict:
             mask, dtype = self.flags_dict[flagname]
             return np.array(self.data['flags'] & mask, dtype=dtype)
-        elif flagname in self.flags2_dict:
-            mask, dtype = self.flags2_dict[flagname]
-            return np.array(self.data['flags2'] & mask, dtype=dtype)
         else:
             raise AttributeError("Flag '%s' not present" % flagname)
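As an illustration of the refactored column handling above (a sketch based on the new tests further down; the column-ID list and file name are hypothetical): VMPdata_dtype_from_colIDs now returns a numpy dtype plus a dict of bit masks for the packed 'flags' byte, and MPRfile.get_flag applies those masks.

````
import numpy as np

from galvani import MPRfile
from galvani.BioLogic import VMPdata_dtype_from_colIDs

# Column IDs 1 and 2 are flag bits packed into a single 'flags' byte;
# 4 and 6 map to ordinary fields (time/s and Ewe/V).
dtype, flags_dict = VMPdata_dtype_from_colIDs([1, 2, 4, 6])
print(dtype)       # [('flags', 'u1'), ('time/s', '<f8'), ('Ewe/V', '<f4')]
print(flags_dict)  # bit masks for 'mode' (0x03) and 'ox/red' (0x04)

# get_flag() unpacks a named flag from the 'flags' byte of a loaded file.
mpr = MPRfile('test.mpr')  # hypothetical file name
ox_red = mpr.get_flag('ox/red')
assert ox_red.dtype == np.bool_
````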

galvani/__init__.py

@@ -1 +1,3 @@
-from .BioLogic import MPTfile, MPRfile
+from .BioLogic import MPRfile, MPTfile
+
+__all__ = ['MPRfile', 'MPTfile']

galvani/res2sqlite.py

@@ -7,8 +7,8 @@ import csv
 import argparse
 
-## The following scripts are adapted from the result of running
-## $ mdb-schema <result.res> oracle
+# The following scripts are adapted from the result of running
+# $ mdb-schema <result.res> oracle
 
 mdb_tables = ["Version_Table", "Global_Table", "Resume_Table",
               "Channel_Normal_Table", "Channel_Statistic_Table",
@@ -126,7 +126,8 @@ CREATE TABLE Channel_Statistic_Table
 -- Version 1.14 ends here, version 5.23 continues
     Charge_Time REAL DEFAULT NULL,
     Discharge_Time REAL DEFAULT NULL,
-    FOREIGN KEY (Test_ID, Data_Point) REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
+    FOREIGN KEY (Test_ID, Data_Point)
+        REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
 ); """,
     "Auxiliary_Table": """
 CREATE TABLE Auxiliary_Table
@@ -137,7 +138,8 @@ CREATE TABLE Auxiliary_Table
     Data_Type INTEGER,
     X REAL,
     "dX/dt" REAL,
-    FOREIGN KEY (Test_ID, Data_Point) REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
+    FOREIGN KEY (Test_ID, Data_Point)
+        REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
 ); """,
     "Event_Table": """
 CREATE TABLE Event_Table
@@ -220,9 +222,10 @@ CREATE TABLE Smart_Battery_Data_Table
     ChargingCurrent REAL DEFAULT NULL,
     ChargingVoltage REAL DEFAULT NULL,
     ManufacturerData REAL DEFAULT NULL,
-    FOREIGN KEY (Test_ID, Data_Point) REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
+    FOREIGN KEY (Test_ID, Data_Point)
+        REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
 ); """,
-## The following tables are not present in version 1.14
+# The following tables are not present in version 1.14
     'MCell_Aci_Data_Table': """
 CREATE TABLE MCell_Aci_Data_Table
 (
@@ -233,7 +236,8 @@ CREATE TABLE MCell_Aci_Data_Table
     Phase_Shift REAL,
     Voltage REAL,
     Current REAL,
-    FOREIGN KEY (Test_ID, Data_Point) REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
+    FOREIGN KEY (Test_ID, Data_Point)
+        REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
 );""",
     'Aux_Global_Data_Table': """
 CREATE TABLE Aux_Global_Data_Table
@@ -288,7 +292,8 @@ CREATE TABLE Smart_Battery_Clock_Stretch_Table
     VCELL3 INTEGER,
     VCELL2 INTEGER,
     VCELL1 INTEGER,
-    FOREIGN KEY (Test_ID, Data_Point) REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
+    FOREIGN KEY (Test_ID, Data_Point)
+        REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
 );"""}
 
 mdb_create_indices = {
@@ -306,11 +311,14 @@ CREATE TEMPORARY TABLE capacity_helper(
     Discharge_Capacity REAL NOT NULL,
     Charge_Energy REAL NOT NULL,
     Discharge_Energy REAL NOT NULL,
-    FOREIGN KEY (Test_ID, Cycle_Index) REFERENCES Channel_Normal_Table (Test_ID, Cycle_Index)
+    FOREIGN KEY (Test_ID, Cycle_Index)
+        REFERENCES Channel_Normal_Table (Test_ID, Cycle_Index)
 );
 
 INSERT INTO capacity_helper
-SELECT Test_ID, Cycle_Index, max(Charge_Capacity), max(Discharge_Capacity), max(Charge_Energy), max(Discharge_Energy)
+SELECT Test_ID, Cycle_Index,
+       max(Charge_Capacity), max(Discharge_Capacity),
+       max(Charge_Energy), max(Discharge_Energy)
 FROM Channel_Normal_Table
 GROUP BY Test_ID, Cycle_Index;
@@ -328,11 +336,14 @@ CREATE TABLE Capacity_Sum_Table(
     Discharge_Capacity_Sum REAL NOT NULL,
     Charge_Energy_Sum REAL NOT NULL,
     Discharge_Energy_Sum REAL NOT NULL,
-    FOREIGN KEY (Test_ID, Cycle_Index) REFERENCES Channel_Normal_Table (Test_ID, Cycle_Index)
+    FOREIGN KEY (Test_ID, Cycle_Index)
+        REFERENCES Channel_Normal_Table (Test_ID, Cycle_Index)
 );
 
 INSERT INTO Capacity_Sum_Table
-SELECT a.Test_ID, a.Cycle_Index, total(b.Charge_Capacity), total(b.Discharge_Capacity), total(b.Charge_Energy), total(b.Discharge_Energy)
+SELECT a.Test_ID, a.Cycle_Index,
+       total(b.Charge_Capacity), total(b.Discharge_Capacity),
+       total(b.Charge_Energy), total(b.Discharge_Energy)
 FROM capacity_helper AS a LEFT JOIN capacity_helper AS b
     ON (a.Test_ID = b.Test_ID AND a.Cycle_Index > b.Cycle_Index)
 GROUP BY a.Test_ID, a.Cycle_Index;
@@ -342,90 +353,113 @@ DROP TABLE capacity_helper;
 CREATE VIEW IF NOT EXISTS Capacity_View
 AS SELECT Test_ID, Data_Point, Test_Time, Step_Time, DateTime,
           Step_Index, Cycle_Index, Current, Voltage, "dV/dt",
-          Discharge_Capacity + Discharge_Capacity_Sum - Charge_Capacity - Charge_Capacity_Sum AS Net_Capacity,
-          Discharge_Capacity + Discharge_Capacity_Sum + Charge_Capacity + Charge_Capacity_Sum AS Gross_Capacity,
-          Discharge_Energy + Discharge_Energy_Sum - Charge_Energy - Charge_Energy_Sum AS Net_Energy,
-          Discharge_Energy + Discharge_Energy_Sum + Charge_Energy + Charge_Energy_Sum AS Gross_Energy
+          ( (Discharge_Capacity + Discharge_Capacity_Sum)
+            - (Charge_Capacity + Charge_Capacity_Sum) ) AS Net_Capacity,
+          ( (Discharge_Capacity + Discharge_Capacity_Sum)
+            + (Charge_Capacity + Charge_Capacity_Sum) ) AS Gross_Capacity,
+          ( (Discharge_Energy + Discharge_Energy_Sum)
+            - (Charge_Energy + Charge_Energy_Sum) ) AS Net_Energy,
+          ( (Discharge_Energy + Discharge_Energy_Sum)
+            + (Charge_Energy + Charge_Energy_Sum) ) AS Gross_Energy
 FROM Channel_Normal_Table NATURAL JOIN Capacity_Sum_Table;
 """
 
 
-def mdb_get_data_text(filename, table):
+def mdb_get_data_text(s3db, filename, table):
     print("Reading %s..." % table)
+    insert_pattern = re.compile(
+        r'INSERT INTO "\w+" \([^)]+?\) VALUES \(("[^"]*"|[^")])+?\);\n',
+        re.IGNORECASE
+    )
     try:
-        mdb_sql = sp.Popen(['mdb-export', '-I', 'postgres', filename, table],
-                           bufsize=-1, stdin=None, stdout=sp.PIPE,
-                           universal_newlines=True)
-        mdb_output = mdb_sql.stdout.read()
-        while len(mdb_output) > 0:
-            insert_match = re.match(r'INSERT INTO "\w+" \([^)]+?\) VALUES \(("[^"]*"|[^")])+?\);\n',
-                                    mdb_output, re.IGNORECASE)
-            s3db.execute(insert_match.group())
-            mdb_output = mdb_output[insert_match.end():]
-        s3db.commit()
-    except:
+        # Initialize values to avoid NameError in except clause
+        mdb_output = ''
+        insert_match = None
+        with sp.Popen(['mdb-export', '-I', 'postgres', filename, table],
+                      bufsize=-1, stdin=sp.DEVNULL, stdout=sp.PIPE,
+                      universal_newlines=True) as mdb_sql:
+            mdb_output = mdb_sql.stdout.read()
+            while len(mdb_output) > 0:
+                insert_match = insert_pattern.match(mdb_output)
+                s3db.execute(insert_match.group())
+                mdb_output = mdb_output[insert_match.end():]
+                mdb_output += mdb_sql.stdout.read()
+            s3db.commit()
+    except OSError as e:
+        if e.errno == 2:
+            raise RuntimeError('Could not locate the `mdb-export` executable. '
+                               'Check that mdbtools is properly installed.')
+        else:
+            raise
+    except BaseException:
         print("Error while importing %s" % table)
-        print("Remaining mdb-export output:", mdb_output)
+        if mdb_output:
+            print("Remaining mdb-export output:", mdb_output)
         if insert_match:
             print("insert_re match:", insert_match)
         raise
-    finally:
-        mdb_sql.terminate()
 
 
-def mdb_get_data_numeric(filename, table):
+def mdb_get_data_numeric(s3db, filename, table):
     print("Reading %s..." % table)
     try:
-        mdb_sql = sp.Popen(['mdb-export', filename, table],
-                           bufsize=-1, stdin=None, stdout=sp.PIPE,
-                           universal_newlines=True)
-        mdb_csv = csv.reader(mdb_sql.stdout)
-        mdb_headers = next(mdb_csv)
-        quoted_headers = ['"%s"' % h for h in mdb_headers]
-        joined_headers = ', '.join(quoted_headers)
-        joined_placemarks = ', '.join(['?' for h in mdb_headers])
-        insert_stmt = 'INSERT INTO "{0}" ({1}) VALUES ({2});'.format(table,
-                                                                     joined_headers, joined_placemarks)
-        s3db.executemany(insert_stmt, mdb_csv)
-        s3db.commit()
-    finally:
-        mdb_sql.terminate()
+        with sp.Popen(['mdb-export', filename, table],
+                      bufsize=-1, stdin=sp.DEVNULL, stdout=sp.PIPE,
+                      universal_newlines=True) as mdb_sql:
+            mdb_csv = csv.reader(mdb_sql.stdout)
+            mdb_headers = next(mdb_csv)
+            quoted_headers = ['"%s"' % h for h in mdb_headers]
+            joined_headers = ', '.join(quoted_headers)
+            joined_placemarks = ', '.join(['?' for h in mdb_headers])
+            insert_stmt = 'INSERT INTO "{0}" ({1}) VALUES ({2});'.format(
+                table,
+                joined_headers,
+                joined_placemarks,
+            )
+            s3db.executemany(insert_stmt, mdb_csv)
+            s3db.commit()
+    except OSError as e:
+        if e.errno == 2:
+            raise RuntimeError('Could not locate the `mdb-export` executable. '
+                               'Check that mdbtools is properly installed.')
+        else:
+            raise
 
 
-def mdb_get_data(filename, table):
+def mdb_get_data(s3db, filename, table):
     if table in mdb_tables_text:
-        mdb_get_data_text(filename, table)
+        mdb_get_data_text(s3db, filename, table)
     elif table in mdb_tables_numeric:
-        mdb_get_data_numeric(filename, table)
+        mdb_get_data_numeric(s3db, filename, table)
     else:
         raise ValueError("'%s' is in neither mdb_tables_text nor mdb_tables_numeric" % table)
 
 
-## Main part of the script
-
-parser = argparse.ArgumentParser(description="Convert Arbin .res files to sqlite3 databases using mdb-export")
-parser.add_argument('input_file', type=str)  # need file name to pass to sp.Popen
-parser.add_argument('output_file', type=str)  # need file name to pass to sqlite3.connect
-args = parser.parse_args()
-
-s3db = sqlite3.connect(args.output_file)
+def convert_arbin_to_sqlite(input_file, output_file):
+    """Read data from an Arbin .res data file and write to a sqlite file.
 
-for table in reversed(mdb_tables + mdb_5_23_tables):
-    s3db.execute('DROP TABLE IF EXISTS "%s";' % table)
-for table in mdb_tables:
-    s3db.executescript(mdb_create_scripts[table])
-    mdb_get_data(args.input_file, table)
-    if table in mdb_create_indices:
-        print("Creating indices for %s..." % table)
-        s3db.executescript(mdb_create_indices[table])
+    Any data currently in the sqlite file will be erased!
+    """
+    s3db = sqlite3.connect(output_file)
+    for table in reversed(mdb_tables + mdb_5_23_tables):
+        s3db.execute('DROP TABLE IF EXISTS "%s";' % table)
+    for table in mdb_tables:
+        s3db.executescript(mdb_create_scripts[table])
+        mdb_get_data(s3db, input_file, table)
+        if table in mdb_create_indices:
+            print("Creating indices for %s..." % table)
+            s3db.executescript(mdb_create_indices[table])
 
-if (s3db.execute("SELECT Version_Schema_Field FROM Version_Table;").fetchone()[0] == "Results File 5.23"):
-    for table in mdb_5_23_tables:
-        s3db.executescript(mdb_create_scripts[table])
-        mdb_get_data(args.input_file, table)
-        if table in mdb_create_indices:
-            s3db.executescript(mdb_create_indices[table])
+    csr = s3db.execute("SELECT Version_Schema_Field FROM Version_Table;")
+    version_text, = csr.fetchone()
+    if (version_text == "Results File 5.23"):
+        for table in mdb_5_23_tables:
+            s3db.executescript(mdb_create_scripts[table])
+            mdb_get_data(input_file, table)
+            if table in mdb_create_indices:
+                s3db.executescript(mdb_create_indices[table])
@@ -434,3 +468,18 @@ s3db.executescript(helper_table_script)
print("Vacuuming database...") print("Vacuuming database...")
s3db.executescript("VACUUM; ANALYZE;") s3db.executescript("VACUUM; ANALYZE;")
def main(argv=None):
parser = argparse.ArgumentParser(
description="Convert Arbin .res files to sqlite3 databases using mdb-export",
)
parser.add_argument('input_file', type=str) # need file name to pass to sp.Popen
parser.add_argument('output_file', type=str) # need file name to pass to sqlite3.connect
args = parser.parse_args(argv)
convert_arbin_to_sqlite(args.input_file, args.output_file)
if __name__ == '__main__':
main()
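Because the argument parsing now lives in main(argv=None), the same conversion can also be driven from Python without spawning a subprocess; a short sketch with hypothetical file names:

````
from galvani.res2sqlite import main

# Equivalent to running `res2sqlite input.res output.s3db`, the console
# script that setup.py now installs through its console_scripts entry point.
main(['input.res', 'output.s3db'])
````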

get_testdata.sh

@@ -7,20 +7,21 @@ mkdir -p tests/testdata
 cd tests/testdata
 
 /usr/bin/wget --continue -i - <<END_FILELIST
-http://files.figshare.com/1778905/arbin1.res
-http://files.figshare.com/1778937/bio_logic2.mpt
-http://files.figshare.com/1778938/bio_logic5.mpt
-http://files.figshare.com/1778939/bio_logic1.mpr
-http://files.figshare.com/1778940/bio_logic6.mpr
-http://files.figshare.com/1778941/bio_logic4.mpt
-http://files.figshare.com/1778942/bio_logic5.mpr
-http://files.figshare.com/1778943/bio_logic2.mpr
-http://files.figshare.com/1778944/bio_logic6.mpt
-http://files.figshare.com/1778945/bio_logic1.mpt
-http://files.figshare.com/1778946/bio_logic3.mpr
-http://files.figshare.com/1780444/bio_logic4.mpr
-http://files.figshare.com/1780529/121_CA_455nm_6V_30min_C01.mpr
-http://files.figshare.com/1780530/121_CA_455nm_6V_30min_C01.mpt
-http://files.figshare.com/1780526/CV_C01.mpr
-http://files.figshare.com/1780527/CV_C01.mpt
+https://files.figshare.com/1778905/arbin1.res
+https://files.figshare.com/1778937/bio_logic2.mpt
+https://files.figshare.com/1778938/bio_logic5.mpt
+https://files.figshare.com/1778939/bio_logic1.mpr
+https://files.figshare.com/1778940/bio_logic6.mpr
+https://files.figshare.com/1778941/bio_logic4.mpt
+https://files.figshare.com/1778942/bio_logic5.mpr
+https://files.figshare.com/1778943/bio_logic2.mpr
+https://files.figshare.com/1778944/bio_logic6.mpt
+https://files.figshare.com/1778945/bio_logic1.mpt
+https://files.figshare.com/1778946/bio_logic3.mpr
+https://files.figshare.com/1780444/bio_logic4.mpr
+https://files.figshare.com/1780529/121_CA_455nm_6V_30min_C01.mpr
+https://files.figshare.com/1780530/121_CA_455nm_6V_30min_C01.mpt
+https://files.figshare.com/1780526/CV_C01.mpr
+https://files.figshare.com/1780527/CV_C01.mpt
+https://files.figshare.com/14752538/C019P-0ppb-A_C01.mpr
 END_FILELIST

setup.py

@@ -1,21 +1,38 @@
 # -*- coding: utf-8 -*-
 
+import os.path
+
 from setuptools import setup
 
+with open(os.path.join(os.path.dirname(__file__), 'README.md')) as f:
+    readme = f.read()
+
 setup(
     name='galvani',
-    version='0.0.1a1',
+    version='0.1.0',
     description='Open and process battery charger log data files',
-    url='https://github.com/chatcannon/galvani',
+    long_description=readme,
+    long_description_content_type="text/markdown",
+    url='https://github.com/echemdata/galvani',
     author='Chris Kerr',
+    author_email='chris.kerr@mykolab.ch',
     license='GPLv3+',
     classifiers=[
-        'Development Status :: 3 - Alpha',
+        'Development Status :: 4 - Beta',
         'Intended Audience :: Developers',
         'Intended Audience :: Science/Research',
         'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
-        'Natural Language :: English'],
+        'Natural Language :: English',
+        'Programming Language :: Python :: 3 :: Only',
+        'Topic :: Scientific/Engineering :: Chemistry',
+    ],
     packages=['galvani'],
-    scripts=['scripts/res2sqlite.py'],  # TODO make this use entry_points
-    install_requires=['numpy']
+    entry_points={
+        'console_scripts': [
+            'res2sqlite = galvani.res2sqlite:main',
+        ],
+    },
+    python_requires='>=3.5',
+    install_requires=['numpy'],
+    tests_require=['pytest'],
 )

tests/conftest.py (new file)

@@ -0,0 +1,11 @@
"""Helpers for pytest tests."""
import os
import pytest
@pytest.fixture(scope='session')
def testdata_dir():
"""Path to the testdata directory."""
return os.path.join(os.path.dirname(__file__), 'testdata')

tests/test_Arbin.py (new file)

@@ -0,0 +1,56 @@
"""Tests for loading Arbin .res files."""
import os
import sqlite3
import subprocess
import pytest
from galvani import res2sqlite
have_mdbtools = (subprocess.call(['which', 'mdb-export'],
stdout=subprocess.DEVNULL) == 0)
def test_res2sqlite_help():
"""Test running `res2sqlite --help`.
This should work even when mdbtools is not installed.
"""
help_output = subprocess.check_output(['res2sqlite', '--help'])
assert b'Convert Arbin .res files to sqlite3 databases' in help_output
@pytest.mark.skipif(have_mdbtools, reason='This tests the failure when mdbtools is not installed')
def test_convert_Arbin_no_mdbtools(testdata_dir, tmpdir):
"""Checks that the conversion fails with an appropriate error message."""
res_file = os.path.join(testdata_dir, 'arbin1.res')
sqlite_file = os.path.join(str(tmpdir), 'arbin1.s3db')
with pytest.raises(RuntimeError, match="Could not locate the `mdb-export` executable."):
res2sqlite.convert_arbin_to_sqlite(res_file, sqlite_file)
@pytest.mark.skipif(not have_mdbtools, reason='Reading the Arbin file requires MDBTools')
@pytest.mark.parametrize('basename', ['arbin1'])
def test_convert_Arbin_to_sqlite_function(testdata_dir, tmpdir, basename):
"""Convert an Arbin file to SQLite using the functional interface."""
res_file = os.path.join(testdata_dir, basename + '.res')
sqlite_file = os.path.join(str(tmpdir), basename + '.s3db')
res2sqlite.convert_arbin_to_sqlite(res_file, sqlite_file)
assert os.path.isfile(sqlite_file)
with sqlite3.connect(sqlite_file) as conn:
csr = conn.execute('SELECT * FROM Channel_Normal_Table;')
csr.fetchone()
@pytest.mark.skipif(not have_mdbtools, reason='Reading the Arbin file requires MDBTools')
def test_convert_cmdline(testdata_dir, tmpdir):
"""Checks that the conversion fails with an appropriate error message."""
res_file = os.path.join(testdata_dir, 'arbin1.res')
sqlite_file = os.path.join(str(tmpdir), 'arbin1.s3db')
subprocess.check_call(['res2sqlite', res_file, sqlite_file])
assert os.path.isfile(sqlite_file)
with sqlite3.connect(sqlite_file) as conn:
csr = conn.execute('SELECT * FROM Channel_Normal_Table;')
csr.fetchone()

tests/test_BioLogic.py

@@ -2,98 +2,108 @@
 import os.path
 import re
-from datetime import date, datetime
+from datetime import datetime
 
 import numpy as np
 from numpy.testing import assert_array_almost_equal, assert_array_equal
-from nose.tools import ok_, eq_, raises
+import pytest
 
-from galvani import MPTfile, MPRfile
-from galvani.BioLogic import MPTfileCSV, str3  # not exported
-
-testdata_dir = os.path.join(os.path.dirname(__file__), 'testdata')
+from galvani import BioLogic, MPTfile, MPRfile
+from galvani.BioLogic import MPTfileCSV  # not exported
 
 
-def test_open_MPT():
+def test_open_MPT(testdata_dir):
     mpt1, comments = MPTfile(os.path.join(testdata_dir, 'bio_logic1.mpt'))
-    eq_(comments, [])
-    eq_(mpt1.dtype.names, ("mode", "ox/red", "error", "control changes",
-                           "Ns changes", "counter inc.", "time/s",
-                           "control/V/mA", "Ewe/V", "dQ/mA.h", "P/W",
-                           "I/mA", "(Q-Qo)/mA.h", "x"))
+    assert comments == []
+    assert mpt1.dtype.names == (
+        "mode", "ox/red", "error", "control changes", "Ns changes",
+        "counter inc.", "time/s", "control/V/mA", "Ewe/V", "dQ/mA.h", "P/W",
+        "I/mA", "(Q-Qo)/mA.h", "x",
+    )
 
 
-@raises(ValueError)
-def test_open_MPT_fails_for_bad_file():
-    mpt1 = MPTfile(os.path.join(testdata_dir, 'bio_logic1.mpr'))
+def test_open_MPT_fails_for_bad_file(testdata_dir):
+    with pytest.raises(ValueError, match='Bad first line'):
+        MPTfile(os.path.join(testdata_dir, 'bio_logic1.mpr'))
 
 
-def test_open_MPT_csv():
+def test_open_MPT_csv(testdata_dir):
     mpt1, comments = MPTfileCSV(os.path.join(testdata_dir, 'bio_logic1.mpt'))
-    eq_(comments, [])
-    eq_(mpt1.fieldnames, ["mode", "ox/red", "error", "control changes",
-                          "Ns changes", "counter inc.", "time/s",
-                          "control/V/mA", "Ewe/V", "dq/mA.h", "P/W",
-                          "<I>/mA", "(Q-Qo)/mA.h", "x"])
+    assert comments == []
+    assert mpt1.fieldnames == [
+        "mode", "ox/red", "error", "control changes", "Ns changes",
+        "counter inc.", "time/s", "control/V/mA", "Ewe/V", "dq/mA.h", "P/W",
+        "<I>/mA", "(Q-Qo)/mA.h", "x",
+    ]
 
 
-@raises(ValueError)
-def test_open_MPT_csv_fails_for_bad_file():
-    mpt1 = MPTfileCSV(os.path.join(testdata_dir, 'bio_logic1.mpr'))
+def test_open_MPT_csv_fails_for_bad_file(testdata_dir):
+    with pytest.raises((ValueError, UnicodeDecodeError)):
+        MPTfileCSV(os.path.join(testdata_dir, 'bio_logic1.mpr'))
 
 
-def test_open_MPR1():
-    mpr1 = MPRfile(os.path.join(testdata_dir, 'bio_logic1.mpr'))
-    ## Check the dates as a basic test that it has been read properly
-    eq_(mpr1.startdate, date(2011, 10, 29))
-    eq_(mpr1.enddate, date(2011, 10, 31))
+def test_colID_map_uniqueness():
+    """Check some uniqueness properties of the VMPdata_colID_xyz maps."""
+    field_colIDs = set(BioLogic.VMPdata_colID_dtype_map.keys())
+    flag_colIDs = set(BioLogic.VMPdata_colID_flag_map.keys())
+    field_names = [v[0] for v in BioLogic.VMPdata_colID_dtype_map.values()]
+    flag_names = [v[0] for v in BioLogic.VMPdata_colID_flag_map.values()]
+    assert not field_colIDs.intersection(flag_colIDs)
+    # 'I/mA' and 'dQ/mA.h' are duplicated
+    # assert len(set(field_names)) == len(field_names)
+    assert len(set(flag_names)) == len(flag_names)
+    assert not set(field_names).intersection(flag_names)
 
 
-def test_open_MPR2():
-    mpr2 = MPRfile(os.path.join(testdata_dir, 'bio_logic2.mpr'))
-    ## Check the dates as a basic test that it has been read properly
-    eq_(mpr2.startdate, date(2012, 9, 27))
-    eq_(mpr2.enddate, date(2012, 9, 27))
+@pytest.mark.parametrize('colIDs, expected', [
+    ([1, 2, 3], [('flags', 'u1')]),
+    ([4, 6], [('time/s', '<f8'), ('Ewe/V', '<f4')]),
+    ([1, 4, 21], [('flags', 'u1'), ('time/s', '<f8')]),
+    ([4, 6, 4], [('time/s', '<f8'), ('Ewe/V', '<f4'), ('time/s 2', '<f8')]),
+    ([4, 9999], NotImplementedError),
+])
+def test_colID_to_dtype(colIDs, expected):
+    """Test converting column ID to numpy dtype."""
+    if isinstance(expected, type) and issubclass(expected, Exception):
+        with pytest.raises(expected):
+            BioLogic.VMPdata_dtype_from_colIDs(colIDs)
+        return
+    expected_dtype = np.dtype(expected)
+    dtype, flags_dict = BioLogic.VMPdata_dtype_from_colIDs(colIDs)
+    assert dtype == expected_dtype
 
 
-def test_open_MPR3():
-    mpr = MPRfile(os.path.join(testdata_dir, 'bio_logic3.mpr'))
-    ## Check the dates as a basic test that it has been read properly
-    eq_(mpr.startdate, date(2013, 3, 27))
-    eq_(mpr.enddate, date(2013, 3, 27))
-
-
-def test_open_MPR4():
-    mpr = MPRfile(os.path.join(testdata_dir, 'bio_logic4.mpr'))
-    ## Check the dates as a basic test that it has been read properly
-    eq_(mpr.startdate, date(2011, 11, 1))
-    eq_(mpr.enddate, date(2011, 11, 2))
-
-
-def test_open_MPR5():
-    mpr = MPRfile(os.path.join(testdata_dir, 'bio_logic5.mpr'))
-    ## Check the dates as a basic test that it has been read properly
-    eq_(mpr.startdate, date(2013, 1, 28))
-    eq_(mpr.enddate, date(2013, 1, 28))
-
-
-def test_open_MPR6():
-    mpr = MPRfile(os.path.join(testdata_dir, 'bio_logic6.mpr'))
-    ## Check the dates as a basic test that it has been read properly
-    eq_(mpr.startdate, date(2012, 9, 11))
-    ## no end date because no VMP LOG module
-
-
-@raises(ValueError)
-def test_open_MPR_fails_for_bad_file():
-    mpr1 = MPRfile(os.path.join(testdata_dir, 'arbin1.res'))
+@pytest.mark.parametrize('filename, startdate, enddate', [
+    ('bio_logic1.mpr', '2011-10-29', '2011-10-31'),
+    ('bio_logic2.mpr', '2012-09-27', '2012-09-27'),
+    ('bio_logic3.mpr', '2013-03-27', '2013-03-27'),
+    ('bio_logic4.mpr', '2011-11-01', '2011-11-02'),
+    ('bio_logic5.mpr', '2013-01-28', '2013-01-28'),
+    # bio_logic6.mpr has no end date because it does not have a VMP LOG module
+    ('bio_logic6.mpr', '2012-09-11', None),
+    # C019P-0ppb-A_C01.mpr stores the date in a different format
+    ('C019P-0ppb-A_C01.mpr', '2019-03-14', '2019-03-14'),
+])
+def test_MPR_dates(testdata_dir, filename, startdate, enddate):
+    """Check that the start and end dates in .mpr files are read correctly."""
+    mpr = MPRfile(os.path.join(testdata_dir, filename))
+    assert mpr.startdate.strftime('%Y-%m-%d') == startdate
+    if enddate:
+        assert mpr.enddate.strftime('%Y-%m-%d') == enddate
+    else:
+        assert not hasattr(mpr, 'enddate')
+
+
+def test_open_MPR_fails_for_bad_file(testdata_dir):
+    with pytest.raises(ValueError, match='Invalid magic for .mpr file'):
+        MPRfile(os.path.join(testdata_dir, 'arbin1.res'))
 
 
 def timestamp_from_comments(comments):
     for line in comments:
         time_match = re.match(b'Acquisition started on : ([0-9/]+ [0-9:]+)', line)
         if time_match:
-            timestamp = datetime.strptime(str3(time_match.group(1)),
+            timestamp = datetime.strptime(time_match.group(1).decode('ascii'),
                                           '%m/%d/%Y %H:%M:%S')
             return timestamp
     raise AttributeError("No timestamp in comments")
@@ -117,7 +127,7 @@ def assert_MPR_matches_MPT(mpr, mpt, comments):
     assert_array_equal(mpr.get_flag("control changes"), mpt["control changes"])
     if "Ns changes" in mpt.dtype.fields:
         assert_array_equal(mpr.get_flag("Ns changes"), mpt["Ns changes"])
-    ## Nothing uses the 0x40 bit of the flags
+    # Nothing uses the 0x40 bit of the flags
     assert_array_equal(mpr.get_flag("counter inc."), mpt["counter inc."])
 
     assert_array_almost_equal(mpr.data["time/s"],
@@ -139,33 +149,34 @@ def assert_MPR_matches_MPT(mpr, mpt, comments):
     assert_field_matches("(Q-Qo)/C", decimal=6)  # 32 bit float precision
 
     try:
-        eq_(timestamp_from_comments(comments), mpr.timestamp)
+        assert timestamp_from_comments(comments) == mpr.timestamp
     except AttributeError:
         pass
 
 
-def test_MPR1_matches_MPT1():
-    mpr1 = MPRfile(os.path.join(testdata_dir, 'bio_logic1.mpr'))
-    mpt1, comments = MPTfile(os.path.join(testdata_dir, 'bio_logic1.mpt'))
-    assert_MPR_matches_MPT(mpr1, mpt1, comments)
-
-
-def test_MPR2_matches_MPT2():
-    mpr2 = MPRfile(os.path.join(testdata_dir, 'bio_logic2.mpr'))
-    mpt2, comments = MPTfile(os.path.join(testdata_dir, 'bio_logic2.mpt'))
-    assert_MPR_matches_MPT(mpr2, mpt2, comments)
-
-
-## No bio_logic3.mpt file
-
-
-def test_MPR4_matches_MPT4():
-    mpr4 = MPRfile(os.path.join(testdata_dir, 'bio_logic4.mpr'))
-    mpt4, comments = MPTfile(os.path.join(testdata_dir, 'bio_logic4.mpt'))
-    assert_MPR_matches_MPT(mpr4, mpt4, comments)
-
-
-def test_MPR5_matches_MPT5():
+@pytest.mark.parametrize('basename', [
+    'bio_logic1',
+    'bio_logic2',
+    # No bio_logic3.mpt file
+    'bio_logic4',
+    # bio_logic5 and bio_logic6 are special cases
+    'CV_C01',
+    '121_CA_455nm_6V_30min_C01',
+])
+def test_MPR_matches_MPT(testdata_dir, basename):
+    """Check the MPR parser against the MPT parser.
+
+    Load a binary .mpr file and a text .mpt file which should contain
+    exactly the same data. Check that the loaded data actually match.
+    """
+    binpath = os.path.join(testdata_dir, basename + '.mpr')
+    txtpath = os.path.join(testdata_dir, basename + '.mpt')
+    mpr = MPRfile(binpath)
+    mpt, comments = MPTfile(txtpath)
+    assert_MPR_matches_MPT(mpr, mpt, comments)
+
+
+def test_MPR5_matches_MPT5(testdata_dir):
     mpr = MPRfile(os.path.join(testdata_dir, 'bio_logic5.mpr'))
     mpt, comments = MPTfile((re.sub(b'\tXXX\t', b'\t0\t', line) for line in
                              open(os.path.join(testdata_dir, 'bio_logic5.mpt'),
@@ -173,23 +184,8 @@ def test_MPR5_matches_MPT5():
     assert_MPR_matches_MPT(mpr, mpt, comments)
 
 
-def test_MPR6_matches_MPT6():
+def test_MPR6_matches_MPT6(testdata_dir):
     mpr = MPRfile(os.path.join(testdata_dir, 'bio_logic6.mpr'))
     mpt, comments = MPTfile(os.path.join(testdata_dir, 'bio_logic6.mpt'))
     mpr.data = mpr.data[:958]  # .mpt file is incomplete
     assert_MPR_matches_MPT(mpr, mpt, comments)
-
-
-## Tests for issue #1 -- new dtypes ##
-
-
-def test_CV_C01():
-    mpr = MPRfile(os.path.join(testdata_dir, 'CV_C01.mpr'))
-    mpt, comments = MPTfile(os.path.join(testdata_dir, 'CV_C01.mpt'))
-    assert_MPR_matches_MPT(mpr, mpt, comments)
-
-
-def test_CA_455nm():
-    mpr = MPRfile(os.path.join(testdata_dir, '121_CA_455nm_6V_30min_C01.mpr'))
-    mpt, comments = MPTfile(os.path.join(testdata_dir, '121_CA_455nm_6V_30min_C01.mpt'))
-    assert_MPR_matches_MPT(mpr, mpt, comments)

tox.ini

@@ -1,5 +1,13 @@
 [tox]
-envlist = py27,py35
+envlist = py35,py36,py37,py38
 
 [testenv]
-deps=nose
-commands=nosetests
+deps =
+    flake8
+    pytest
+commands =
+    flake8
+    pytest
+
+[flake8]
+exclude = build,dist,*.egg-info,.cache,.git,.tox,__pycache__
+max-line-length = 100