Reformatted BioLogic.py with black 23.12.1

This commit is contained in:
2024-01-20 21:45:28 +02:00
parent 31416533d8
commit dee8af3a86

View File

@@ -5,7 +5,7 @@
# #
# SPDX-License-Identifier: GPL-3.0-or-later # SPDX-License-Identifier: GPL-3.0-or-later
__all__ = ['MPTfileCSV', 'MPTfile'] __all__ = ["MPTfileCSV", "MPTfile"]
import re import re
import csv import csv
@@ -21,19 +21,44 @@ def fieldname_to_dtype(fieldname):
"""Converts a column header from the MPT file into a tuple of """Converts a column header from the MPT file into a tuple of
canonical name and appropriate numpy dtype""" canonical name and appropriate numpy dtype"""
if fieldname == 'mode': if fieldname == "mode":
return ('mode', np.uint8) return ("mode", np.uint8)
elif fieldname in ("ox/red", "error", "control changes", "Ns changes", elif fieldname in (
"counter inc."): "ox/red",
"error",
"control changes",
"Ns changes",
"counter inc.",
):
return (fieldname, np.bool_) return (fieldname, np.bool_)
elif fieldname in ("time/s", "P/W", "(Q-Qo)/mA.h", "x", "control/V", elif fieldname in (
"control/mA", "control/V/mA", "(Q-Qo)/C", "dQ/C", "time/s",
"freq/Hz", "|Ewe|/V", "|I|/A", "Phase(Z)/deg", "P/W",
"|Z|/Ohm", "Re(Z)/Ohm", "-Im(Z)/Ohm"): "(Q-Qo)/mA.h",
"x",
"control/V",
"control/mA",
"control/V/mA",
"(Q-Qo)/C",
"dQ/C",
"freq/Hz",
"|Ewe|/V",
"|I|/A",
"Phase(Z)/deg",
"|Z|/Ohm",
"Re(Z)/Ohm",
"-Im(Z)/Ohm",
):
return (fieldname, np.float_) return (fieldname, np.float_)
elif fieldname in ("Q charge/discharge/mA.h", "step time/s", elif fieldname in (
"Q charge/mA.h", "Q discharge/mA.h", "Q charge/discharge/mA.h",
"Temperature/°C", "Efficiency/%", "Capacity/mA.h"): "step time/s",
"Q charge/mA.h",
"Q discharge/mA.h",
"Temperature/°C",
"Efficiency/%",
"Capacity/mA.h",
):
return (fieldname, np.float_) return (fieldname, np.float_)
elif fieldname in ("cycle number", "I Range", "Ns", "half cycle"): elif fieldname in ("cycle number", "I Range", "Ns", "half cycle"):
return (fieldname, np.int_) return (fieldname, np.int_)
@@ -43,12 +68,28 @@ def fieldname_to_dtype(fieldname):
return ("I/mA", np.float_) return ("I/mA", np.float_)
elif fieldname in ("Ewe/V", "<Ewe>/V", "Ecell/V"): elif fieldname in ("Ewe/V", "<Ewe>/V", "Ecell/V"):
return ("Ewe/V", np.float_) return ("Ewe/V", np.float_)
elif fieldname.endswith(("/s", "/Hz", "/deg", elif fieldname.endswith(
"/W", "/mW", "/W.h", "/mW.h", (
"/A", "/mA", "/A.h", "/mA.h", "/s",
"/V", "/mV", "/Hz",
"/F", "/mF", "/uF", "/deg",
"/C", "/Ohm",)): "/W",
"/mW",
"/W.h",
"/mW.h",
"/A",
"/mA",
"/A.h",
"/mA.h",
"/V",
"/mV",
"/F",
"/mF",
"/uF",
"/C",
"/Ohm",
)
):
return (fieldname, np.float_) return (fieldname, np.float_)
else: else:
raise ValueError("Invalid column header: %s" % fieldname) raise ValueError("Invalid column header: %s" % fieldname)
@@ -56,11 +97,11 @@ def fieldname_to_dtype(fieldname):
def comma_converter(float_text): def comma_converter(float_text):
"""Convert text to float whether the decimal point is '.' or ','""" """Convert text to float whether the decimal point is '.' or ','"""
trans_table = bytes.maketrans(b',', b'.') trans_table = bytes.maketrans(b",", b".")
return float(float_text.translate(trans_table)) return float(float_text.translate(trans_table))
def MPTfile(file_or_path, encoding='ascii'): def MPTfile(file_or_path, encoding="ascii"):
"""Opens .mpt files as numpy record arrays """Opens .mpt files as numpy record arrays
Checks for the correct headings, skips any comments and returns a Checks for the correct headings, skips any comments and returns a
@@ -68,16 +109,15 @@ def MPTfile(file_or_path, encoding='ascii'):
""" """
if isinstance(file_or_path, str): if isinstance(file_or_path, str):
mpt_file = open(file_or_path, 'rb') mpt_file = open(file_or_path, "rb")
else: else:
mpt_file = file_or_path mpt_file = file_or_path
magic = next(mpt_file) magic = next(mpt_file)
if magic not in (b'EC-Lab ASCII FILE\r\n', b'BT-Lab ASCII FILE\r\n'): if magic not in (b"EC-Lab ASCII FILE\r\n", b"BT-Lab ASCII FILE\r\n"):
raise ValueError("Bad first line for EC-Lab file: '%s'" % magic) raise ValueError("Bad first line for EC-Lab file: '%s'" % magic)
nb_headers_match = re.match(rb'Nb header lines : (\d+)\s*$', nb_headers_match = re.match(rb"Nb header lines : (\d+)\s*$", next(mpt_file))
next(mpt_file))
nb_headers = int(nb_headers_match.group(1)) nb_headers = int(nb_headers_match.group(1))
if nb_headers < 3: if nb_headers < 3:
raise ValueError("Too few header lines: %d" % nb_headers) raise ValueError("Too few header lines: %d" % nb_headers)
@@ -86,14 +126,12 @@ def MPTfile(file_or_path, encoding='ascii'):
# make three lines. Every additional line is a comment line. # make three lines. Every additional line is a comment line.
comments = [next(mpt_file) for i in range(nb_headers - 3)] comments = [next(mpt_file) for i in range(nb_headers - 3)]
fieldnames = next(mpt_file).decode(encoding).strip().split('\t') fieldnames = next(mpt_file).decode(encoding).strip().split("\t")
record_type = np.dtype(list(map(fieldname_to_dtype, fieldnames))) record_type = np.dtype(list(map(fieldname_to_dtype, fieldnames)))
# Must be able to parse files where commas are used for decimal points # Must be able to parse files where commas are used for decimal points
converter_dict = dict(((i, comma_converter) converter_dict = dict(((i, comma_converter) for i in range(len(fieldnames))))
for i in range(len(fieldnames)))) mpt_array = np.loadtxt(mpt_file, dtype=record_type, converters=converter_dict)
mpt_array = np.loadtxt(mpt_file, dtype=record_type,
converters=converter_dict)
return mpt_array, comments return mpt_array, comments
@@ -106,15 +144,15 @@ def MPTfileCSV(file_or_path):
""" """
if isinstance(file_or_path, str): if isinstance(file_or_path, str):
mpt_file = open(file_or_path, 'r') mpt_file = open(file_or_path, "r")
else: else:
mpt_file = file_or_path mpt_file = file_or_path
magic = next(mpt_file) magic = next(mpt_file)
if magic.rstrip() != 'EC-Lab ASCII FILE': if magic.rstrip() != "EC-Lab ASCII FILE":
raise ValueError("Bad first line for EC-Lab file: '%s'" % magic) raise ValueError("Bad first line for EC-Lab file: '%s'" % magic)
nb_headers_match = re.match(r'Nb header lines : (\d+)\s*$', next(mpt_file)) nb_headers_match = re.match(r"Nb header lines : (\d+)\s*$", next(mpt_file))
nb_headers = int(nb_headers_match.group(1)) nb_headers = int(nb_headers_match.group(1))
if nb_headers < 3: if nb_headers < 3:
raise ValueError("Too few header lines: %d" % nb_headers) raise ValueError("Too few header lines: %d" % nb_headers)
@@ -123,154 +161,206 @@ def MPTfileCSV(file_or_path):
# make three lines. Every additional line is a comment line. # make three lines. Every additional line is a comment line.
comments = [next(mpt_file) for i in range(nb_headers - 3)] comments = [next(mpt_file) for i in range(nb_headers - 3)]
mpt_csv = csv.DictReader(mpt_file, dialect='excel-tab') mpt_csv = csv.DictReader(mpt_file, dialect="excel-tab")
expected_fieldnames = ( expected_fieldnames = (
["mode", "ox/red", "error", "control changes", "Ns changes", [
"counter inc.", "time/s", "control/V/mA", "Ewe/V", "dq/mA.h", "mode",
"P/W", "<I>/mA", "(Q-Qo)/mA.h", "x"], "ox/red",
['mode', 'ox/red', 'error', 'control changes', 'Ns changes', "error",
'counter inc.', 'time/s', 'control/V', 'Ewe/V', 'dq/mA.h', "control changes",
'<I>/mA', '(Q-Qo)/mA.h', 'x'], "Ns changes",
["mode", "ox/red", "error", "control changes", "Ns changes", "counter inc.",
"counter inc.", "time/s", "control/V", "Ewe/V", "I/mA", "time/s",
"dQ/mA.h", "P/W"], "control/V/mA",
["mode", "ox/red", "error", "control changes", "Ns changes", "Ewe/V",
"counter inc.", "time/s", "control/V", "Ewe/V", "<I>/mA", "dq/mA.h",
"dQ/mA.h", "P/W"]) "P/W",
"<I>/mA",
"(Q-Qo)/mA.h",
"x",
],
[
"mode",
"ox/red",
"error",
"control changes",
"Ns changes",
"counter inc.",
"time/s",
"control/V",
"Ewe/V",
"dq/mA.h",
"<I>/mA",
"(Q-Qo)/mA.h",
"x",
],
[
"mode",
"ox/red",
"error",
"control changes",
"Ns changes",
"counter inc.",
"time/s",
"control/V",
"Ewe/V",
"I/mA",
"dQ/mA.h",
"P/W",
],
[
"mode",
"ox/red",
"error",
"control changes",
"Ns changes",
"counter inc.",
"time/s",
"control/V",
"Ewe/V",
"<I>/mA",
"dQ/mA.h",
"P/W",
],
)
if mpt_csv.fieldnames not in expected_fieldnames: if mpt_csv.fieldnames not in expected_fieldnames:
raise ValueError("Unrecognised headers for MPT file format") raise ValueError("Unrecognised headers for MPT file format")
return mpt_csv, comments return mpt_csv, comments
VMPmodule_hdr = np.dtype([('shortname', 'S10'), VMPmodule_hdr = np.dtype(
('longname', 'S25'), [
('length', '<u4'), ("shortname", "S10"),
('version', '<u4'), ("longname", "S25"),
('date', 'S8')]) ("length", "<u4"),
("version", "<u4"),
("date", "S8"),
]
)
# Maps from colID to a tuple defining a numpy dtype # Maps from colID to a tuple defining a numpy dtype
VMPdata_colID_dtype_map = { VMPdata_colID_dtype_map = {
4: ('time/s', '<f8'), 4: ("time/s", "<f8"),
5: ('control/V/mA', '<f4'), 5: ("control/V/mA", "<f4"),
6: ('Ewe/V', '<f4'), 6: ("Ewe/V", "<f4"),
7: ('dQ/mA.h', '<f8'), 7: ("dQ/mA.h", "<f8"),
8: ('I/mA', '<f4'), # 8 is either I or <I> ?? 8: ("I/mA", "<f4"), # 8 is either I or <I> ??
9: ('Ece/V', '<f4'), 9: ("Ece/V", "<f4"),
11: ('I/mA', '<f8'), 11: ("I/mA", "<f8"),
13: ('(Q-Qo)/mA.h', '<f8'), 13: ("(Q-Qo)/mA.h", "<f8"),
16: ('Analog IN 1/V', '<f4'), 16: ("Analog IN 1/V", "<f4"),
19: ('control/V', '<f4'), 19: ("control/V", "<f4"),
20: ('control/mA', '<f4'), 20: ("control/mA", "<f4"),
23: ('dQ/mA.h', '<f8'), # Same as 7? 23: ("dQ/mA.h", "<f8"), # Same as 7?
24: ('cycle number', '<f8'), 24: ("cycle number", "<f8"),
26: ('Rapp/Ohm', '<f4'), 26: ("Rapp/Ohm", "<f4"),
32: ('freq/Hz', '<f4'), 32: ("freq/Hz", "<f4"),
33: ('|Ewe|/V', '<f4'), 33: ("|Ewe|/V", "<f4"),
34: ('|I|/A', '<f4'), 34: ("|I|/A", "<f4"),
35: ('Phase(Z)/deg', '<f4'), 35: ("Phase(Z)/deg", "<f4"),
36: ('|Z|/Ohm', '<f4'), 36: ("|Z|/Ohm", "<f4"),
37: ('Re(Z)/Ohm', '<f4'), 37: ("Re(Z)/Ohm", "<f4"),
38: ('-Im(Z)/Ohm', '<f4'), 38: ("-Im(Z)/Ohm", "<f4"),
39: ('I Range', '<u2'), 39: ("I Range", "<u2"),
69: ('R/Ohm', '<f4'), 69: ("R/Ohm", "<f4"),
70: ('P/W', '<f4'), 70: ("P/W", "<f4"),
74: ('Energy/W.h', '<f8'), 74: ("Energy/W.h", "<f8"),
75: ('Analog OUT/V', '<f4'), 75: ("Analog OUT/V", "<f4"),
76: ('<I>/mA', '<f4'), 76: ("<I>/mA", "<f4"),
77: ('<Ewe>/V', '<f4'), 77: ("<Ewe>/V", "<f4"),
78: ('Cs-2/µF-2', '<f4'), 78: ("Cs-2/µF-2", "<f4"),
96: ('|Ece|/V', '<f4'), 96: ("|Ece|/V", "<f4"),
98: ('Phase(Zce)/deg', '<f4'), 98: ("Phase(Zce)/deg", "<f4"),
99: ('|Zce|/Ohm', '<f4'), 99: ("|Zce|/Ohm", "<f4"),
100: ('Re(Zce)/Ohm', '<f4'), 100: ("Re(Zce)/Ohm", "<f4"),
101: ('-Im(Zce)/Ohm', '<f4'), 101: ("-Im(Zce)/Ohm", "<f4"),
123: ('Energy charge/W.h', '<f8'), 123: ("Energy charge/W.h", "<f8"),
124: ('Energy discharge/W.h', '<f8'), 124: ("Energy discharge/W.h", "<f8"),
125: ('Capacitance charge/µF', '<f8'), 125: ("Capacitance charge/µF", "<f8"),
126: ('Capacitance discharge/µF', '<f8'), 126: ("Capacitance discharge/µF", "<f8"),
131: ('Ns', '<u2'), 131: ("Ns", "<u2"),
163: ('|Estack|/V', '<f4'), 163: ("|Estack|/V", "<f4"),
168: ('Rcmp/Ohm', '<f4'), 168: ("Rcmp/Ohm", "<f4"),
169: ('Cs/µF', '<f4'), 169: ("Cs/µF", "<f4"),
172: ('Cp/µF', '<f4'), 172: ("Cp/µF", "<f4"),
173: ('Cp-2/µF-2', '<f4'), 173: ("Cp-2/µF-2", "<f4"),
174: ('Ewe/V', '<f4'), 174: ("Ewe/V", "<f4"),
241: ('|E1|/V', '<f4'), 241: ("|E1|/V", "<f4"),
242: ('|E2|/V', '<f4'), 242: ("|E2|/V", "<f4"),
271: ('Phase(Z1) / deg', '<f4'), 271: ("Phase(Z1) / deg", "<f4"),
272: ('Phase(Z2) / deg', '<f4'), 272: ("Phase(Z2) / deg", "<f4"),
301: ('|Z1|/Ohm', '<f4'), 301: ("|Z1|/Ohm", "<f4"),
302: ('|Z2|/Ohm', '<f4'), 302: ("|Z2|/Ohm", "<f4"),
331: ('Re(Z1)/Ohm', '<f4'), 331: ("Re(Z1)/Ohm", "<f4"),
332: ('Re(Z2)/Ohm', '<f4'), 332: ("Re(Z2)/Ohm", "<f4"),
361: ('-Im(Z1)/Ohm', '<f4'), 361: ("-Im(Z1)/Ohm", "<f4"),
362: ('-Im(Z2)/Ohm', '<f4'), 362: ("-Im(Z2)/Ohm", "<f4"),
391: ('<E1>/V', '<f4'), 391: ("<E1>/V", "<f4"),
392: ('<E2>/V', '<f4'), 392: ("<E2>/V", "<f4"),
422: ('Phase(Zstack)/deg', '<f4'), 422: ("Phase(Zstack)/deg", "<f4"),
423: ('|Zstack|/Ohm', '<f4'), 423: ("|Zstack|/Ohm", "<f4"),
424: ('Re(Zstack)/Ohm', '<f4'), 424: ("Re(Zstack)/Ohm", "<f4"),
425: ('-Im(Zstack)/Ohm', '<f4'), 425: ("-Im(Zstack)/Ohm", "<f4"),
426: ('<Estack>/V', '<f4'), 426: ("<Estack>/V", "<f4"),
430: ('Phase(Zwe-ce)/deg', '<f4'), 430: ("Phase(Zwe-ce)/deg", "<f4"),
431: ('|Zwe-ce|/Ohm', '<f4'), 431: ("|Zwe-ce|/Ohm", "<f4"),
432: ('Re(Zwe-ce)/Ohm', '<f4'), 432: ("Re(Zwe-ce)/Ohm", "<f4"),
433: ('-Im(Zwe-ce)/Ohm', '<f4'), 433: ("-Im(Zwe-ce)/Ohm", "<f4"),
434: ('(Q-Qo)/C', '<f4'), 434: ("(Q-Qo)/C", "<f4"),
435: ('dQ/C', '<f4'), 435: ("dQ/C", "<f4"),
438: ('step time/s', '<f8'), 438: ("step time/s", "<f8"),
441: ('<Ecv>/V', '<f4'), 441: ("<Ecv>/V", "<f4"),
462: ('Temperature/°C', '<f4'), 462: ("Temperature/°C", "<f4"),
467: ('Q charge/discharge/mA.h', '<f8'), 467: ("Q charge/discharge/mA.h", "<f8"),
468: ('half cycle', '<u4'), 468: ("half cycle", "<u4"),
469: ('z cycle', '<u4'), 469: ("z cycle", "<u4"),
471: ('<Ece>/V', '<f4'), 471: ("<Ece>/V", "<f4"),
473: ('THD Ewe/%', '<f4'), 473: ("THD Ewe/%", "<f4"),
474: ('THD I/%', '<f4'), 474: ("THD I/%", "<f4"),
476: ('NSD Ewe/%', '<f4'), 476: ("NSD Ewe/%", "<f4"),
477: ('NSD I/%', '<f4'), 477: ("NSD I/%", "<f4"),
479: ('NSR Ewe/%', '<f4'), 479: ("NSR Ewe/%", "<f4"),
480: ('NSR I/%', '<f4'), 480: ("NSR I/%", "<f4"),
486: ('|Ewe h2|/V', '<f4'), 486: ("|Ewe h2|/V", "<f4"),
487: ('|Ewe h3|/V', '<f4'), 487: ("|Ewe h3|/V", "<f4"),
488: ('|Ewe h4|/V', '<f4'), 488: ("|Ewe h4|/V", "<f4"),
489: ('|Ewe h5|/V', '<f4'), 489: ("|Ewe h5|/V", "<f4"),
490: ('|Ewe h6|/V', '<f4'), 490: ("|Ewe h6|/V", "<f4"),
491: ('|Ewe h7|/V', '<f4'), 491: ("|Ewe h7|/V", "<f4"),
492: ('|I h2|/A', '<f4'), 492: ("|I h2|/A", "<f4"),
493: ('|I h3|/A', '<f4'), 493: ("|I h3|/A", "<f4"),
494: ('|I h4|/A', '<f4'), 494: ("|I h4|/A", "<f4"),
495: ('|I h5|/A', '<f4'), 495: ("|I h5|/A", "<f4"),
496: ('|I h6|/A', '<f4'), 496: ("|I h6|/A", "<f4"),
497: ('|I h7|/A', '<f4'), 497: ("|I h7|/A", "<f4"),
498: ('Q charge/mA.h', '<f8'), 498: ("Q charge/mA.h", "<f8"),
499: ('Q discharge/mA.h', '<f8'), 499: ("Q discharge/mA.h", "<f8"),
500: ('step time/s', '<f8'), 500: ("step time/s", "<f8"),
501: ('Efficiency/%', '<f8'), 501: ("Efficiency/%", "<f8"),
502: ('Capacity/mA.h', '<f8'), 502: ("Capacity/mA.h", "<f8"),
505: ('Rdc/Ohm', '<f4'), 505: ("Rdc/Ohm", "<f4"),
509: ('Acir/Dcir Control', '<u1'), 509: ("Acir/Dcir Control", "<u1"),
} }
# These column IDs define flags which are all stored packed in a single byte # These column IDs define flags which are all stored packed in a single byte
# The values in the map are (name, bitmask, dtype) # The values in the map are (name, bitmask, dtype)
VMPdata_colID_flag_map = { VMPdata_colID_flag_map = {
1: ('mode', 0x03, np.uint8), 1: ("mode", 0x03, np.uint8),
2: ('ox/red', 0x04, np.bool_), 2: ("ox/red", 0x04, np.bool_),
3: ('error', 0x08, np.bool_), 3: ("error", 0x08, np.bool_),
21: ('control changes', 0x10, np.bool_), 21: ("control changes", 0x10, np.bool_),
31: ('Ns changes', 0x20, np.bool_), 31: ("Ns changes", 0x20, np.bool_),
65: ('counter inc.', 0x80, np.bool_), 65: ("counter inc.", 0x80, np.bool_),
} }
def parse_BioLogic_date(date_text): def parse_BioLogic_date(date_text):
"""Parse a date from one of the various formats used by Bio-Logic files.""" """Parse a date from one of the various formats used by Bio-Logic files."""
date_formats = ['%m/%d/%y', '%m-%d-%y', '%m.%d.%y'] date_formats = ["%m/%d/%y", "%m-%d-%y", "%m.%d.%y"]
if isinstance(date_text, bytes): if isinstance(date_text, bytes):
date_string = date_text.decode('ascii') date_string = date_text.decode("ascii")
else: else:
date_string = date_text date_string = date_text
for date_format in date_formats: for date_format in date_formats:
@@ -281,8 +371,10 @@ def parse_BioLogic_date(date_text):
else: else:
break break
else: else:
raise ValueError(f'Could not parse timestamp {date_string!r}' raise ValueError(
f' with any of the formats {date_formats}') f"Could not parse timestamp {date_string!r}"
f" with any of the formats {date_formats}"
)
return date(tm.tm_year, tm.tm_mon, tm.tm_mday) return date(tm.tm_year, tm.tm_mon, tm.tm_mday)
@@ -309,9 +401,9 @@ def VMPdata_dtype_from_colIDs(colIDs):
# in the overall record is determined by the position of the first # in the overall record is determined by the position of the first
# column ID of flag type. If there are several flags present, # column ID of flag type. If there are several flags present,
# there is still only one 'flags' int # there is still only one 'flags' int
if 'flags' not in field_name_counts: if "flags" not in field_name_counts:
type_list.append(('flags', 'u1')) type_list.append(("flags", "u1"))
field_name_counts['flags'] = 1 field_name_counts["flags"] = 1
flag_name, flag_mask, flag_type = VMPdata_colID_flag_map[colID] flag_name, flag_mask, flag_type = VMPdata_colID_flag_map[colID]
# TODO what happens if a flag colID has already been seen # TODO what happens if a flag colID has already been seen
# i.e. if flag_name is already present in flags_dict? # i.e. if flag_name is already present in flags_dict?
@@ -322,15 +414,15 @@ def VMPdata_dtype_from_colIDs(colIDs):
field_name_counts[field_name] += 1 field_name_counts[field_name] += 1
count = field_name_counts[field_name] count = field_name_counts[field_name]
if count > 1: if count > 1:
unique_field_name = '%s %d' % (field_name, count) unique_field_name = "%s %d" % (field_name, count)
else: else:
unique_field_name = field_name unique_field_name = field_name
type_list.append((unique_field_name, field_type)) type_list.append((unique_field_name, field_type))
else: else:
raise NotImplementedError("Column ID {cid} after column {prev} " raise NotImplementedError(
"is unknown" "Column ID {cid} after column {prev} "
.format(cid=colID, "is unknown".format(cid=colID, prev=type_list[-1][0])
prev=type_list[-1][0])) )
return np.dtype(type_list), flags_dict return np.dtype(type_list), flags_dict
@@ -341,12 +433,13 @@ def read_VMP_modules(fileobj, read_module_data=True):
N.B. the offset yielded is the offset to the start of the data i.e. after N.B. the offset yielded is the offset to the start of the data i.e. after
the end of the header. The data runs from (offset) to (offset+length)""" the end of the header. The data runs from (offset) to (offset+length)"""
while True: while True:
module_magic = fileobj.read(len(b'MODULE')) module_magic = fileobj.read(len(b"MODULE"))
if len(module_magic) == 0: # end of file if len(module_magic) == 0: # end of file
break break
elif module_magic != b'MODULE': elif module_magic != b"MODULE":
raise ValueError("Found %r, expecting start of new VMP MODULE" raise ValueError(
% module_magic) "Found %r, expecting start of new VMP MODULE" % module_magic
)
hdr_bytes = fileobj.read(VMPmodule_hdr.itemsize) hdr_bytes = fileobj.read(VMPmodule_hdr.itemsize)
if len(hdr_bytes) < VMPmodule_hdr.itemsize: if len(hdr_bytes) < VMPmodule_hdr.itemsize:
@@ -354,23 +447,24 @@ def read_VMP_modules(fileobj, read_module_data=True):
hdr = np.frombuffer(hdr_bytes, dtype=VMPmodule_hdr, count=1) hdr = np.frombuffer(hdr_bytes, dtype=VMPmodule_hdr, count=1)
hdr_dict = dict(((n, hdr[n][0]) for n in VMPmodule_hdr.names)) hdr_dict = dict(((n, hdr[n][0]) for n in VMPmodule_hdr.names))
hdr_dict['offset'] = fileobj.tell() hdr_dict["offset"] = fileobj.tell()
if read_module_data: if read_module_data:
hdr_dict['data'] = fileobj.read(hdr_dict['length']) hdr_dict["data"] = fileobj.read(hdr_dict["length"])
if len(hdr_dict['data']) != hdr_dict['length']: if len(hdr_dict["data"]) != hdr_dict["length"]:
raise IOError("""Unexpected end of file while reading data raise IOError(
"""Unexpected end of file while reading data
current module: %s current module: %s
length read: %d length read: %d
length expected: %d""" % (hdr_dict['longname'], length expected: %d"""
len(hdr_dict['data']), % (hdr_dict["longname"], len(hdr_dict["data"]), hdr_dict["length"])
hdr_dict['length'])) )
yield hdr_dict yield hdr_dict
else: else:
yield hdr_dict yield hdr_dict
fileobj.seek(hdr_dict['offset'] + hdr_dict['length'], SEEK_SET) fileobj.seek(hdr_dict["offset"] + hdr_dict["length"], SEEK_SET)
MPR_MAGIC = b'BIO-LOGIC MODULAR FILE\x1a'.ljust(48) + b'\x00\x00\x00\x00' MPR_MAGIC = b"BIO-LOGIC MODULAR FILE\x1a".ljust(48) + b"\x00\x00\x00\x00"
class MPRfile: class MPRfile:
@@ -392,41 +486,44 @@ class MPRfile:
def __init__(self, file_or_path): def __init__(self, file_or_path):
self.loop_index = None self.loop_index = None
if isinstance(file_or_path, str): if isinstance(file_or_path, str):
mpr_file = open(file_or_path, 'rb') mpr_file = open(file_or_path, "rb")
else: else:
mpr_file = file_or_path mpr_file = file_or_path
magic = mpr_file.read(len(MPR_MAGIC)) magic = mpr_file.read(len(MPR_MAGIC))
if magic != MPR_MAGIC: if magic != MPR_MAGIC:
raise ValueError('Invalid magic for .mpr file: %s' % magic) raise ValueError("Invalid magic for .mpr file: %s" % magic)
modules = list(read_VMP_modules(mpr_file)) modules = list(read_VMP_modules(mpr_file))
self.modules = modules self.modules = modules
settings_mod, = (m for m in modules if m['shortname'] == b'VMP Set ') (settings_mod,) = (m for m in modules if m["shortname"] == b"VMP Set ")
data_module, = (m for m in modules if m['shortname'] == b'VMP data ') (data_module,) = (m for m in modules if m["shortname"] == b"VMP data ")
maybe_loop_module = [m for m in modules if m['shortname'] == b'VMP loop '] maybe_loop_module = [m for m in modules if m["shortname"] == b"VMP loop "]
maybe_log_module = [m for m in modules if m['shortname'] == b'VMP LOG '] maybe_log_module = [m for m in modules if m["shortname"] == b"VMP LOG "]
n_data_points = np.frombuffer(data_module['data'][:4], dtype='<u4') n_data_points = np.frombuffer(data_module["data"][:4], dtype="<u4")
n_columns = np.frombuffer(data_module['data'][4:5], dtype='u1').item() n_columns = np.frombuffer(data_module["data"][4:5], dtype="u1").item()
if data_module['version'] == 0: if data_module["version"] == 0:
column_types = np.frombuffer(data_module['data'][5:], dtype='u1', column_types = np.frombuffer(
count=n_columns) data_module["data"][5:], dtype="u1", count=n_columns
remaining_headers = data_module['data'][5 + n_columns:100] )
main_data = data_module['data'][100:] remaining_headers = data_module["data"][5 + n_columns : 100]
elif data_module['version'] in [2, 3]: main_data = data_module["data"][100:]
column_types = np.frombuffer(data_module['data'][5:], dtype='<u2', elif data_module["version"] in [2, 3]:
count=n_columns) column_types = np.frombuffer(
data_module["data"][5:], dtype="<u2", count=n_columns
)
# There are bytes of data before the main array starts # There are bytes of data before the main array starts
if data_module['version'] == 3: if data_module["version"] == 3:
num_bytes_before = 406 # version 3 added `\x01` to the start num_bytes_before = 406 # version 3 added `\x01` to the start
else: else:
num_bytes_before = 405 num_bytes_before = 405
remaining_headers = data_module['data'][5 + 2 * n_columns:405] remaining_headers = data_module["data"][5 + 2 * n_columns : 405]
main_data = data_module['data'][num_bytes_before:] main_data = data_module["data"][num_bytes_before:]
else: else:
raise ValueError("Unrecognised version for data module: %d" % raise ValueError(
data_module['version']) "Unrecognised version for data module: %d" % data_module["version"]
)
assert not any(remaining_headers) assert not any(remaining_headers)
@@ -436,36 +533,40 @@ class MPRfile:
# No idea what these 'column types' mean or even if they are actually # No idea what these 'column types' mean or even if they are actually
# column types at all # column types at all
self.version = int(data_module['version']) self.version = int(data_module["version"])
self.cols = column_types self.cols = column_types
self.npts = n_data_points self.npts = n_data_points
self.startdate = parse_BioLogic_date(settings_mod['date']) self.startdate = parse_BioLogic_date(settings_mod["date"])
if maybe_loop_module: if maybe_loop_module:
loop_module, = maybe_loop_module (loop_module,) = maybe_loop_module
if loop_module['version'] == 0: if loop_module["version"] == 0:
self.loop_index = np.fromstring(loop_module['data'][4:], self.loop_index = np.fromstring(loop_module["data"][4:], dtype="<u4")
dtype='<u4') self.loop_index = np.trim_zeros(self.loop_index, "b")
self.loop_index = np.trim_zeros(self.loop_index, 'b')
else: else:
raise ValueError("Unrecognised version for data module: %d" % raise ValueError(
data_module['version']) "Unrecognised version for data module: %d" % data_module["version"]
)
if maybe_log_module: if maybe_log_module:
log_module, = maybe_log_module (log_module,) = maybe_log_module
self.enddate = parse_BioLogic_date(log_module['date']) self.enddate = parse_BioLogic_date(log_module["date"])
# There is a timestamp at either 465 or 469 bytes # There is a timestamp at either 465 or 469 bytes
# I can't find any reason why it is one or the other in any # I can't find any reason why it is one or the other in any
# given file # given file
ole_timestamp1 = np.frombuffer(log_module['data'][465:], ole_timestamp1 = np.frombuffer(
dtype='<f8', count=1) log_module["data"][465:], dtype="<f8", count=1
ole_timestamp2 = np.frombuffer(log_module['data'][469:], )
dtype='<f8', count=1) ole_timestamp2 = np.frombuffer(
ole_timestamp3 = np.frombuffer(log_module['data'][473:], log_module["data"][469:], dtype="<f8", count=1
dtype='<f8', count=1) )
ole_timestamp4 = np.frombuffer(log_module['data'][585:], ole_timestamp3 = np.frombuffer(
dtype='<f8', count=1) log_module["data"][473:], dtype="<f8", count=1
)
ole_timestamp4 = np.frombuffer(
log_module["data"][585:], dtype="<f8", count=1
)
if ole_timestamp1 > 40000 and ole_timestamp1 < 50000: if ole_timestamp1 > 40000 and ole_timestamp1 < 50000:
ole_timestamp = ole_timestamp1 ole_timestamp = ole_timestamp1
@@ -483,14 +584,16 @@ class MPRfile:
ole_timedelta = timedelta(days=ole_timestamp[0]) ole_timedelta = timedelta(days=ole_timestamp[0])
self.timestamp = ole_base + ole_timedelta self.timestamp = ole_base + ole_timedelta
if self.startdate != self.timestamp.date(): if self.startdate != self.timestamp.date():
raise ValueError("Date mismatch:\n" raise ValueError(
"Date mismatch:\n"
+ " Start date: %s\n" % self.startdate + " Start date: %s\n" % self.startdate
+ " End date: %s\n" % self.enddate + " End date: %s\n" % self.enddate
+ " Timestamp: %s\n" % self.timestamp) + " Timestamp: %s\n" % self.timestamp
)
def get_flag(self, flagname): def get_flag(self, flagname):
if flagname in self.flags_dict: if flagname in self.flags_dict:
mask, dtype = self.flags_dict[flagname] mask, dtype = self.flags_dict[flagname]
return np.array(self.data['flags'] & mask, dtype=dtype) return np.array(self.data["flags"] & mask, dtype=dtype)
else: else:
raise AttributeError("Flag '%s' not present" % flagname) raise AttributeError("Flag '%s' not present" % flagname)