Merge pull request #96 from chatcannon/black

Format with black
Committed by GitHub on 2024-01-20 22:20:41 +02:00
7 changed files with 586 additions and 412 deletions

View File: galvani/BioLogic.py

@@ -5,7 +5,7 @@
#                                                                             #
# SPDX-License-Identifier: GPL-3.0-or-later                                   #

__all__ = ["MPTfileCSV", "MPTfile"]

import re
import csv
@@ -21,19 +21,44 @@ def fieldname_to_dtype(fieldname):
"""Converts a column header from the MPT file into a tuple of """Converts a column header from the MPT file into a tuple of
canonical name and appropriate numpy dtype""" canonical name and appropriate numpy dtype"""
if fieldname == 'mode': if fieldname == "mode":
return ('mode', np.uint8) return ("mode", np.uint8)
elif fieldname in ("ox/red", "error", "control changes", "Ns changes", elif fieldname in (
"counter inc."): "ox/red",
"error",
"control changes",
"Ns changes",
"counter inc.",
):
return (fieldname, np.bool_) return (fieldname, np.bool_)
elif fieldname in ("time/s", "P/W", "(Q-Qo)/mA.h", "x", "control/V", elif fieldname in (
"control/mA", "control/V/mA", "(Q-Qo)/C", "dQ/C", "time/s",
"freq/Hz", "|Ewe|/V", "|I|/A", "Phase(Z)/deg", "P/W",
"|Z|/Ohm", "Re(Z)/Ohm", "-Im(Z)/Ohm"): "(Q-Qo)/mA.h",
"x",
"control/V",
"control/mA",
"control/V/mA",
"(Q-Qo)/C",
"dQ/C",
"freq/Hz",
"|Ewe|/V",
"|I|/A",
"Phase(Z)/deg",
"|Z|/Ohm",
"Re(Z)/Ohm",
"-Im(Z)/Ohm",
):
return (fieldname, np.float_) return (fieldname, np.float_)
elif fieldname in ("Q charge/discharge/mA.h", "step time/s", elif fieldname in (
"Q charge/mA.h", "Q discharge/mA.h", "Q charge/discharge/mA.h",
"Temperature/°C", "Efficiency/%", "Capacity/mA.h"): "step time/s",
"Q charge/mA.h",
"Q discharge/mA.h",
"Temperature/°C",
"Efficiency/%",
"Capacity/mA.h",
):
return (fieldname, np.float_) return (fieldname, np.float_)
elif fieldname in ("cycle number", "I Range", "Ns", "half cycle"): elif fieldname in ("cycle number", "I Range", "Ns", "half cycle"):
return (fieldname, np.int_) return (fieldname, np.int_)
@@ -43,12 +68,28 @@ def fieldname_to_dtype(fieldname):
return ("I/mA", np.float_) return ("I/mA", np.float_)
elif fieldname in ("Ewe/V", "<Ewe>/V", "Ecell/V"): elif fieldname in ("Ewe/V", "<Ewe>/V", "Ecell/V"):
return ("Ewe/V", np.float_) return ("Ewe/V", np.float_)
elif fieldname.endswith(("/s", "/Hz", "/deg", elif fieldname.endswith(
"/W", "/mW", "/W.h", "/mW.h", (
"/A", "/mA", "/A.h", "/mA.h", "/s",
"/V", "/mV", "/Hz",
"/F", "/mF", "/uF", "/deg",
"/C", "/Ohm",)): "/W",
"/mW",
"/W.h",
"/mW.h",
"/A",
"/mA",
"/A.h",
"/mA.h",
"/V",
"/mV",
"/F",
"/mF",
"/uF",
"/C",
"/Ohm",
)
):
return (fieldname, np.float_) return (fieldname, np.float_)
else: else:
raise ValueError("Invalid column header: %s" % fieldname) raise ValueError("Invalid column header: %s" % fieldname)
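Note: the reformatting above is behaviour-preserving. As a minimal sketch of what fieldname_to_dtype does (it is module-internal, not in __all__, and the example values are taken from the branches above):

    from galvani.BioLogic import fieldname_to_dtype

    fieldname_to_dtype("time/s")   # -> ("time/s", np.float_)
    fieldname_to_dtype("ox/red")   # -> ("ox/red", np.bool_)
    fieldname_to_dtype("<I>/mA")   # -> ("I/mA", np.float_), canonicalised name
    fieldname_to_dtype("bogus")    # raises ValueError: Invalid column header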
@@ -56,11 +97,11 @@ def fieldname_to_dtype(fieldname):
def comma_converter(float_text):
    """Convert text to float whether the decimal point is '.' or ','"""
    trans_table = bytes.maketrans(b",", b".")
    return float(float_text.translate(trans_table))


def MPTfile(file_or_path, encoding="ascii"):
    """Opens .mpt files as numpy record arrays

    Checks for the correct headings, skips any comments and returns a
@@ -68,16 +109,15 @@ def MPTfile(file_or_path, encoding='ascii'):
""" """
if isinstance(file_or_path, str): if isinstance(file_or_path, str):
mpt_file = open(file_or_path, 'rb') mpt_file = open(file_or_path, "rb")
else: else:
mpt_file = file_or_path mpt_file = file_or_path
magic = next(mpt_file) magic = next(mpt_file)
if magic not in (b'EC-Lab ASCII FILE\r\n', b'BT-Lab ASCII FILE\r\n'): if magic not in (b"EC-Lab ASCII FILE\r\n", b"BT-Lab ASCII FILE\r\n"):
raise ValueError("Bad first line for EC-Lab file: '%s'" % magic) raise ValueError("Bad first line for EC-Lab file: '%s'" % magic)
nb_headers_match = re.match(rb'Nb header lines : (\d+)\s*$', nb_headers_match = re.match(rb"Nb header lines : (\d+)\s*$", next(mpt_file))
next(mpt_file))
nb_headers = int(nb_headers_match.group(1)) nb_headers = int(nb_headers_match.group(1))
if nb_headers < 3: if nb_headers < 3:
raise ValueError("Too few header lines: %d" % nb_headers) raise ValueError("Too few header lines: %d" % nb_headers)
@@ -86,14 +126,12 @@ def MPTfile(file_or_path, encoding='ascii'):
    # make three lines. Every additional line is a comment line.
    comments = [next(mpt_file) for i in range(nb_headers - 3)]

    fieldnames = next(mpt_file).decode(encoding).strip().split("\t")
    record_type = np.dtype(list(map(fieldname_to_dtype, fieldnames)))

    # Must be able to parse files where commas are used for decimal points
    converter_dict = dict(((i, comma_converter) for i in range(len(fieldnames))))
    mpt_array = np.loadtxt(mpt_file, dtype=record_type, converters=converter_dict)

    return mpt_array, comments
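The public interface of MPTfile is unchanged by the reformat; a short usage sketch (the path is hypothetical):

    from galvani import MPTfile

    # record array of the data rows plus the raw comment lines from the header
    data, comments = MPTfile("example.mpt")  # hypothetical file
    print(data.dtype.names)
    print(data["time/s"][:5])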
@@ -106,15 +144,15 @@ def MPTfileCSV(file_or_path):
""" """
if isinstance(file_or_path, str): if isinstance(file_or_path, str):
mpt_file = open(file_or_path, 'r') mpt_file = open(file_or_path, "r")
else: else:
mpt_file = file_or_path mpt_file = file_or_path
magic = next(mpt_file) magic = next(mpt_file)
if magic.rstrip() != 'EC-Lab ASCII FILE': if magic.rstrip() != "EC-Lab ASCII FILE":
raise ValueError("Bad first line for EC-Lab file: '%s'" % magic) raise ValueError("Bad first line for EC-Lab file: '%s'" % magic)
nb_headers_match = re.match(r'Nb header lines : (\d+)\s*$', next(mpt_file)) nb_headers_match = re.match(r"Nb header lines : (\d+)\s*$", next(mpt_file))
nb_headers = int(nb_headers_match.group(1)) nb_headers = int(nb_headers_match.group(1))
if nb_headers < 3: if nb_headers < 3:
raise ValueError("Too few header lines: %d" % nb_headers) raise ValueError("Too few header lines: %d" % nb_headers)
@@ -123,154 +161,206 @@ def MPTfileCSV(file_or_path):
    # make three lines. Every additional line is a comment line.
    comments = [next(mpt_file) for i in range(nb_headers - 3)]

    mpt_csv = csv.DictReader(mpt_file, dialect="excel-tab")

    expected_fieldnames = (
        [
            "mode",
            "ox/red",
            "error",
            "control changes",
            "Ns changes",
            "counter inc.",
            "time/s",
            "control/V/mA",
            "Ewe/V",
            "dq/mA.h",
            "P/W",
            "<I>/mA",
            "(Q-Qo)/mA.h",
            "x",
        ],
        [
            "mode",
            "ox/red",
            "error",
            "control changes",
            "Ns changes",
            "counter inc.",
            "time/s",
            "control/V",
            "Ewe/V",
            "dq/mA.h",
            "<I>/mA",
            "(Q-Qo)/mA.h",
            "x",
        ],
        [
            "mode",
            "ox/red",
            "error",
            "control changes",
            "Ns changes",
            "counter inc.",
            "time/s",
            "control/V",
            "Ewe/V",
            "I/mA",
            "dQ/mA.h",
            "P/W",
        ],
        [
            "mode",
            "ox/red",
            "error",
            "control changes",
            "Ns changes",
            "counter inc.",
            "time/s",
            "control/V",
            "Ewe/V",
            "<I>/mA",
            "dQ/mA.h",
            "P/W",
        ],
    )
    if mpt_csv.fieldnames not in expected_fieldnames:
        raise ValueError("Unrecognised headers for MPT file format")

    return mpt_csv, comments


VMPmodule_hdr = np.dtype(
    [
        ("shortname", "S10"),
        ("longname", "S25"),
        ("length", "<u4"),
        ("version", "<u4"),
        ("date", "S8"),
    ]
)

# Maps from colID to a tuple defining a numpy dtype
VMPdata_colID_dtype_map = {
    4: ("time/s", "<f8"),
    5: ("control/V/mA", "<f4"),
    6: ("Ewe/V", "<f4"),
    7: ("dQ/mA.h", "<f8"),
    8: ("I/mA", "<f4"),  # 8 is either I or <I> ??
    9: ("Ece/V", "<f4"),
    11: ("I/mA", "<f8"),
    13: ("(Q-Qo)/mA.h", "<f8"),
    16: ("Analog IN 1/V", "<f4"),
    19: ("control/V", "<f4"),
    20: ("control/mA", "<f4"),
    23: ("dQ/mA.h", "<f8"),  # Same as 7?
    24: ("cycle number", "<f8"),
    26: ("Rapp/Ohm", "<f4"),
    32: ("freq/Hz", "<f4"),
    33: ("|Ewe|/V", "<f4"),
    34: ("|I|/A", "<f4"),
    35: ("Phase(Z)/deg", "<f4"),
    36: ("|Z|/Ohm", "<f4"),
    37: ("Re(Z)/Ohm", "<f4"),
    38: ("-Im(Z)/Ohm", "<f4"),
    39: ("I Range", "<u2"),
    69: ("R/Ohm", "<f4"),
    70: ("P/W", "<f4"),
    74: ("Energy/W.h", "<f8"),
    75: ("Analog OUT/V", "<f4"),
    76: ("<I>/mA", "<f4"),
    77: ("<Ewe>/V", "<f4"),
    78: ("Cs-2/µF-2", "<f4"),
    96: ("|Ece|/V", "<f4"),
    98: ("Phase(Zce)/deg", "<f4"),
    99: ("|Zce|/Ohm", "<f4"),
    100: ("Re(Zce)/Ohm", "<f4"),
    101: ("-Im(Zce)/Ohm", "<f4"),
    123: ("Energy charge/W.h", "<f8"),
    124: ("Energy discharge/W.h", "<f8"),
    125: ("Capacitance charge/µF", "<f8"),
    126: ("Capacitance discharge/µF", "<f8"),
    131: ("Ns", "<u2"),
    163: ("|Estack|/V", "<f4"),
    168: ("Rcmp/Ohm", "<f4"),
    169: ("Cs/µF", "<f4"),
    172: ("Cp/µF", "<f4"),
    173: ("Cp-2/µF-2", "<f4"),
    174: ("Ewe/V", "<f4"),
    241: ("|E1|/V", "<f4"),
    242: ("|E2|/V", "<f4"),
    271: ("Phase(Z1) / deg", "<f4"),
    272: ("Phase(Z2) / deg", "<f4"),
    301: ("|Z1|/Ohm", "<f4"),
    302: ("|Z2|/Ohm", "<f4"),
    331: ("Re(Z1)/Ohm", "<f4"),
    332: ("Re(Z2)/Ohm", "<f4"),
    361: ("-Im(Z1)/Ohm", "<f4"),
    362: ("-Im(Z2)/Ohm", "<f4"),
    391: ("<E1>/V", "<f4"),
    392: ("<E2>/V", "<f4"),
    422: ("Phase(Zstack)/deg", "<f4"),
    423: ("|Zstack|/Ohm", "<f4"),
    424: ("Re(Zstack)/Ohm", "<f4"),
    425: ("-Im(Zstack)/Ohm", "<f4"),
    426: ("<Estack>/V", "<f4"),
    430: ("Phase(Zwe-ce)/deg", "<f4"),
    431: ("|Zwe-ce|/Ohm", "<f4"),
    432: ("Re(Zwe-ce)/Ohm", "<f4"),
    433: ("-Im(Zwe-ce)/Ohm", "<f4"),
    434: ("(Q-Qo)/C", "<f4"),
    435: ("dQ/C", "<f4"),
    438: ("step time/s", "<f8"),
    441: ("<Ecv>/V", "<f4"),
    462: ("Temperature/°C", "<f4"),
    467: ("Q charge/discharge/mA.h", "<f8"),
    468: ("half cycle", "<u4"),
    469: ("z cycle", "<u4"),
    471: ("<Ece>/V", "<f4"),
    473: ("THD Ewe/%", "<f4"),
    474: ("THD I/%", "<f4"),
    476: ("NSD Ewe/%", "<f4"),
    477: ("NSD I/%", "<f4"),
    479: ("NSR Ewe/%", "<f4"),
    480: ("NSR I/%", "<f4"),
    486: ("|Ewe h2|/V", "<f4"),
    487: ("|Ewe h3|/V", "<f4"),
    488: ("|Ewe h4|/V", "<f4"),
    489: ("|Ewe h5|/V", "<f4"),
    490: ("|Ewe h6|/V", "<f4"),
    491: ("|Ewe h7|/V", "<f4"),
    492: ("|I h2|/A", "<f4"),
    493: ("|I h3|/A", "<f4"),
    494: ("|I h4|/A", "<f4"),
    495: ("|I h5|/A", "<f4"),
    496: ("|I h6|/A", "<f4"),
    497: ("|I h7|/A", "<f4"),
    498: ("Q charge/mA.h", "<f8"),
    499: ("Q discharge/mA.h", "<f8"),
    500: ("step time/s", "<f8"),
    501: ("Efficiency/%", "<f8"),
    502: ("Capacity/mA.h", "<f8"),
    505: ("Rdc/Ohm", "<f4"),
    509: ("Acir/Dcir Control", "<u1"),
}

# These column IDs define flags which are all stored packed in a single byte
# The values in the map are (name, bitmask, dtype)
VMPdata_colID_flag_map = {
    1: ("mode", 0x03, np.uint8),
    2: ("ox/red", 0x04, np.bool_),
    3: ("error", 0x08, np.bool_),
    21: ("control changes", 0x10, np.bool_),
    31: ("Ns changes", 0x20, np.bool_),
    65: ("counter inc.", 0x80, np.bool_),
}


def parse_BioLogic_date(date_text):
    """Parse a date from one of the various formats used by Bio-Logic files."""
    date_formats = ["%m/%d/%y", "%m-%d-%y", "%m.%d.%y"]
    if isinstance(date_text, bytes):
        date_string = date_text.decode("ascii")
    else:
        date_string = date_text
    for date_format in date_formats:
@@ -281,8 +371,10 @@ def parse_BioLogic_date(date_text):
        else:
            break
    else:
        raise ValueError(
            f"Could not parse timestamp {date_string!r}"
            f" with any of the formats {date_formats}"
        )
    return date(tm.tm_year, tm.tm_mon, tm.tm_mday)
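parse_BioLogic_date tries each two-digit-year format in turn and raises once all three fail; a quick sketch mirroring the parametrized tests further down:

    from galvani.BioLogic import parse_BioLogic_date

    parse_BioLogic_date("02/23/17")   # -> datetime.date(2017, 2, 23)
    parse_BioLogic_date(b"10-03-05")  # bytes input is decoded as ASCII first
    parse_BioLogic_date("13.08.07")   # raises ValueError: no format accepts month 13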
@@ -309,9 +401,9 @@ def VMPdata_dtype_from_colIDs(colIDs):
            # in the overall record is determined by the position of the first
            # column ID of flag type. If there are several flags present,
            # there is still only one 'flags' int
            if "flags" not in field_name_counts:
                type_list.append(("flags", "u1"))
                field_name_counts["flags"] = 1
            flag_name, flag_mask, flag_type = VMPdata_colID_flag_map[colID]
            # TODO what happens if a flag colID has already been seen
            # i.e. if flag_name is already present in flags_dict?
@@ -322,15 +414,15 @@ def VMPdata_dtype_from_colIDs(colIDs):
            field_name_counts[field_name] += 1
            count = field_name_counts[field_name]
            if count > 1:
                unique_field_name = "%s %d" % (field_name, count)
            else:
                unique_field_name = field_name
            type_list.append((unique_field_name, field_type))
        else:
            raise NotImplementedError(
                "Column ID {cid} after column {prev} "
                "is unknown".format(cid=colID, prev=type_list[-1][0])
            )

    return np.dtype(type_list), flags_dict
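VMPdata_dtype_from_colIDs assembles one numpy record dtype per data row: all flag columns share a single "flags" byte, and repeated column IDs get a numbered suffix. A sketch (the function is module-internal; the expected values follow the test cases later in this diff):

    from galvani.BioLogic import VMPdata_dtype_from_colIDs

    dtype, flags_dict = VMPdata_dtype_from_colIDs([1, 4, 21, 4])
    # dtype      -> [("flags", "u1"), ("time/s", "<f8"), ("time/s 2", "<f8")]
    # flags_dict -> {"mode": (0x03, np.uint8), "control changes": (0x10, np.bool_)}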
@@ -341,12 +433,13 @@ def read_VMP_modules(fileobj, read_module_data=True):
    N.B. the offset yielded is the offset to the start of the data i.e. after
    the end of the header. The data runs from (offset) to (offset+length)"""
    while True:
        module_magic = fileobj.read(len(b"MODULE"))
        if len(module_magic) == 0:  # end of file
            break
        elif module_magic != b"MODULE":
            raise ValueError(
                "Found %r, expecting start of new VMP MODULE" % module_magic
            )

        hdr_bytes = fileobj.read(VMPmodule_hdr.itemsize)
        if len(hdr_bytes) < VMPmodule_hdr.itemsize:
@@ -354,23 +447,24 @@ def read_VMP_modules(fileobj, read_module_data=True):
        hdr = np.frombuffer(hdr_bytes, dtype=VMPmodule_hdr, count=1)
        hdr_dict = dict(((n, hdr[n][0]) for n in VMPmodule_hdr.names))
        hdr_dict["offset"] = fileobj.tell()
        if read_module_data:
            hdr_dict["data"] = fileobj.read(hdr_dict["length"])
            if len(hdr_dict["data"]) != hdr_dict["length"]:
                raise IOError(
                    """Unexpected end of file while reading data
                    current module: %s
                    length read: %d
                    length expected: %d"""
                    % (hdr_dict["longname"], len(hdr_dict["data"]), hdr_dict["length"])
                )
            yield hdr_dict
        else:
            yield hdr_dict
            fileobj.seek(hdr_dict["offset"] + hdr_dict["length"], SEEK_SET)


MPR_MAGIC = b"BIO-LOGIC MODULAR FILE\x1a".ljust(48) + b"\x00\x00\x00\x00"


class MPRfile:
@@ -392,41 +486,44 @@ class MPRfile:
    def __init__(self, file_or_path):
        self.loop_index = None
        if isinstance(file_or_path, str):
            mpr_file = open(file_or_path, "rb")
        else:
            mpr_file = file_or_path
        magic = mpr_file.read(len(MPR_MAGIC))
        if magic != MPR_MAGIC:
            raise ValueError("Invalid magic for .mpr file: %s" % magic)

        modules = list(read_VMP_modules(mpr_file))
        self.modules = modules
        (settings_mod,) = (m for m in modules if m["shortname"] == b"VMP Set ")
        (data_module,) = (m for m in modules if m["shortname"] == b"VMP data ")
        maybe_loop_module = [m for m in modules if m["shortname"] == b"VMP loop "]
        maybe_log_module = [m for m in modules if m["shortname"] == b"VMP LOG "]

        n_data_points = np.frombuffer(data_module["data"][:4], dtype="<u4")
        n_columns = np.frombuffer(data_module["data"][4:5], dtype="u1").item()

        if data_module["version"] == 0:
            column_types = np.frombuffer(
                data_module["data"][5:], dtype="u1", count=n_columns
            )
            remaining_headers = data_module["data"][5 + n_columns:100]
            main_data = data_module["data"][100:]
        elif data_module["version"] in [2, 3]:
            column_types = np.frombuffer(
                data_module["data"][5:], dtype="<u2", count=n_columns
            )
            # There are bytes of data before the main array starts
            if data_module["version"] == 3:
                num_bytes_before = 406  # version 3 added `\x01` to the start
            else:
                num_bytes_before = 405
            remaining_headers = data_module["data"][5 + 2 * n_columns:405]
            main_data = data_module["data"][num_bytes_before:]
        else:
            raise ValueError(
                "Unrecognised version for data module: %d" % data_module["version"]
            )

        assert not any(remaining_headers)
@@ -436,36 +533,40 @@ class MPRfile:
        # No idea what these 'column types' mean or even if they are actually
        # column types at all
        self.version = int(data_module["version"])
        self.cols = column_types
        self.npts = n_data_points

        self.startdate = parse_BioLogic_date(settings_mod["date"])

        if maybe_loop_module:
            (loop_module,) = maybe_loop_module
            if loop_module["version"] == 0:
                self.loop_index = np.fromstring(loop_module["data"][4:], dtype="<u4")
                self.loop_index = np.trim_zeros(self.loop_index, "b")
            else:
                raise ValueError(
                    "Unrecognised version for data module: %d" % data_module["version"]
                )

        if maybe_log_module:
            (log_module,) = maybe_log_module
            self.enddate = parse_BioLogic_date(log_module["date"])

            # There is a timestamp at either 465 or 469 bytes
            # I can't find any reason why it is one or the other in any
            # given file
            ole_timestamp1 = np.frombuffer(
                log_module["data"][465:], dtype="<f8", count=1
            )
            ole_timestamp2 = np.frombuffer(
                log_module["data"][469:], dtype="<f8", count=1
            )
            ole_timestamp3 = np.frombuffer(
                log_module["data"][473:], dtype="<f8", count=1
            )
            ole_timestamp4 = np.frombuffer(
                log_module["data"][585:], dtype="<f8", count=1
            )

            if ole_timestamp1 > 40000 and ole_timestamp1 < 50000:
                ole_timestamp = ole_timestamp1
@@ -483,14 +584,16 @@ class MPRfile:
            ole_timedelta = timedelta(days=ole_timestamp[0])
            self.timestamp = ole_base + ole_timedelta
            if self.startdate != self.timestamp.date():
                raise ValueError(
                    "Date mismatch:\n"
                    + " Start date: %s\n" % self.startdate
                    + " End date: %s\n" % self.enddate
                    + " Timestamp: %s\n" % self.timestamp
                )

    def get_flag(self, flagname):
        if flagname in self.flags_dict:
            mask, dtype = self.flags_dict[flagname]
            return np.array(self.data["flags"] & mask, dtype=dtype)
        else:
            raise AttributeError("Flag '%s' not present" % flagname)
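get_flag recovers one packed column from the shared "flags" byte using the bitmasks defined in VMPdata_colID_flag_map; a usage sketch with a hypothetical file:

    import numpy as np
    from galvani import MPRfile

    mpr = MPRfile("example.mpr")  # hypothetical file
    ox_red = mpr.get_flag("ox/red")
    # equivalent to: np.array(mpr.data["flags"] & 0x04, dtype=np.bool_)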

View File: galvani/__init__.py

@@ -4,4 +4,4 @@
from .BioLogic import MPRfile, MPTfile

__all__ = ["MPRfile", "MPTfile"]

View File: galvani/res2sqlite.py

@@ -16,43 +16,43 @@ from copy import copy
# $ mdb-schema <result.res> oracle
mdb_tables = [
    "Version_Table",
    "Global_Table",
    "Resume_Table",
    "Channel_Normal_Table",
    "Channel_Statistic_Table",
    "Auxiliary_Table",
    "Event_Table",
    "Smart_Battery_Info_Table",
    "Smart_Battery_Data_Table",
]
mdb_5_23_tables = [
    "MCell_Aci_Data_Table",
    "Aux_Global_Data_Table",
    "Smart_Battery_Clock_Stretch_Table",
]
mdb_5_26_tables = [
    "Can_BMS_Info_Table",
    "Can_BMS_Data_Table",
]

mdb_tables_text = {
    "Version_Table",
    "Global_Table",
    "Event_Table",
    "Smart_Battery_Info_Table",
    "Can_BMS_Info_Table",
}
mdb_tables_numeric = {
    "Resume_Table",
    "Channel_Normal_Table",
    "Channel_Statistic_Table",
    "Auxiliary_Table",
    "Smart_Battery_Data_Table",
    "MCell_Aci_Data_Table",
    "Aux_Global_Data_Table",
    "Smart_Battery_Clock_Stretch_Table",
    "Can_BMS_Data_Table",
}

mdb_create_scripts = {
@@ -271,7 +271,7 @@ CREATE TABLE Smart_Battery_Data_Table
        REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
); """,
    # The following tables are not present in version 1.14, but are in 5.23
    "MCell_Aci_Data_Table": """
CREATE TABLE MCell_Aci_Data_Table
(
    Test_ID INTEGER,
@@ -285,7 +285,7 @@ CREATE TABLE MCell_Aci_Data_Table
    FOREIGN KEY (Test_ID, Data_Point)
        REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
);""",
    "Aux_Global_Data_Table": """
CREATE TABLE Aux_Global_Data_Table
(
    Channel_Index INTEGER,
@@ -295,7 +295,7 @@ CREATE TABLE Aux_Global_Data_Table
    Unit TEXT,
    PRIMARY KEY (Channel_Index, Auxiliary_Index, Data_Type)
);""",
    "Smart_Battery_Clock_Stretch_Table": """
CREATE TABLE Smart_Battery_Clock_Stretch_Table
(
    Test_ID INTEGER,
@@ -344,7 +344,7 @@ CREATE TABLE Smart_Battery_Clock_Stretch_Table
        REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
);""",
    # The following tables are not present in version 5.23, but are in 5.26
    "Can_BMS_Info_Table": """
CREATE TABLE "Can_BMS_Info_Table"
(
    Channel_Index INTEGER PRIMARY KEY,
@@ -352,7 +352,7 @@ CREATE TABLE "Can_BMS_Info_Table"
    CAN_Configuration TEXT
);
""",
    "Can_BMS_Data_Table": """
CREATE TABLE "Can_BMS_Data_Table"
(
    Test_ID INTEGER,
@@ -371,7 +371,8 @@ mdb_create_indices = {
CREATE UNIQUE INDEX data_point_index ON Channel_Normal_Table (Test_ID, Data_Point);
CREATE INDEX voltage_index ON Channel_Normal_Table (Test_ID, Voltage);
CREATE INDEX test_time_index ON Channel_Normal_Table (Test_ID, Test_Time);
"""
}

helper_table_script = """
CREATE TEMPORARY TABLE capacity_helper(
@@ -438,17 +439,19 @@ CREATE VIEW IF NOT EXISTS Capacity_View
def mdb_get_data_text(s3db, filename, table):
    print("Reading %s..." % table)
    insert_pattern = re.compile(
        r'INSERT INTO "\w+" \([^)]+?\) VALUES \(("[^"]*"|[^")])+?\);\n', re.IGNORECASE
    )
    try:
        # Initialize values to avoid NameError in except clause
        mdb_output = ""
        insert_match = None
        with sp.Popen(
            ["mdb-export", "-I", "postgres", filename, table],
            bufsize=-1,
            stdin=sp.DEVNULL,
            stdout=sp.PIPE,
            universal_newlines=True,
        ) as mdb_sql:
            mdb_output = mdb_sql.stdout.read()
            while len(mdb_output) > 0:
                insert_match = insert_pattern.match(mdb_output)
@@ -459,8 +462,10 @@ def mdb_get_data_text(s3db, filename, table):
    except OSError as e:
        if e.errno == 2:
            raise RuntimeError(
                "Could not locate the `mdb-export` executable. "
                "Check that mdbtools is properly installed."
            )
        else:
            raise
    except BaseException:
@@ -475,14 +480,18 @@ def mdb_get_data_text(s3db, filename, table):
def mdb_get_data_numeric(s3db, filename, table):
    print("Reading %s..." % table)
    try:
        with sp.Popen(
            ["mdb-export", filename, table],
            bufsize=-1,
            stdin=sp.DEVNULL,
            stdout=sp.PIPE,
            universal_newlines=True,
        ) as mdb_sql:
            mdb_csv = csv.reader(mdb_sql.stdout)
            mdb_headers = next(mdb_csv)
            quoted_headers = ['"%s"' % h for h in mdb_headers]
            joined_headers = ", ".join(quoted_headers)
            joined_placemarks = ", ".join(["?" for h in mdb_headers])
            insert_stmt = 'INSERT INTO "{0}" ({1}) VALUES ({2});'.format(
                table,
                joined_headers,
@@ -492,8 +501,10 @@ def mdb_get_data_numeric(s3db, filename, table):
        s3db.commit()
    except OSError as e:
        if e.errno == 2:
            raise RuntimeError(
                "Could not locate the `mdb-export` executable. "
                "Check that mdbtools is properly installed."
            )
        else:
            raise
@@ -504,7 +515,9 @@ def mdb_get_data(s3db, filename, table):
    elif table in mdb_tables_numeric:
        mdb_get_data_numeric(s3db, filename, table)
    else:
        raise ValueError(
            "'%s' is in neither mdb_tables_text nor mdb_tables_numeric" % table
        )


def mdb_get_version(filename):
@@ -514,9 +527,13 @@ def mdb_get_version(filename):
""" """
print("Reading version number...") print("Reading version number...")
try: try:
with sp.Popen(['mdb-export', filename, 'Version_Table'], with sp.Popen(
bufsize=-1, stdin=sp.DEVNULL, stdout=sp.PIPE, ["mdb-export", filename, "Version_Table"],
universal_newlines=True) as mdb_sql: bufsize=-1,
stdin=sp.DEVNULL,
stdout=sp.PIPE,
universal_newlines=True,
) as mdb_sql:
mdb_csv = csv.reader(mdb_sql.stdout) mdb_csv = csv.reader(mdb_sql.stdout)
mdb_headers = next(mdb_csv) mdb_headers = next(mdb_csv)
mdb_values = next(mdb_csv) mdb_values = next(mdb_csv)
@@ -525,23 +542,31 @@ def mdb_get_version(filename):
            except StopIteration:
                pass
            else:
                raise ValueError(
                    "Version_Table of %s lists multiple versions" % filename
                )
    except OSError as e:
        if e.errno == 2:
            raise RuntimeError(
                "Could not locate the `mdb-export` executable. "
                "Check that mdbtools is properly installed."
            )
        else:
            raise
    if "Version_Schema_Field" not in mdb_headers:
        raise ValueError(
            "Version_Table of %s does not contain a Version_Schema_Field column"
            % filename
        )
    version_fields = dict(zip(mdb_headers, mdb_values))
    version_text = version_fields["Version_Schema_Field"]
    version_match = re.fullmatch("Results File ([.0-9]+)", version_text)
    if not version_match:
        raise ValueError(
            'File version "%s" did not match expected format' % version_text
        )
    version_string = version_match.group(1)
    version_tuple = tuple(map(int, version_string.split(".")))
    return version_tuple
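The version check expects Version_Schema_Field text of the form "Results File x.y"; the parsing step in isolation (the literal value here is an illustrative assumption):

    import re

    version_text = "Results File 5.26"  # assumed example value
    version_match = re.fullmatch("Results File ([.0-9]+)", version_text)
    tuple(map(int, version_match.group(1).split(".")))  # -> (5, 26)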
@@ -581,12 +606,14 @@ def main(argv=None):
    parser = argparse.ArgumentParser(
        description="Convert Arbin .res files to sqlite3 databases using mdb-export",
    )
    parser.add_argument("input_file", type=str)  # need file name to pass to sp.Popen
    parser.add_argument(
        "output_file", type=str
    )  # need file name to pass to sqlite3.connect
    args = parser.parse_args(argv)
    convert_arbin_to_sqlite(args.input_file, args.output_file)


if __name__ == "__main__":
    main()
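With the console-script entry point declared in setup.py (res2sqlite = galvani.res2sqlite:main), the same conversion is available from the shell as "res2sqlite input.res output.s3db" or from Python (file names hypothetical):

    from galvani import res2sqlite

    # requires the mdb-export tool from mdbtools to be on PATH
    res2sqlite.convert_arbin_to_sqlite("input.res", "output.s3db")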

View File: setup.py

@@ -7,35 +7,35 @@ import os.path
from setuptools import setup

with open(os.path.join(os.path.dirname(__file__), "README.md")) as f:
    readme = f.read()

setup(
    name="galvani",
    version="0.2.1",
    description="Open and process battery charger log data files",
    long_description=readme,
    long_description_content_type="text/markdown",
    url="https://github.com/echemdata/galvani",
    author="Chris Kerr",
    author_email="chris.kerr@mykolab.ch",
    license="GPLv3+",
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "Intended Audience :: Science/Research",
        "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
        "Natural Language :: English",
        "Programming Language :: Python :: 3 :: Only",
        "Topic :: Scientific/Engineering :: Chemistry",
    ],
    packages=["galvani"],
    entry_points={
        "console_scripts": [
            "res2sqlite = galvani.res2sqlite:main",
        ],
    },
    python_requires=">=3.6",
    install_requires=["numpy"],
    tests_require=["pytest"],
)

View File: tests/conftest.py

@@ -9,7 +9,7 @@ import os
import pytest


@pytest.fixture(scope="session")
def testdata_dir():
    """Path to the testdata directory."""
    return os.path.join(os.path.dirname(__file__), "testdata")

View File: tests/test_Arbin.py

@@ -13,8 +13,7 @@ import pytest
from galvani import res2sqlite

have_mdbtools = subprocess.call(["which", "mdb-export"], stdout=subprocess.DEVNULL) == 0
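A note on the detection idiom: subprocess.call(["which", ...]) is Unix-specific. A platform-independent equivalent (not what this suite uses, just a sketch) would be:

    import shutil

    have_mdbtools = shutil.which("mdb-export") is not None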
def test_res2sqlite_help(): def test_res2sqlite_help():
@@ -22,39 +21,47 @@ def test_res2sqlite_help():
    This should work even when mdbtools is not installed.
    """
    help_output = subprocess.check_output(["res2sqlite", "--help"])
    assert b"Convert Arbin .res files to sqlite3 databases" in help_output


@pytest.mark.skipif(
    have_mdbtools, reason="This tests the failure when mdbtools is not installed"
)
def test_convert_Arbin_no_mdbtools(testdata_dir, tmpdir):
    """Checks that the conversion fails with an appropriate error message."""
    res_file = os.path.join(testdata_dir, "arbin1.res")
    sqlite_file = os.path.join(str(tmpdir), "arbin1.s3db")
    with pytest.raises(
        RuntimeError, match="Could not locate the `mdb-export` executable."
    ):
        res2sqlite.convert_arbin_to_sqlite(res_file, sqlite_file)


@pytest.mark.skipif(
    not have_mdbtools, reason="Reading the Arbin file requires MDBTools"
)
@pytest.mark.parametrize("basename", ["arbin1", "UM34_Test005E"])
def test_convert_Arbin_to_sqlite_function(testdata_dir, tmpdir, basename):
    """Convert an Arbin file to SQLite using the functional interface."""
    res_file = os.path.join(testdata_dir, basename + ".res")
    sqlite_file = os.path.join(str(tmpdir), basename + ".s3db")
    res2sqlite.convert_arbin_to_sqlite(res_file, sqlite_file)
    assert os.path.isfile(sqlite_file)
    with sqlite3.connect(sqlite_file) as conn:
        csr = conn.execute("SELECT * FROM Channel_Normal_Table;")
        csr.fetchone()


@pytest.mark.skipif(
    not have_mdbtools, reason="Reading the Arbin file requires MDBTools"
)
def test_convert_cmdline(testdata_dir, tmpdir):
    """Checks that the conversion fails with an appropriate error message."""
    res_file = os.path.join(testdata_dir, "arbin1.res")
    sqlite_file = os.path.join(str(tmpdir), "arbin1.s3db")
    subprocess.check_call(["res2sqlite", res_file, sqlite_file])
    assert os.path.isfile(sqlite_file)
    with sqlite3.connect(sqlite_file) as conn:
        csr = conn.execute("SELECT * FROM Channel_Normal_Table;")
        csr.fetchone()

View File: tests/test_BioLogic.py

@@ -17,33 +17,55 @@ from galvani.BioLogic import MPTfileCSV # not exported
def test_open_MPT(testdata_dir):
    mpt1, comments = MPTfile(os.path.join(testdata_dir, "bio_logic1.mpt"))
    assert comments == []
    assert mpt1.dtype.names == (
        "mode",
        "ox/red",
        "error",
        "control changes",
        "Ns changes",
        "counter inc.",
        "time/s",
        "control/V/mA",
        "Ewe/V",
        "dQ/mA.h",
        "P/W",
        "I/mA",
        "(Q-Qo)/mA.h",
        "x",
    )


def test_open_MPT_fails_for_bad_file(testdata_dir):
    with pytest.raises(ValueError, match="Bad first line"):
        MPTfile(os.path.join(testdata_dir, "bio_logic1.mpr"))


def test_open_MPT_csv(testdata_dir):
    mpt1, comments = MPTfileCSV(os.path.join(testdata_dir, "bio_logic1.mpt"))
    assert comments == []
    assert mpt1.fieldnames == [
        "mode",
        "ox/red",
        "error",
        "control changes",
        "Ns changes",
        "counter inc.",
        "time/s",
        "control/V/mA",
        "Ewe/V",
        "dq/mA.h",
        "P/W",
        "<I>/mA",
        "(Q-Qo)/mA.h",
        "x",
    ]


def test_open_MPT_csv_fails_for_bad_file(testdata_dir):
    with pytest.raises((ValueError, UnicodeDecodeError)):
        MPTfileCSV(os.path.join(testdata_dir, "bio_logic1.mpr"))


def test_colID_map_uniqueness():
@@ -59,13 +81,16 @@ def test_colID_map_uniqueness():
    assert not set(field_names).intersection(flag_names)


@pytest.mark.parametrize(
    "colIDs, expected",
    [
        ([1, 2, 3], [("flags", "u1")]),
        ([4, 6], [("time/s", "<f8"), ("Ewe/V", "<f4")]),
        ([1, 4, 21], [("flags", "u1"), ("time/s", "<f8")]),
        ([4, 6, 4], [("time/s", "<f8"), ("Ewe/V", "<f4"), ("time/s 2", "<f8")]),
        ([4, 9999], NotImplementedError),
    ],
)
def test_colID_to_dtype(colIDs, expected):
    """Test converting column ID to numpy dtype."""
    if isinstance(expected, type) and issubclass(expected, Exception):
@@ -77,14 +102,17 @@ def test_colID_to_dtype(colIDs, expected):
        assert dtype == expected_dtype


@pytest.mark.parametrize(
    "data, expected",
    [
        ("02/23/17", date(2017, 2, 23)),
        ("10-03-05", date(2005, 10, 3)),
        ("11.12.20", date(2020, 11, 12)),
        (b"01/02/03", date(2003, 1, 2)),
        ("13.08.07", ValueError),
        ("03-04/05", ValueError),
    ],
)
def test_parse_BioLogic_date(data, expected):
    """Test the parse_BioLogic_date function."""
    if isinstance(expected, type) and issubclass(expected, Exception):
@@ -95,51 +123,54 @@ def test_parse_BioLogic_date(data, expected):
        assert result == expected


@pytest.mark.parametrize(
    "filename, startdate, enddate",
    [
        ("bio_logic1.mpr", "2011-10-29", "2011-10-31"),
        ("bio_logic2.mpr", "2012-09-27", "2012-09-27"),
        ("bio_logic3.mpr", "2013-03-27", "2013-03-27"),
        ("bio_logic4.mpr", "2011-11-01", "2011-11-02"),
        ("bio_logic5.mpr", "2013-01-28", "2013-01-28"),
        # bio_logic6.mpr has no end date because it does not have a VMP LOG module
        ("bio_logic6.mpr", "2012-09-11", None),
        # C019P-0ppb-A_C01.mpr stores the date in a different format
        ("C019P-0ppb-A_C01.mpr", "2019-03-14", "2019-03-14"),
        ("Rapp_Error.mpr", "2010-12-02", "2010-12-02"),
        ("Ewe_Error.mpr", "2021-11-18", "2021-11-19"),
    ],
)
def test_MPR_dates(testdata_dir, filename, startdate, enddate):
    """Check that the start and end dates in .mpr files are read correctly."""
    mpr = MPRfile(os.path.join(testdata_dir, filename))
    assert mpr.startdate.strftime("%Y-%m-%d") == startdate
    if enddate:
        assert mpr.enddate.strftime("%Y-%m-%d") == enddate
    else:
        assert not hasattr(mpr, "enddate")


def test_open_MPR_fails_for_bad_file(testdata_dir):
    with pytest.raises(ValueError, match="Invalid magic for .mpr file"):
        MPRfile(os.path.join(testdata_dir, "arbin1.res"))


def timestamp_from_comments(comments):
    for line in comments:
        time_match = re.match(b"Acquisition started on : ([0-9/]+ [0-9:]+)", line)
        if time_match:
            timestamp = datetime.strptime(
                time_match.group(1).decode("ascii"), "%m/%d/%Y %H:%M:%S"
            )
            return timestamp
    raise AttributeError("No timestamp in comments")


def assert_MPR_matches_MPT(mpr, mpt, comments):
    def assert_field_matches(fieldname, decimal):
        if fieldname in mpr.dtype.fields:
            assert_array_almost_equal(
                mpr.data[fieldname], mpt[fieldname], decimal=decimal
            )

    def assert_field_exact(fieldname):
        if fieldname in mpr.dtype.fields:
@@ -154,16 +185,16 @@ def assert_MPR_matches_MPT(mpr, mpt, comments):
    # Nothing uses the 0x40 bit of the flags
    assert_array_equal(mpr.get_flag("counter inc."), mpt["counter inc."])

    assert_array_almost_equal(
        mpr.data["time/s"], mpt["time/s"], decimal=2
    )  # 2 digits in CSV
    assert_field_matches("control/V/mA", decimal=6)
    assert_field_matches("control/V", decimal=6)
    assert_array_almost_equal(
        mpr.data["Ewe/V"], mpt["Ewe/V"], decimal=6
    )  # 32 bit float precision
    assert_field_matches("dQ/mA.h", decimal=16)  # 64 bit float precision
    assert_field_matches("P/W", decimal=10)  # 32 bit float precision for 1.xxE-5
@@ -178,39 +209,45 @@ def assert_MPR_matches_MPT(mpr, mpt, comments):
        pass


@pytest.mark.parametrize(
    "basename",
    [
        "bio_logic1",
        "bio_logic2",
        # No bio_logic3.mpt file
        "bio_logic4",
        # bio_logic5 and bio_logic6 are special cases
        "CV_C01",
        "121_CA_455nm_6V_30min_C01",
        "020-formation_CB5",
    ],
)
def test_MPR_matches_MPT(testdata_dir, basename):
    """Check the MPR parser against the MPT parser.

    Load a binary .mpr file and a text .mpt file which should contain
    exactly the same data. Check that the loaded data actually match.
    """
    binpath = os.path.join(testdata_dir, basename + ".mpr")
    txtpath = os.path.join(testdata_dir, basename + ".mpt")
    mpr = MPRfile(binpath)
    mpt, comments = MPTfile(txtpath, encoding="latin1")
    assert_MPR_matches_MPT(mpr, mpt, comments)


def test_MPR5_matches_MPT5(testdata_dir):
    mpr = MPRfile(os.path.join(testdata_dir, "bio_logic5.mpr"))
    mpt, comments = MPTfile(
        (
            re.sub(b"\tXXX\t", b"\t0\t", line)
            for line in open(os.path.join(testdata_dir, "bio_logic5.mpt"), mode="rb")
        )
    )
    assert_MPR_matches_MPT(mpr, mpt, comments)


def test_MPR6_matches_MPT6(testdata_dir):
    mpr = MPRfile(os.path.join(testdata_dir, "bio_logic6.mpr"))
    mpt, comments = MPTfile(os.path.join(testdata_dir, "bio_logic6.mpt"))
    mpr.data = mpr.data[:958]  # .mpt file is incomplete
    assert_MPR_matches_MPT(mpr, mpt, comments)