Merge branch 'master' into flake8

This commit is contained in:
2019-05-05 08:12:52 +02:00
2 changed files with 126 additions and 112 deletions

View File

@@ -9,7 +9,7 @@ import csv
from os import SEEK_SET from os import SEEK_SET
import time import time
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from collections import OrderedDict from collections import defaultdict, OrderedDict
import numpy as np import numpy as np
@@ -145,119 +145,105 @@ VMPmodule_hdr = np.dtype([('shortname', 'S10'),
('version', '<u4'), ('version', '<u4'),
('date', 'S8')]) ('date', 'S8')])
# Maps from colID to a tuple defining a numpy dtype
VMPdata_colID_dtype_map = {
4: ('time/s', '<f8'),
5: ('control/V/mA', '<f4'),
6: ('Ewe/V', '<f4'),
7: ('dQ/mA.h', '<f8'),
8: ('I/mA', '<f4'), # 8 is either I or <I> ??
9: ('Ece/V', '<f4'),
11: ('I/mA', '<f8'),
13: ('(Q-Qo)/mA.h', '<f8'),
19: ('control/V', '<f4'),
20: ('control/mA', '<f4'),
23: ('dQ/mA.h', '<f8'), # Same as 7?
24: ('cycle number', '<f8'),
32: ('freq/Hz', '<f4'),
33: ('|Ewe|/V', '<f4'),
34: ('|I|/A', '<f4'),
35: ('Phase(Z)/deg', '<f4'),
36: ('|Z|/Ohm', '<f4'),
37: ('Re(Z)/Ohm', '<f4'),
38: ('-Im(Z)/Ohm', '<f4'),
39: ('I Range', '<u2'),
70: ('P/W', '<f4'),
76: ('<I>/mA', '<f4'),
77: ('<Ewe>/V', '<f4'),
123: ('Energy charge/W.h', '<f8'),
124: ('Energy discharge/W.h', '<f8'),
125: ('Capacitance charge/µF', '<f8'),
126: ('Capacitance discharge/µF', '<f8'),
131: ('Ns', '<u2'),
169: ('Cs/µF', '<f4'),
172: ('Cp/µF', '<f4'),
434: ('(Q-Qo)/C', '<f4'),
435: ('dQ/C', '<f4'),
467: ('Q charge/discharge/mA.h', '<f8'),
468: ('half cycle', '<u4'),
473: ('THD Ewe/%', '<f4'),
474: ('THD I/%', '<f4'),
476: ('NSD Ewe/%', '<f4'),
477: ('NSD I/%', '<f4'),
479: ('NSR Ewe/%', '<f4'),
480: ('NSR I/%', '<f4'),
}
# These column IDs define flags which are all stored packed in a single byte
# The values in the map are (name, bitmask, dtype)
VMPdata_colID_flag_map = {
1: ('mode', 0x03, np.uint8),
2: ('ox/red', 0x04, np.bool_),
3: ('error', 0x08, np.bool_),
21: ('control changes', 0x10, np.bool_),
31: ('Ns changes', 0x20, np.bool_),
65: ('counter inc.', 0x80, np.bool_),
}
def VMPdata_dtype_from_colIDs(colIDs): def VMPdata_dtype_from_colIDs(colIDs):
dtype_dict = OrderedDict() """Get a numpy record type from a list of column ID numbers.
The binary layout of the data in the MPR file is described by the sequence
of column ID numbers in the file header. This function converts that
sequence into a numpy dtype which can then be used to load data from the
file with np.frombuffer().
Some column IDs refer to small values which are packed into a single byte.
The second return value is a dict describing the bit masks with which to
extract these columns from the flags byte.
"""
type_list = []
field_name_counts = defaultdict(int)
flags_dict = OrderedDict() flags_dict = OrderedDict()
flags2_dict = OrderedDict()
for colID in colIDs: for colID in colIDs:
if colID in (1, 2, 3, 21, 31, 65): if colID in VMPdata_colID_flag_map:
dtype_dict['flags'] = 'u1' # Some column IDs represent boolean flags or small integers
if colID == 1: # These are all packed into a single 'flags' byte whose position
flags_dict['mode'] = (np.uint8(0x03), np.uint8) # in the overall record is determined by the position of the first
elif colID == 2: # column ID of flag type. If there are several flags present,
flags_dict['ox/red'] = (np.uint8(0x04), np.bool_) # there is still only one 'flags' int
elif colID == 3: if 'flags' not in field_name_counts:
flags_dict['error'] = (np.uint8(0x08), np.bool_) type_list.append(('flags', 'u1'))
elif colID == 21: field_name_counts['flags'] = 1
flags_dict['control changes'] = (np.uint8(0x10), np.bool_) flag_name, flag_mask, flag_type = VMPdata_colID_flag_map[colID]
elif colID == 31: # TODO what happens if a flag colID has already been seen
flags_dict['Ns changes'] = (np.uint8(0x20), np.bool_) # i.e. if flag_name is already present in flags_dict?
elif colID == 65: # Does it create a second 'flags' byte in the record?
flags_dict['counter inc.'] = (np.uint8(0x80), np.bool_) flags_dict[flag_name] = (np.uint8(flag_mask), flag_type)
elif colID in VMPdata_colID_dtype_map:
field_name, field_type = VMPdata_colID_dtype_map[colID]
field_name_counts[field_name] += 1
count = field_name_counts[field_name]
if count > 1:
unique_field_name = '%s %d' % (field_name, count)
else: else:
raise NotImplementedError("flag %d not implemented" % colID) unique_field_name = field_name
elif colID == 4: type_list.append((unique_field_name, field_type))
dtype_dict['time/s'] = '<f8'
elif colID == 5:
dtype_dict['control/V/mA'] = '<f4'
# 6 is Ewe, 77 is <Ewe>, I don't see the difference
elif colID in (6, 77):
dtype_dict['Ewe/V'] = '<f4'
# Can't see any difference between 7 and 23
elif colID in (7, 23):
dtype_dict['dQ/mA.h'] = '<f8'
# 76 is <I>, 8 is either I or <I> ??
elif colID in (8, 76):
dtype_dict['I/mA'] = '<f4'
elif colID == 9:
dtype_dict['Ece/V'] = '<f4'
elif colID == 11:
dtype_dict['I/mA'] = '<f8'
elif colID == 13:
dtype_dict['(Q-Qo)/mA.h'] = '<f8'
elif colID == 16:
dtype_dict['Analog IN 1/V'] = '<f4'
elif colID == 19:
dtype_dict['control/V'] = '<f4'
elif colID == 20:
dtype_dict['control/mA'] = '<f4'
elif colID == 24:
dtype_dict['cycle number'] = '<f8'
elif colID == 32:
dtype_dict['freq/Hz'] = '<f4'
elif colID == 33:
dtype_dict['|Ewe|/V'] = '<f4'
elif colID == 34:
dtype_dict['|I|/A'] = '<f4'
elif colID == 35:
dtype_dict['Phase(Z)/deg'] = '<f4'
elif colID == 36:
dtype_dict['|Z|/Ohm'] = '<f4'
elif colID == 37:
dtype_dict['Re(Z)/Ohm'] = '<f4'
elif colID == 38:
dtype_dict['-Im(Z)/Ohm'] = '<f4'
elif colID == 39:
dtype_dict['I Range'] = '<u2'
elif colID == 70:
dtype_dict['P/W'] = '<f4'
elif colID == 74:
dtype_dict['Energy/W.h'] = '<f8'
elif colID == 78:
dtype_dict['Cs-2/µf-2'] = '<f4'
elif colID == 123:
dtype_dict['Energy charge/W.h'] = '<f8'
elif colID == 124:
dtype_dict['Energy discharge/W.h'] = '<f8'
elif colID == 125:
dtype_dict['Capacitance charge/µF'] = '<f8'
elif colID == 126:
dtype_dict['Capacitance discharge/µF'] = '<f8'
elif colID == 131:
dtype_dict['Ns'] = '<u2'
elif colID == 169:
dtype_dict['Cs/µF'] = '<f4'
elif colID == 172:
dtype_dict['Cp/µF'] = '<f4'
elif colID == 173:
dtype_dict['Cp-2/µF-2'] = '<f4'
elif colID == 434:
dtype_dict['(Q-Qo)/C'] = '<f4'
elif colID == 435:
dtype_dict['dQ/C'] = '<f4'
elif colID == 467:
dtype_dict['Q charge/discharge/mA.h'] = '<f8'
elif colID == 468:
dtype_dict['half cycle'] = '<u4'
elif colID == 469:
dtype_dict['z cycle'] = '<u4'
elif colID == 473:
dtype_dict['THD Ewe/%'] = '<f4'
elif colID == 476:
dtype_dict['NSD Ewe/%'] = '<f4'
elif colID == 479:
dtype_dict['NSR Ewe/%'] = '<f4'
elif colID == 474:
dtype_dict['THD I/%'] = '<f4'
elif colID == 477:
dtype_dict['NSD I/%'] = '<f4'
elif colID == 480:
dtype_dict['NSR I/%'] = '<f4'
else: else:
print(dtype_dict)
raise NotImplementedError("column type %d not implemented" % colID) raise NotImplementedError("column type %d not implemented" % colID)
return np.dtype(list(dtype_dict.items())), flags_dict, flags2_dict return np.dtype(type_list), flags_dict
def read_VMP_modules(fileobj, read_module_data=True): def read_VMP_modules(fileobj, read_module_data=True):
@@ -355,7 +341,7 @@ class MPRfile:
else: else:
assert(not any(remaining_headers)) assert(not any(remaining_headers))
self.dtype, self.flags_dict, self.flags2_dict = VMPdata_dtype_from_colIDs(column_types) self.dtype, self.flags_dict = VMPdata_dtype_from_colIDs(column_types)
self.data = np.frombuffer(main_data, dtype=self.dtype) self.data = np.frombuffer(main_data, dtype=self.dtype)
assert(self.data.shape[0] == n_data_points) assert(self.data.shape[0] == n_data_points)
@@ -426,8 +412,5 @@ class MPRfile:
if flagname in self.flags_dict: if flagname in self.flags_dict:
mask, dtype = self.flags_dict[flagname] mask, dtype = self.flags_dict[flagname]
return np.array(self.data['flags'] & mask, dtype=dtype) return np.array(self.data['flags'] & mask, dtype=dtype)
elif flagname in self.flags2_dict:
mask, dtype = self.flags2_dict[flagname]
return np.array(self.data['flags2'] & mask, dtype=dtype)
else: else:
raise AttributeError("Flag '%s' not present" % flagname) raise AttributeError("Flag '%s' not present" % flagname)

