diff --git a/galvani/BioLogic.py b/galvani/BioLogic.py index acddd3a..6dc9287 100644 --- a/galvani/BioLogic.py +++ b/galvani/BioLogic.py @@ -5,7 +5,7 @@ # # SPDX-License-Identifier: GPL-3.0-or-later -__all__ = ['MPTfileCSV', 'MPTfile'] +__all__ = ["MPTfileCSV", "MPTfile"] import re import csv @@ -21,19 +21,44 @@ def fieldname_to_dtype(fieldname): """Converts a column header from the MPT file into a tuple of canonical name and appropriate numpy dtype""" - if fieldname == 'mode': - return ('mode', np.uint8) - elif fieldname in ("ox/red", "error", "control changes", "Ns changes", - "counter inc."): + if fieldname == "mode": + return ("mode", np.uint8) + elif fieldname in ( + "ox/red", + "error", + "control changes", + "Ns changes", + "counter inc.", + ): return (fieldname, np.bool_) - elif fieldname in ("time/s", "P/W", "(Q-Qo)/mA.h", "x", "control/V", - "control/mA", "control/V/mA", "(Q-Qo)/C", "dQ/C", - "freq/Hz", "|Ewe|/V", "|I|/A", "Phase(Z)/deg", - "|Z|/Ohm", "Re(Z)/Ohm", "-Im(Z)/Ohm"): + elif fieldname in ( + "time/s", + "P/W", + "(Q-Qo)/mA.h", + "x", + "control/V", + "control/mA", + "control/V/mA", + "(Q-Qo)/C", + "dQ/C", + "freq/Hz", + "|Ewe|/V", + "|I|/A", + "Phase(Z)/deg", + "|Z|/Ohm", + "Re(Z)/Ohm", + "-Im(Z)/Ohm", + ): return (fieldname, np.float_) - elif fieldname in ("Q charge/discharge/mA.h", "step time/s", - "Q charge/mA.h", "Q discharge/mA.h", - "Temperature/°C", "Efficiency/%", "Capacity/mA.h"): + elif fieldname in ( + "Q charge/discharge/mA.h", + "step time/s", + "Q charge/mA.h", + "Q discharge/mA.h", + "Temperature/°C", + "Efficiency/%", + "Capacity/mA.h", + ): return (fieldname, np.float_) elif fieldname in ("cycle number", "I Range", "Ns", "half cycle"): return (fieldname, np.int_) @@ -43,12 +68,28 @@ def fieldname_to_dtype(fieldname): return ("I/mA", np.float_) elif fieldname in ("Ewe/V", "/V", "Ecell/V"): return ("Ewe/V", np.float_) - elif fieldname.endswith(("/s", "/Hz", "/deg", - "/W", "/mW", "/W.h", "/mW.h", - "/A", "/mA", "/A.h", "/mA.h", - "/V", "/mV", - "/F", "/mF", "/uF", - "/C", "/Ohm",)): + elif fieldname.endswith( + ( + "/s", + "/Hz", + "/deg", + "/W", + "/mW", + "/W.h", + "/mW.h", + "/A", + "/mA", + "/A.h", + "/mA.h", + "/V", + "/mV", + "/F", + "/mF", + "/uF", + "/C", + "/Ohm", + ) + ): return (fieldname, np.float_) else: raise ValueError("Invalid column header: %s" % fieldname) @@ -56,11 +97,11 @@ def fieldname_to_dtype(fieldname): def comma_converter(float_text): """Convert text to float whether the decimal point is '.' or ','""" - trans_table = bytes.maketrans(b',', b'.') + trans_table = bytes.maketrans(b",", b".") return float(float_text.translate(trans_table)) -def MPTfile(file_or_path, encoding='ascii'): +def MPTfile(file_or_path, encoding="ascii"): """Opens .mpt files as numpy record arrays Checks for the correct headings, skips any comments and returns a @@ -68,16 +109,15 @@ def MPTfile(file_or_path, encoding='ascii'): """ if isinstance(file_or_path, str): - mpt_file = open(file_or_path, 'rb') + mpt_file = open(file_or_path, "rb") else: mpt_file = file_or_path magic = next(mpt_file) - if magic not in (b'EC-Lab ASCII FILE\r\n', b'BT-Lab ASCII FILE\r\n'): + if magic not in (b"EC-Lab ASCII FILE\r\n", b"BT-Lab ASCII FILE\r\n"): raise ValueError("Bad first line for EC-Lab file: '%s'" % magic) - nb_headers_match = re.match(rb'Nb header lines : (\d+)\s*$', - next(mpt_file)) + nb_headers_match = re.match(rb"Nb header lines : (\d+)\s*$", next(mpt_file)) nb_headers = int(nb_headers_match.group(1)) if nb_headers < 3: raise ValueError("Too few header lines: %d" % nb_headers) @@ -86,14 +126,12 @@ def MPTfile(file_or_path, encoding='ascii'): # make three lines. Every additional line is a comment line. comments = [next(mpt_file) for i in range(nb_headers - 3)] - fieldnames = next(mpt_file).decode(encoding).strip().split('\t') + fieldnames = next(mpt_file).decode(encoding).strip().split("\t") record_type = np.dtype(list(map(fieldname_to_dtype, fieldnames))) # Must be able to parse files where commas are used for decimal points - converter_dict = dict(((i, comma_converter) - for i in range(len(fieldnames)))) - mpt_array = np.loadtxt(mpt_file, dtype=record_type, - converters=converter_dict) + converter_dict = dict(((i, comma_converter) for i in range(len(fieldnames)))) + mpt_array = np.loadtxt(mpt_file, dtype=record_type, converters=converter_dict) return mpt_array, comments @@ -106,15 +144,15 @@ def MPTfileCSV(file_or_path): """ if isinstance(file_or_path, str): - mpt_file = open(file_or_path, 'r') + mpt_file = open(file_or_path, "r") else: mpt_file = file_or_path magic = next(mpt_file) - if magic.rstrip() != 'EC-Lab ASCII FILE': + if magic.rstrip() != "EC-Lab ASCII FILE": raise ValueError("Bad first line for EC-Lab file: '%s'" % magic) - nb_headers_match = re.match(r'Nb header lines : (\d+)\s*$', next(mpt_file)) + nb_headers_match = re.match(r"Nb header lines : (\d+)\s*$", next(mpt_file)) nb_headers = int(nb_headers_match.group(1)) if nb_headers < 3: raise ValueError("Too few header lines: %d" % nb_headers) @@ -123,154 +161,206 @@ def MPTfileCSV(file_or_path): # make three lines. Every additional line is a comment line. comments = [next(mpt_file) for i in range(nb_headers - 3)] - mpt_csv = csv.DictReader(mpt_file, dialect='excel-tab') + mpt_csv = csv.DictReader(mpt_file, dialect="excel-tab") expected_fieldnames = ( - ["mode", "ox/red", "error", "control changes", "Ns changes", - "counter inc.", "time/s", "control/V/mA", "Ewe/V", "dq/mA.h", - "P/W", "/mA", "(Q-Qo)/mA.h", "x"], - ['mode', 'ox/red', 'error', 'control changes', 'Ns changes', - 'counter inc.', 'time/s', 'control/V', 'Ewe/V', 'dq/mA.h', - '/mA', '(Q-Qo)/mA.h', 'x'], - ["mode", "ox/red", "error", "control changes", "Ns changes", - "counter inc.", "time/s", "control/V", "Ewe/V", "I/mA", - "dQ/mA.h", "P/W"], - ["mode", "ox/red", "error", "control changes", "Ns changes", - "counter inc.", "time/s", "control/V", "Ewe/V", "/mA", - "dQ/mA.h", "P/W"]) + [ + "mode", + "ox/red", + "error", + "control changes", + "Ns changes", + "counter inc.", + "time/s", + "control/V/mA", + "Ewe/V", + "dq/mA.h", + "P/W", + "/mA", + "(Q-Qo)/mA.h", + "x", + ], + [ + "mode", + "ox/red", + "error", + "control changes", + "Ns changes", + "counter inc.", + "time/s", + "control/V", + "Ewe/V", + "dq/mA.h", + "/mA", + "(Q-Qo)/mA.h", + "x", + ], + [ + "mode", + "ox/red", + "error", + "control changes", + "Ns changes", + "counter inc.", + "time/s", + "control/V", + "Ewe/V", + "I/mA", + "dQ/mA.h", + "P/W", + ], + [ + "mode", + "ox/red", + "error", + "control changes", + "Ns changes", + "counter inc.", + "time/s", + "control/V", + "Ewe/V", + "/mA", + "dQ/mA.h", + "P/W", + ], + ) if mpt_csv.fieldnames not in expected_fieldnames: raise ValueError("Unrecognised headers for MPT file format") return mpt_csv, comments -VMPmodule_hdr = np.dtype([('shortname', 'S10'), - ('longname', 'S25'), - ('length', ' ?? - 9: ('Ece/V', '/mA', '/V', '/V', '/V', '/V', '/V', '/V', ' ?? + 9: ("Ece/V", "/mA", "/V", "/V", "/V", "/V", "/V", "/V", " 1: - unique_field_name = '%s %d' % (field_name, count) + unique_field_name = "%s %d" % (field_name, count) else: unique_field_name = field_name type_list.append((unique_field_name, field_type)) else: - raise NotImplementedError("Column ID {cid} after column {prev} " - "is unknown" - .format(cid=colID, - prev=type_list[-1][0])) + raise NotImplementedError( + "Column ID {cid} after column {prev} " + "is unknown".format(cid=colID, prev=type_list[-1][0]) + ) return np.dtype(type_list), flags_dict @@ -341,12 +433,13 @@ def read_VMP_modules(fileobj, read_module_data=True): N.B. the offset yielded is the offset to the start of the data i.e. after the end of the header. The data runs from (offset) to (offset+length)""" while True: - module_magic = fileobj.read(len(b'MODULE')) + module_magic = fileobj.read(len(b"MODULE")) if len(module_magic) == 0: # end of file break - elif module_magic != b'MODULE': - raise ValueError("Found %r, expecting start of new VMP MODULE" - % module_magic) + elif module_magic != b"MODULE": + raise ValueError( + "Found %r, expecting start of new VMP MODULE" % module_magic + ) hdr_bytes = fileobj.read(VMPmodule_hdr.itemsize) if len(hdr_bytes) < VMPmodule_hdr.itemsize: @@ -354,23 +447,24 @@ def read_VMP_modules(fileobj, read_module_data=True): hdr = np.frombuffer(hdr_bytes, dtype=VMPmodule_hdr, count=1) hdr_dict = dict(((n, hdr[n][0]) for n in VMPmodule_hdr.names)) - hdr_dict['offset'] = fileobj.tell() + hdr_dict["offset"] = fileobj.tell() if read_module_data: - hdr_dict['data'] = fileobj.read(hdr_dict['length']) - if len(hdr_dict['data']) != hdr_dict['length']: - raise IOError("""Unexpected end of file while reading data + hdr_dict["data"] = fileobj.read(hdr_dict["length"]) + if len(hdr_dict["data"]) != hdr_dict["length"]: + raise IOError( + """Unexpected end of file while reading data current module: %s length read: %d - length expected: %d""" % (hdr_dict['longname'], - len(hdr_dict['data']), - hdr_dict['length'])) + length expected: %d""" + % (hdr_dict["longname"], len(hdr_dict["data"]), hdr_dict["length"]) + ) yield hdr_dict else: yield hdr_dict - fileobj.seek(hdr_dict['offset'] + hdr_dict['length'], SEEK_SET) + fileobj.seek(hdr_dict["offset"] + hdr_dict["length"], SEEK_SET) -MPR_MAGIC = b'BIO-LOGIC MODULAR FILE\x1a'.ljust(48) + b'\x00\x00\x00\x00' +MPR_MAGIC = b"BIO-LOGIC MODULAR FILE\x1a".ljust(48) + b"\x00\x00\x00\x00" class MPRfile: @@ -392,41 +486,44 @@ class MPRfile: def __init__(self, file_or_path): self.loop_index = None if isinstance(file_or_path, str): - mpr_file = open(file_or_path, 'rb') + mpr_file = open(file_or_path, "rb") else: mpr_file = file_or_path magic = mpr_file.read(len(MPR_MAGIC)) if magic != MPR_MAGIC: - raise ValueError('Invalid magic for .mpr file: %s' % magic) + raise ValueError("Invalid magic for .mpr file: %s" % magic) modules = list(read_VMP_modules(mpr_file)) self.modules = modules - settings_mod, = (m for m in modules if m['shortname'] == b'VMP Set ') - data_module, = (m for m in modules if m['shortname'] == b'VMP data ') - maybe_loop_module = [m for m in modules if m['shortname'] == b'VMP loop '] - maybe_log_module = [m for m in modules if m['shortname'] == b'VMP LOG '] + (settings_mod,) = (m for m in modules if m["shortname"] == b"VMP Set ") + (data_module,) = (m for m in modules if m["shortname"] == b"VMP data ") + maybe_loop_module = [m for m in modules if m["shortname"] == b"VMP loop "] + maybe_log_module = [m for m in modules if m["shortname"] == b"VMP LOG "] - n_data_points = np.frombuffer(data_module['data'][:4], dtype=' 40000 and ole_timestamp1 < 50000: ole_timestamp = ole_timestamp1 @@ -483,14 +584,16 @@ class MPRfile: ole_timedelta = timedelta(days=ole_timestamp[0]) self.timestamp = ole_base + ole_timedelta if self.startdate != self.timestamp.date(): - raise ValueError("Date mismatch:\n" - + " Start date: %s\n" % self.startdate - + " End date: %s\n" % self.enddate - + " Timestamp: %s\n" % self.timestamp) + raise ValueError( + "Date mismatch:\n" + + " Start date: %s\n" % self.startdate + + " End date: %s\n" % self.enddate + + " Timestamp: %s\n" % self.timestamp + ) def get_flag(self, flagname): if flagname in self.flags_dict: mask, dtype = self.flags_dict[flagname] - return np.array(self.data['flags'] & mask, dtype=dtype) + return np.array(self.data["flags"] & mask, dtype=dtype) else: raise AttributeError("Flag '%s' not present" % flagname)