Merge branch 'master' into test-and-fix-Arbin

This commit is contained in:
2019-05-05 08:14:43 +02:00
2 changed files with 138 additions and 112 deletions

View File

@@ -9,7 +9,7 @@ import csv
from os import SEEK_SET
import time
from datetime import date, datetime, timedelta
from collections import OrderedDict
from collections import defaultdict, OrderedDict
import numpy as np
@@ -144,119 +144,105 @@ VMPmodule_hdr = np.dtype([('shortname', 'S10'),
('version', '<u4'),
('date', 'S8')])
# Maps from colID to a tuple defining a numpy dtype
VMPdata_colID_dtype_map = {
4: ('time/s', '<f8'),
5: ('control/V/mA', '<f4'),
6: ('Ewe/V', '<f4'),
7: ('dQ/mA.h', '<f8'),
8: ('I/mA', '<f4'), # 8 is either I or <I> ??
9: ('Ece/V', '<f4'),
11: ('I/mA', '<f8'),
13: ('(Q-Qo)/mA.h', '<f8'),
19: ('control/V', '<f4'),
20: ('control/mA', '<f4'),
23: ('dQ/mA.h', '<f8'), # Same as 7?
24: ('cycle number', '<f8'),
32: ('freq/Hz', '<f4'),
33: ('|Ewe|/V', '<f4'),
34: ('|I|/A', '<f4'),
35: ('Phase(Z)/deg', '<f4'),
36: ('|Z|/Ohm', '<f4'),
37: ('Re(Z)/Ohm', '<f4'),
38: ('-Im(Z)/Ohm', '<f4'),
39: ('I Range', '<u2'),
70: ('P/W', '<f4'),
76: ('<I>/mA', '<f4'),
77: ('<Ewe>/V', '<f4'),
123: ('Energy charge/W.h', '<f8'),
124: ('Energy discharge/W.h', '<f8'),
125: ('Capacitance charge/µF', '<f8'),
126: ('Capacitance discharge/µF', '<f8'),
131: ('Ns', '<u2'),
169: ('Cs/µF', '<f4'),
172: ('Cp/µF', '<f4'),
434: ('(Q-Qo)/C', '<f4'),
435: ('dQ/C', '<f4'),
467: ('Q charge/discharge/mA.h', '<f8'),
468: ('half cycle', '<u4'),
473: ('THD Ewe/%', '<f4'),
474: ('THD I/%', '<f4'),
476: ('NSD Ewe/%', '<f4'),
477: ('NSD I/%', '<f4'),
479: ('NSR Ewe/%', '<f4'),
480: ('NSR I/%', '<f4'),
}
# These column IDs define flags which are all stored packed in a single byte
# The values in the map are (name, bitmask, dtype)
VMPdata_colID_flag_map = {
1: ('mode', 0x03, np.uint8),
2: ('ox/red', 0x04, np.bool_),
3: ('error', 0x08, np.bool_),
21: ('control changes', 0x10, np.bool_),
31: ('Ns changes', 0x20, np.bool_),
65: ('counter inc.', 0x80, np.bool_),
}
def VMPdata_dtype_from_colIDs(colIDs):
dtype_dict = OrderedDict()
"""Get a numpy record type from a list of column ID numbers.
The binary layout of the data in the MPR file is described by the sequence
of column ID numbers in the file header. This function converts that
sequence into a numpy dtype which can then be used to load data from the
file with np.frombuffer().
Some column IDs refer to small values which are packed into a single byte.
The second return value is a dict describing the bit masks with which to
extract these columns from the flags byte.
"""
type_list = []
field_name_counts = defaultdict(int)
flags_dict = OrderedDict()
flags2_dict = OrderedDict()
for colID in colIDs:
if colID in (1, 2, 3, 21, 31, 65):
dtype_dict['flags'] = 'u1'
if colID == 1:
flags_dict['mode'] = (np.uint8(0x03), np.uint8)
elif colID == 2:
flags_dict['ox/red'] = (np.uint8(0x04), np.bool_)
elif colID == 3:
flags_dict['error'] = (np.uint8(0x08), np.bool_)
elif colID == 21:
flags_dict['control changes'] = (np.uint8(0x10), np.bool_)
elif colID == 31:
flags_dict['Ns changes'] = (np.uint8(0x20), np.bool_)
elif colID == 65:
flags_dict['counter inc.'] = (np.uint8(0x80), np.bool_)
if colID in VMPdata_colID_flag_map:
# Some column IDs represent boolean flags or small integers
# These are all packed into a single 'flags' byte whose position
# in the overall record is determined by the position of the first
# column ID of flag type. If there are several flags present,
# there is still only one 'flags' int
if 'flags' not in field_name_counts:
type_list.append(('flags', 'u1'))
field_name_counts['flags'] = 1
flag_name, flag_mask, flag_type = VMPdata_colID_flag_map[colID]
# TODO what happens if a flag colID has already been seen
# i.e. if flag_name is already present in flags_dict?
# Does it create a second 'flags' byte in the record?
flags_dict[flag_name] = (np.uint8(flag_mask), flag_type)
elif colID in VMPdata_colID_dtype_map:
field_name, field_type = VMPdata_colID_dtype_map[colID]
field_name_counts[field_name] += 1
count = field_name_counts[field_name]
if count > 1:
unique_field_name = '%s %d' % (field_name, count)
else:
raise NotImplementedError("flag %d not implemented" % colID)
elif colID == 4:
dtype_dict['time/s'] = '<f8'
elif colID == 5:
dtype_dict['control/V/mA'] = '<f4'
# 6 is Ewe, 77 is <Ewe>, I don't see the difference
elif colID in (6, 77):
dtype_dict['Ewe/V'] = '<f4'
# Can't see any difference between 7 and 23
elif colID in (7, 23):
dtype_dict['dQ/mA.h'] = '<f8'
# 76 is <I>, 8 is either I or <I> ??
elif colID in (8, 76):
dtype_dict['I/mA'] = '<f4'
elif colID == 9:
dtype_dict['Ece/V'] = '<f4'
elif colID == 11:
dtype_dict['I/mA'] = '<f8'
elif colID == 13:
dtype_dict['(Q-Qo)/mA.h'] = '<f8'
elif colID == 16:
dtype_dict['Analog IN 1/V'] = '<f4'
elif colID == 19:
dtype_dict['control/V'] = '<f4'
elif colID == 20:
dtype_dict['control/mA'] = '<f4'
elif colID == 24:
dtype_dict['cycle number'] = '<f8'
elif colID == 32:
dtype_dict['freq/Hz'] = '<f4'
elif colID == 33:
dtype_dict['|Ewe|/V'] = '<f4'
elif colID == 34:
dtype_dict['|I|/A'] = '<f4'
elif colID == 35:
dtype_dict['Phase(Z)/deg'] = '<f4'
elif colID == 36:
dtype_dict['|Z|/Ohm'] = '<f4'
elif colID == 37:
dtype_dict['Re(Z)/Ohm'] = '<f4'
elif colID == 38:
dtype_dict['-Im(Z)/Ohm'] = '<f4'
elif colID == 39:
dtype_dict['I Range'] = '<u2'
elif colID == 70:
dtype_dict['P/W'] = '<f4'
elif colID == 74:
dtype_dict['Energy/W.h'] = '<f8'
elif colID == 78:
dtype_dict['Cs-2/µf-2'] = '<f4'
elif colID == 123:
dtype_dict['Energy charge/W.h'] = '<f8'
elif colID == 124:
dtype_dict['Energy discharge/W.h'] = '<f8'
elif colID == 125:
dtype_dict['Capacitance charge/µF'] = '<f8'
elif colID == 126:
dtype_dict['Capacitance discharge/µF'] = '<f8'
elif colID == 131:
dtype_dict['Ns'] = '<u2'
elif colID == 169:
dtype_dict['Cs/µF'] = '<f4'
elif colID == 172:
dtype_dict['Cp/µF'] = '<f4'
elif colID == 173:
dtype_dict['Cp-2/µF-2'] = '<f4'
elif colID == 434:
dtype_dict['(Q-Qo)/C'] = '<f4'
elif colID == 435:
dtype_dict['dQ/C'] = '<f4'
elif colID == 467:
dtype_dict['Q charge/discharge/mA.h'] = '<f8'
elif colID == 468:
dtype_dict['half cycle'] = '<u4'
elif colID == 469:
dtype_dict['z cycle'] = '<u4'
elif colID == 473:
dtype_dict['THD Ewe/%'] = '<f4'
elif colID == 476:
dtype_dict['NSD Ewe/%'] = '<f4'
elif colID == 479:
dtype_dict['NSR Ewe/%'] = '<f4'
elif colID == 474:
dtype_dict['THD I/%'] = '<f4'
elif colID == 477:
dtype_dict['NSD I/%'] = '<f4'
elif colID == 480:
dtype_dict['NSR I/%'] = '<f4'
unique_field_name = field_name
type_list.append((unique_field_name, field_type))
else:
print(dtype_dict)
raise NotImplementedError("column type %d not implemented" % colID)
return np.dtype(list(dtype_dict.items())), flags_dict, flags2_dict
return np.dtype(type_list), flags_dict
def read_VMP_modules(fileobj, read_module_data=True):
@@ -311,6 +297,7 @@ class MPRfile:
"""
def __init__(self, file_or_path):
self.loop_index = None
if isinstance(file_or_path, str):
mpr_file = open(file_or_path, 'rb')
else:
@@ -325,6 +312,7 @@ class MPRfile:
self.modules = modules
settings_mod, = (m for m in modules if m['shortname'] == b'VMP Set ')
data_module, = (m for m in modules if m['shortname'] == b'VMP data ')
maybe_loop_module = [m for m in modules if m['shortname'] == b'VMP loop ']
maybe_log_module = [m for m in modules if m['shortname'] == b'VMP LOG ']
n_data_points = np.frombuffer(data_module['data'][:4], dtype='<u4')
@@ -350,7 +338,7 @@ class MPRfile:
else:
assert(not any(remaining_headers))
self.dtype, self.flags_dict, self.flags2_dict = VMPdata_dtype_from_colIDs(column_types)
self.dtype, self.flags_dict = VMPdata_dtype_from_colIDs(column_types)
self.data = np.frombuffer(main_data, dtype=self.dtype)
assert(self.data.shape[0] == n_data_points)
@@ -366,6 +354,16 @@ class MPRfile:
tm = time.strptime(str3(settings_mod['date']), '%m-%d-%y')
self.startdate = date(tm.tm_year, tm.tm_mon, tm.tm_mday)
if maybe_loop_module:
loop_module, = maybe_loop_module
if loop_module['version'] == 0:
self.loop_index = np.fromstring(loop_module['data'][4:],
dtype='<u4')
self.loop_index = np.trim_zeros(self.loop_index, 'b')
else:
raise ValueError("Unrecognised version for data module: %d" %
data_module['version'])
if maybe_log_module:
log_module, = maybe_log_module
try:
@@ -411,8 +409,5 @@ class MPRfile:
if flagname in self.flags_dict:
mask, dtype = self.flags_dict[flagname]
return np.array(self.data['flags'] & mask, dtype=dtype)
elif flagname in self.flags2_dict:
mask, dtype = self.flags2_dict[flagname]
return np.array(self.data['flags2'] & mask, dtype=dtype)
else:
raise AttributeError("Flag '%s' not present" % flagname)

View File

@@ -8,7 +8,7 @@ import numpy as np
from numpy.testing import assert_array_almost_equal, assert_array_equal
import pytest
from galvani import MPTfile, MPRfile
from galvani import BioLogic, MPTfile, MPRfile
from galvani.BioLogic import MPTfileCSV, str3 # not exported
@@ -42,6 +42,37 @@ def test_open_MPT_csv_fails_for_bad_file(testdata_dir):
MPTfileCSV(os.path.join(testdata_dir, 'bio_logic1.mpr'))
def test_colID_map_uniqueness():
"""Check some uniqueness properties of the VMPdata_colID_xyz maps."""
field_colIDs = set(BioLogic.VMPdata_colID_dtype_map.keys())
flag_colIDs = set(BioLogic.VMPdata_colID_flag_map.keys())
field_names = [v[0] for v in BioLogic.VMPdata_colID_dtype_map.values()]
flag_names = [v[0] for v in BioLogic.VMPdata_colID_flag_map.values()]
assert not field_colIDs.intersection(flag_colIDs)
# 'I/mA' and 'dQ/mA.h' are duplicated
# assert len(set(field_names)) == len(field_names)
assert len(set(flag_names)) == len(flag_names)
assert not set(field_names).intersection(flag_names)
@pytest.mark.parametrize('colIDs, expected', [
([1, 2, 3], [('flags', 'u1')]),
([4, 6], [('time/s', '<f8'), ('Ewe/V', '<f4')]),
([1, 4, 21], [('flags', 'u1'), ('time/s', '<f8')]),
([4, 6, 4], [('time/s', '<f8'), ('Ewe/V', '<f4'), ('time/s 2', '<f8')]),
([4, 9999], NotImplementedError),
])
def test_colID_to_dtype(colIDs, expected):
"""Test converting column ID to numpy dtype."""
if isinstance(expected, type) and issubclass(expected, Exception):
with pytest.raises(expected):
BioLogic.VMPdata_dtype_from_colIDs(colIDs)
return
expected_dtype = np.dtype(expected)
dtype, flags_dict = BioLogic.VMPdata_dtype_from_colIDs(colIDs)
assert dtype == expected_dtype
@pytest.mark.parametrize('filename, startdate, enddate', [
('bio_logic1.mpr', '2011-10-29', '2011-10-31'),
('bio_logic2.mpr', '2012-09-27', '2012-09-27'),