View File

@@ -8,7 +8,7 @@ import numpy as np
from numpy.testing import assert_array_almost_equal, assert_array_equal from numpy.testing import assert_array_almost_equal, assert_array_equal
import pytest import pytest
from galvani import MPTfile, MPRfile from galvani import BioLogic, MPTfile, MPRfile
from galvani.BioLogic import MPTfileCSV, str3 # not exported from galvani.BioLogic import MPTfileCSV, str3 # not exported
testdata_dir = os.path.join(os.path.dirname(__file__), 'testdata') testdata_dir = os.path.join(os.path.dirname(__file__), 'testdata')
@@ -44,6 +44,37 @@ def test_open_MPT_csv_fails_for_bad_file():
MPTfileCSV(os.path.join(testdata_dir, 'bio_logic1.mpr')) MPTfileCSV(os.path.join(testdata_dir, 'bio_logic1.mpr'))
def test_colID_map_uniqueness():
"""Check some uniqueness properties of the VMPdata_colID_xyz maps."""
field_colIDs = set(BioLogic.VMPdata_colID_dtype_map.keys())
flag_colIDs = set(BioLogic.VMPdata_colID_flag_map.keys())
field_names = [v[0] for v in BioLogic.VMPdata_colID_dtype_map.values()]
flag_names = [v[0] for v in BioLogic.VMPdata_colID_flag_map.values()]
assert not field_colIDs.intersection(flag_colIDs)
# 'I/mA' and 'dQ/mA.h' are duplicated
# assert len(set(field_names)) == len(field_names)
assert len(set(flag_names)) == len(flag_names)
assert not set(field_names).intersection(flag_names)
@pytest.mark.parametrize('colIDs, expected', [
([1, 2, 3], [('flags', 'u1')]),
([4, 6], [('time/s', '<f8'), ('Ewe/V', '<f4')]),
([1, 4, 21], [('flags', 'u1'), ('time/s', '<f8')]),
([4, 6, 4], [('time/s', '<f8'), ('Ewe/V', '<f4'), ('time/s 2', '<f8')]),
([4, 9999], NotImplementedError),
])
def test_colID_to_dtype(colIDs, expected):
"""Test converting column ID to numpy dtype."""
if isinstance(expected, type) and issubclass(expected, Exception):
with pytest.raises(expected):
BioLogic.VMPdata_dtype_from_colIDs(colIDs)
return
expected_dtype = np.dtype(expected)
dtype, flags_dict = BioLogic.VMPdata_dtype_from_colIDs(colIDs)
assert dtype == expected_dtype
@pytest.mark.parametrize('filename, startdate, enddate', [ @pytest.mark.parametrize('filename, startdate, enddate', [
('bio_logic1.mpr', '2011-10-29', '2011-10-31'), ('bio_logic1.mpr', '2011-10-29', '2011-10-31'),
('bio_logic2.mpr', '2012-09-27', '2012-09-27'), ('bio_logic2.mpr', '2012-09-27', '2012-09-27'),