mirror of
https://github.com/echemdata/galvani.git
synced 2026-05-13 14:57:04 +00:00
Compare commits
50 Commits
0.3.0
...
d4a5f444b1
| Author | SHA1 | Date | |
|---|---|---|---|
|
d4a5f444b1
|
|||
|
d81bf829bb
|
|||
|
d77aa1555b
|
|||
|
0d684af470
|
|||
|
baec8934b8
|
|||
| ccaa66b206 | |||
| a59f263c2b | |||
| 30d6098aa0 | |||
| 2c90a2b038 | |||
| 5a207dbf5e | |||
| 7964dc85db | |||
| 569a5f2a9c | |||
|
b6143e4b05
|
|||
|
4efec58374
|
|||
|
627387f9c4
|
|||
|
12b4badc31
|
|||
|
5ed03ed20c
|
|||
| c8e5bb12b8 | |||
| 1d913dd2f1 | |||
| 3c1446ff07 | |||
| e18a21ffbc | |||
| 260ad72a6e | |||
| 7d264999db | |||
| 1e53de56ef | |||
| f44851ec37 | |||
| 3b5dc48fc6 | |||
| 56bebfe498 | |||
| d33c6f7561 | |||
| 79e3df0ed9 | |||
| 3c904db04e | |||
| fbc90fc961 | |||
|
545a82ec35
|
|||
| 7c37ea306b | |||
| cd3eaae2c1 | |||
| a9be96b5c2 | |||
| 0c2ecd42ca | |||
| a845731131 | |||
| 6d2a5b31fb | |||
| 1fd9f8454a | |||
| f0177f2470 | |||
| ea50999349 | |||
| 88d1fc3a71 | |||
|
4971f2b550
|
|||
|
5cdc620f16
|
|||
|
7a6ac1c542
|
|||
|
46f296f61f
|
|||
| aa0aee6128 | |||
|
dbd01957db
|
|||
|
13957160f8
|
|||
|
77d56290d4
|
@@ -17,8 +17,7 @@ jobs:
|
||||
|
||||
pytest:
|
||||
name: Run Python unit tests
|
||||
# Note that 20.04 is currently required until galvani supports mdbtools>=1.0.
|
||||
runs-on: ubuntu-20.04
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
@@ -27,13 +26,31 @@ jobs:
|
||||
python-version: ['3.8', '3.9', '3.10', '3.11']
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
lfs: false
|
||||
|
||||
# Due to limited LFS bandwidth, it is preferable to download
|
||||
# test files from the last release.
|
||||
#
|
||||
# This does mean that testing new LFS files in the CI is tricky;
|
||||
# care should be taken to also test new files locally first
|
||||
# Tests missing these files in the CI should still fail.
|
||||
- name: Download static files from last release for testing
|
||||
uses: robinraju/release-downloader@v1.12
|
||||
with:
|
||||
latest: true
|
||||
tarBall: true
|
||||
out-file-path: /home/runner/work/last-release
|
||||
extract: true
|
||||
|
||||
- name: Copy test files from static downloaded release
|
||||
run: |
|
||||
cp -r /home/runner/work/last-release/*/tests/testdata tests
|
||||
|
||||
- name: Set up Python ${{ matrix.python-version }}
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
|
||||
@@ -50,5 +67,5 @@ jobs:
|
||||
tox -vv --notest
|
||||
|
||||
- name: Run all tests
|
||||
run: |
|
||||
run: |-
|
||||
tox --skip-pkg-install
|
||||
|
||||
+1
-1
@@ -1,7 +1,7 @@
|
||||
Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
|
||||
Upstream-Name: Galvani
|
||||
Upstream-Contact: Christopher Kerr <chris.kerr@mykolab.ch>
|
||||
Source: https://github.com/echemdata/galvani
|
||||
Source: https://codeberg.org/echemdata/galvani
|
||||
|
||||
Files: tests/testdata/*
|
||||
Copyright: 2010-2014 Christopher Kerr <chris.kerr@mykolab.ch>
|
||||
|
||||
@@ -7,21 +7,60 @@ SPDX-FileCopyrightText: 2013-2020 Christopher Kerr, Peter Attia
|
||||
SPDX-License-Identifier: GPL-3.0-or-later
|
||||
-->
|
||||
|
||||
Read proprietary file formats from electrochemical test stations
|
||||
Read proprietary file formats from electrochemical test stations.
|
||||
|
||||
## Bio-Logic .mpr files ##
|
||||
# Usage
|
||||
|
||||
## Bio-Logic .mpr files
|
||||
|
||||
Use the `MPRfile` class from BioLogic.py (exported in the main package)
|
||||
|
||||
````
|
||||
```python
|
||||
from galvani import BioLogic
|
||||
import pandas as pd
|
||||
|
||||
mpr_file = BioLogic.MPRfile('test.mpr')
|
||||
df = pd.DataFrame(mpr_file.data)
|
||||
````
|
||||
```
|
||||
|
||||
## Arbin .res files ##
|
||||
## Arbin .res files
|
||||
|
||||
Use the res2sqlite.py script to convert the .res file to a sqlite3 database
|
||||
with the same schema.
|
||||
Use the `./galvani/res2sqlite.py` script to convert the .res file to a sqlite3 database with the same schema, which can then be interrogated with external tools or directly in Python.
|
||||
For example, to extract the data into a pandas DataFrame (will need to be installed separately):
|
||||
|
||||
```python
|
||||
import sqlite3
|
||||
import pandas as pd
|
||||
from galvani.res2sqlite import convert_arbin_to_sqlite
|
||||
convert_arbin_to_sqlite("input.res", "output.sqlite")
|
||||
with sqlite3.connect("output.sqlite") as db:
|
||||
df = pd.read_sql(sql="select * from Channel_Normal_Table", con=db)
|
||||
```
|
||||
|
||||
This functionality requires [MDBTools](https://github.com/mdbtools/mdbtools) to be installed on the local system.
|
||||
|
||||
# Installation
|
||||
|
||||
The latest galvani releases can be installed from [PyPI](https://pypi.org/project/galvani/) via
|
||||
|
||||
```shell
|
||||
pip install galvani
|
||||
```
|
||||
|
||||
The latest development version can be installed with `pip` directly from GitHub:
|
||||
|
||||
```shell
|
||||
pip install git+https://codeberg.org/echemdata/galvani
|
||||
```
|
||||
|
||||
## Development installation and contributing
|
||||
|
||||
If you wish to contribute to galvani, please clone the repository and install the testing dependencies:
|
||||
|
||||
```shell
|
||||
git clone git@codeberg.org:echemdata/galvani
|
||||
cd galvani
|
||||
pip install -e .\[tests\]
|
||||
```
|
||||
|
||||
Code can be contributed back via [pull requests](https://codeberg.org/echemdata/galvani/pulls) and new features or bugs can be discussed in the [issue tracker](https://codeberg.org/echemdata/galvani/issues).
|
||||
|
||||
+260
-33
@@ -10,12 +10,16 @@ __all__ = ["MPTfileCSV", "MPTfile"]
|
||||
import re
|
||||
import csv
|
||||
from os import SEEK_SET
|
||||
import os.path
|
||||
import time
|
||||
from datetime import date, datetime, timedelta
|
||||
from collections import defaultdict, OrderedDict
|
||||
import warnings
|
||||
|
||||
import numpy as np
|
||||
|
||||
UNKNOWN_COLUMN_TYPE_HIERARCHY = ("<f8", "<f4", "<u4", "<u2", "<u1")
|
||||
|
||||
|
||||
def fieldname_to_dtype(fieldname):
|
||||
"""Converts a column header from the MPT file into a tuple of
|
||||
@@ -48,8 +52,15 @@ def fieldname_to_dtype(fieldname):
|
||||
"|Z|/Ohm",
|
||||
"Re(Z)/Ohm",
|
||||
"-Im(Z)/Ohm",
|
||||
"Re(M)",
|
||||
"Im(M)",
|
||||
"|M|",
|
||||
"Re(Permittivity)",
|
||||
"Im(Permittivity)",
|
||||
"|Permittivity|",
|
||||
"Tan(Delta)",
|
||||
):
|
||||
return (fieldname, np.float_)
|
||||
return (fieldname, np.float64)
|
||||
elif fieldname in (
|
||||
"Q charge/discharge/mA.h",
|
||||
"step time/s",
|
||||
@@ -59,15 +70,15 @@ def fieldname_to_dtype(fieldname):
|
||||
"Efficiency/%",
|
||||
"Capacity/mA.h",
|
||||
):
|
||||
return (fieldname, np.float_)
|
||||
elif fieldname in ("cycle number", "I Range", "Ns", "half cycle"):
|
||||
return (fieldname, np.float64)
|
||||
elif fieldname in ("cycle number", "I Range", "Ns", "half cycle", "z cycle"):
|
||||
return (fieldname, np.int_)
|
||||
elif fieldname in ("dq/mA.h", "dQ/mA.h"):
|
||||
return ("dQ/mA.h", np.float_)
|
||||
return ("dQ/mA.h", np.float64)
|
||||
elif fieldname in ("I/mA", "<I>/mA"):
|
||||
return ("I/mA", np.float_)
|
||||
elif fieldname in ("Ewe/V", "<Ewe>/V", "Ecell/V"):
|
||||
return ("Ewe/V", np.float_)
|
||||
return ("I/mA", np.float64)
|
||||
elif fieldname in ("Ewe/V", "<Ewe>/V", "Ecell/V", "<Ewe/V>"):
|
||||
return ("Ewe/V", np.float64)
|
||||
elif fieldname.endswith(
|
||||
(
|
||||
"/s",
|
||||
@@ -86,11 +97,17 @@ def fieldname_to_dtype(fieldname):
|
||||
"/F",
|
||||
"/mF",
|
||||
"/uF",
|
||||
"/µF",
|
||||
"/nF",
|
||||
"/C",
|
||||
"/Ohm",
|
||||
"/Ohm-1",
|
||||
"/Ohm.cm",
|
||||
"/mS/cm",
|
||||
"/%",
|
||||
)
|
||||
):
|
||||
return (fieldname, np.float_)
|
||||
return (fieldname, np.float64)
|
||||
else:
|
||||
raise ValueError("Invalid column header: %s" % fieldname)
|
||||
|
||||
@@ -230,7 +247,7 @@ def MPTfileCSV(file_or_path):
|
||||
return mpt_csv, comments
|
||||
|
||||
|
||||
VMPmodule_hdr = np.dtype(
|
||||
VMPmodule_hdr_v1 = np.dtype(
|
||||
[
|
||||
("shortname", "S10"),
|
||||
("longname", "S25"),
|
||||
@@ -240,17 +257,30 @@ VMPmodule_hdr = np.dtype(
|
||||
]
|
||||
)
|
||||
|
||||
VMPmodule_hdr_v2 = np.dtype(
|
||||
[
|
||||
("shortname", "S10"),
|
||||
("longname", "S25"),
|
||||
("max length", "<u4"),
|
||||
("length", "<u4"),
|
||||
("version", "<u4"),
|
||||
("unknown2", "<u4"), # 10 for set, log and loop, 11 for data
|
||||
("date", "S8"),
|
||||
]
|
||||
)
|
||||
|
||||
# Maps from colID to a tuple defining a numpy dtype
|
||||
VMPdata_colID_dtype_map = {
|
||||
4: ("time/s", "<f8"),
|
||||
5: ("control/V/mA", "<f4"),
|
||||
6: ("Ewe/V", "<f4"),
|
||||
7: ("dQ/mA.h", "<f8"),
|
||||
7: ("dq/mA.h", "<f8"),
|
||||
8: ("I/mA", "<f4"), # 8 is either I or <I> ??
|
||||
9: ("Ece/V", "<f4"),
|
||||
11: ("I/mA", "<f8"),
|
||||
11: ("<I>/mA", "<f8"),
|
||||
13: ("(Q-Qo)/mA.h", "<f8"),
|
||||
16: ("Analog IN 1/V", "<f4"),
|
||||
17: ("Analog IN 2/V", "<f4"), # Probably column 18 is Analog IN 3/V, if anyone hits this error in the future # noqa: E501
|
||||
19: ("control/V", "<f4"),
|
||||
20: ("control/mA", "<f4"),
|
||||
23: ("dQ/mA.h", "<f8"), # Same as 7?
|
||||
@@ -267,7 +297,7 @@ VMPdata_colID_dtype_map = {
|
||||
39: ("I Range", "<u2"),
|
||||
69: ("R/Ohm", "<f4"),
|
||||
70: ("P/W", "<f4"),
|
||||
74: ("Energy/W.h", "<f8"),
|
||||
74: ("|Energy|/W.h", "<f8"),
|
||||
75: ("Analog OUT/V", "<f4"),
|
||||
76: ("<I>/mA", "<f4"),
|
||||
77: ("<Ewe>/V", "<f4"),
|
||||
@@ -287,8 +317,31 @@ VMPdata_colID_dtype_map = {
|
||||
169: ("Cs/µF", "<f4"),
|
||||
172: ("Cp/µF", "<f4"),
|
||||
173: ("Cp-2/µF-2", "<f4"),
|
||||
174: ("Ewe/V", "<f4"),
|
||||
241: ("|E1|/V", "<f4"),
|
||||
174: ("<Ewe>/V", "<f4"),
|
||||
178: ("(Q-Qo)/C", "<f4"),
|
||||
179: ("dQ/C", "<f4"),
|
||||
182: ("step time/s", "<f8"),
|
||||
211: ("Q charge/discharge/mA.h", "<f8"),
|
||||
212: ("half cycle", "<u4"),
|
||||
213: ("z cycle", "<u4"),
|
||||
217: ("THD Ewe/%", "<f4"),
|
||||
218: ("THD I/%", "<f4"),
|
||||
220: ("NSD Ewe/%", "<f4"),
|
||||
221: ("NSD I/%", "<f4"),
|
||||
223: ("NSR Ewe/%", "<f4"),
|
||||
224: ("NSR I/%", "<f4"),
|
||||
230: ("|Ewe h2|/V", "<f4"),
|
||||
231: ("|Ewe h3|/V", "<f4"),
|
||||
232: ("|Ewe h4|/V", "<f4"),
|
||||
233: ("|Ewe h5|/V", "<f4"),
|
||||
234: ("|Ewe h6|/V", "<f4"),
|
||||
235: ("|Ewe h7|/V", "<f4"),
|
||||
236: ("|I h2|/A", "<f4"),
|
||||
237: ("|I h3|/A", "<f4"),
|
||||
238: ("|I h4|/A", "<f4"),
|
||||
239: ("|I h5|/A", "<f4"),
|
||||
240: ("|I h6|/A", "<f4"),
|
||||
241: ("|I h7|/A", "<f4"),
|
||||
242: ("|E2|/V", "<f4"),
|
||||
271: ("Phase(Z1) / deg", "<f4"),
|
||||
272: ("Phase(Z2) / deg", "<f4"),
|
||||
@@ -379,18 +432,23 @@ def parse_BioLogic_date(date_text):
|
||||
return date(tm.tm_year, tm.tm_mon, tm.tm_mday)
|
||||
|
||||
|
||||
def VMPdata_dtype_from_colIDs(colIDs):
|
||||
def VMPdata_dtype_from_colIDs(colIDs, error_on_unknown_column: bool = True):
|
||||
"""Get a numpy record type from a list of column ID numbers.
|
||||
|
||||
The binary layout of the data in the MPR file is described by the sequence
|
||||
of column ID numbers in the file header. This function converts that
|
||||
sequence into a numpy dtype which can then be used to load data from the
|
||||
sequence into a list that can be used with numpy dtype load data from the
|
||||
file with np.frombuffer().
|
||||
|
||||
Some column IDs refer to small values which are packed into a single byte.
|
||||
The second return value is a dict describing the bit masks with which to
|
||||
extract these columns from the flags byte.
|
||||
|
||||
If error_on_unknown_column is True, an error will be raised if an unknown
|
||||
column ID is encountered. If it is False, a warning will be emited and attempts
|
||||
will be made to read the column with a few different dtypes.
|
||||
|
||||
|
||||
"""
|
||||
type_list = []
|
||||
field_name_counts = defaultdict(int)
|
||||
@@ -420,11 +478,19 @@ def VMPdata_dtype_from_colIDs(colIDs):
|
||||
unique_field_name = field_name
|
||||
type_list.append((unique_field_name, field_type))
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
"Column ID {cid} after column {prev} "
|
||||
"is unknown".format(cid=colID, prev=type_list[-1][0])
|
||||
if error_on_unknown_column:
|
||||
raise NotImplementedError(
|
||||
"Column ID {cid} after column {prev} is unknown".format(
|
||||
cid=colID, prev=type_list[-1][0]
|
||||
)
|
||||
)
|
||||
warnings.warn(
|
||||
"Unknown column ID %d -- will attempt to read as common dtypes"
|
||||
% colID
|
||||
)
|
||||
return np.dtype(type_list), flags_dict
|
||||
type_list.append(("unknown_colID_%d" % colID, UNKNOWN_COLUMN_TYPE_HIERARCHY[0]))
|
||||
|
||||
return type_list, flags_dict
|
||||
|
||||
|
||||
def read_VMP_modules(fileobj, read_module_data=True):
|
||||
@@ -441,11 +507,18 @@ def read_VMP_modules(fileobj, read_module_data=True):
|
||||
raise ValueError(
|
||||
"Found %r, expecting start of new VMP MODULE" % module_magic
|
||||
)
|
||||
VMPmodule_hdr = VMPmodule_hdr_v1
|
||||
|
||||
# Reading headers binary information
|
||||
hdr_bytes = fileobj.read(VMPmodule_hdr.itemsize)
|
||||
if len(hdr_bytes) < VMPmodule_hdr.itemsize:
|
||||
raise IOError("Unexpected end of file while reading module header")
|
||||
|
||||
# Checking if EC-Lab version is >= 11.50
|
||||
if hdr_bytes[35:39] == b"\xff\xff\xff\xff":
|
||||
VMPmodule_hdr = VMPmodule_hdr_v2
|
||||
hdr_bytes += fileobj.read(VMPmodule_hdr_v2.itemsize - VMPmodule_hdr_v1.itemsize)
|
||||
|
||||
hdr = np.frombuffer(hdr_bytes, dtype=VMPmodule_hdr, count=1)
|
||||
hdr_dict = dict(((n, hdr[n][0]) for n in VMPmodule_hdr.names))
|
||||
hdr_dict["offset"] = fileobj.tell()
|
||||
@@ -457,7 +530,11 @@ def read_VMP_modules(fileobj, read_module_data=True):
|
||||
current module: %s
|
||||
length read: %d
|
||||
length expected: %d"""
|
||||
% (hdr_dict["longname"], len(hdr_dict["data"]), hdr_dict["length"])
|
||||
% (
|
||||
hdr_dict["longname"],
|
||||
len(hdr_dict["data"]),
|
||||
hdr_dict["length"],
|
||||
)
|
||||
)
|
||||
yield hdr_dict
|
||||
else:
|
||||
@@ -465,6 +542,85 @@ def read_VMP_modules(fileobj, read_module_data=True):
|
||||
fileobj.seek(hdr_dict["offset"] + hdr_dict["length"], SEEK_SET)
|
||||
|
||||
|
||||
def loop_from_file(file: str, encoding: str = "latin1"):
|
||||
"""
|
||||
When an experiment is still running and it includes loops,
|
||||
a _LOOP.txt file is temporarily created to progressively store the indexes of new loops.
|
||||
This function reads the file and creates the loop_index array for MPRfile initialization.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
file : str
|
||||
Path of the loop file.
|
||||
encoding : str, optional
|
||||
Encoding of the text file. The default is "latin1".
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the file does not start with "VMP EXPERIMENT LOOP INDEXES".
|
||||
|
||||
Returns
|
||||
-------
|
||||
loop_index : np.array
|
||||
Indexes of data points that start a new loop.
|
||||
|
||||
"""
|
||||
with open(file, "r", encoding=encoding) as f:
|
||||
line = f.readline().strip()
|
||||
if line != LOOP_MAGIC:
|
||||
raise ValueError("Invalid magic for LOOP.txt file")
|
||||
loop_index = np.array([int(line) for line in f], dtype="u4")
|
||||
|
||||
return loop_index
|
||||
|
||||
|
||||
def timestamp_from_file(file: str, encoding: str = "latin1"):
|
||||
"""
|
||||
When an experiment is still running, a .mpl file is temporarily created to store
|
||||
information that will be added in the log module and will be appended to the data
|
||||
module in the .mpr file at the end of experiment.
|
||||
This function reads the file and extracts the experimental starting date and time
|
||||
as a timestamp for MPRfile initialization.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
file : str
|
||||
Path of the log file.
|
||||
encoding : str, optional
|
||||
Encoding of the text file. The default is "latin1".
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the file does not start with "EC-Lab LOG FILE" or "BT-Lab LOG FILE".
|
||||
|
||||
Returns
|
||||
-------
|
||||
timestamp
|
||||
Date and time of the start of data acquisition
|
||||
"""
|
||||
with open(file, "r", encoding=encoding) as f:
|
||||
line = f.readline().strip()
|
||||
if line not in LOG_MAGIC:
|
||||
raise ValueError("Invalid magic for .mpl file")
|
||||
log = f.read()
|
||||
start = tuple(
|
||||
map(
|
||||
int,
|
||||
re.findall(
|
||||
r"Acquisition started on : (\d+)\/(\d+)\/(\d+) (\d+):(\d+):(\d+)\.(\d+)",
|
||||
"".join(log),
|
||||
)[0],
|
||||
)
|
||||
)
|
||||
return datetime(
|
||||
int(start[2]), start[0], start[1], start[3], start[4], start[5], start[6] * 1000
|
||||
)
|
||||
|
||||
|
||||
LOG_MAGIC = "EC-Lab LOG FILEBT-Lab LOG FILE"
|
||||
LOOP_MAGIC = "VMP EXPERIMENT LOOP INDEXES"
|
||||
MPR_MAGIC = b"BIO-LOGIC MODULAR FILE\x1a".ljust(48) + b"\x00\x00\x00\x00"
|
||||
|
||||
|
||||
@@ -484,10 +640,22 @@ class MPRfile:
|
||||
enddate - The date when the experiment finished
|
||||
"""
|
||||
|
||||
def __init__(self, file_or_path):
|
||||
def __init__(self, file_or_path, error_on_unknown_column: bool = True):
|
||||
"""Pass an EC-lab .mpr file to be parsed.
|
||||
|
||||
Parameters:
|
||||
file_or_path: Either the open file data or a path to it.
|
||||
error_on_unknown_column: Whether or not to raise an error if an
|
||||
unknown column ID is encountered. A warning will be emited and
|
||||
the column will be added 'unknown_<colID>', with an attempt to read
|
||||
it with a few different dtypes.
|
||||
|
||||
"""
|
||||
self.loop_index = None
|
||||
if isinstance(file_or_path, str):
|
||||
mpr_file = open(file_or_path, "rb")
|
||||
loop_file = file_or_path[:-4] + "_LOOP.txt" # loop file for running experiment
|
||||
log_file = file_or_path[:-1] + "l" # log file for runnning experiment
|
||||
else:
|
||||
mpr_file = file_or_path
|
||||
magic = mpr_file.read(len(MPR_MAGIC))
|
||||
@@ -495,6 +663,7 @@ class MPRfile:
|
||||
raise ValueError("Invalid magic for .mpr file: %s" % magic)
|
||||
|
||||
modules = list(read_VMP_modules(mpr_file))
|
||||
|
||||
self.modules = modules
|
||||
(settings_mod,) = (m for m in modules if m["shortname"] == b"VMP Set ")
|
||||
(data_module,) = (m for m in modules if m["shortname"] == b"VMP data ")
|
||||
@@ -505,15 +674,22 @@ class MPRfile:
|
||||
n_columns = np.frombuffer(data_module["data"][4:5], dtype="u1").item()
|
||||
|
||||
if data_module["version"] == 0:
|
||||
column_types = np.frombuffer(
|
||||
data_module["data"][5:], dtype="u1", count=n_columns
|
||||
)
|
||||
remaining_headers = data_module["data"][5 + n_columns:100]
|
||||
main_data = data_module["data"][100:]
|
||||
# If EC-Lab version >= 11.50, column_types is [0 1 0 3 0 174...] instead of [1 3 174...]
|
||||
if np.frombuffer(data_module["data"][5:6], dtype="u1").item():
|
||||
column_types = np.frombuffer(data_module["data"][5:], dtype="u1", count=n_columns)
|
||||
remaining_headers = data_module["data"][5 + n_columns:100]
|
||||
main_data = data_module["data"][100:]
|
||||
else:
|
||||
column_types = np.frombuffer(
|
||||
data_module["data"][5:], dtype="u1", count=n_columns * 2
|
||||
)
|
||||
column_types = column_types[1::2] # suppressing zeros in column types array
|
||||
# remaining headers should be empty except for bytes 5 + n_columns * 2
|
||||
# and 1006 which are sometimes == 1
|
||||
remaining_headers = data_module["data"][6 + n_columns * 2:1006]
|
||||
main_data = data_module["data"][1007:]
|
||||
elif data_module["version"] in [2, 3]:
|
||||
column_types = np.frombuffer(
|
||||
data_module["data"][5:], dtype="<u2", count=n_columns
|
||||
)
|
||||
column_types = np.frombuffer(data_module["data"][5:], dtype="<u2", count=n_columns)
|
||||
# There are bytes of data before the main array starts
|
||||
if data_module["version"] == 3:
|
||||
num_bytes_before = 406 # version 3 added `\x01` to the start
|
||||
@@ -528,8 +704,50 @@ class MPRfile:
|
||||
|
||||
assert not any(remaining_headers)
|
||||
|
||||
self.dtype, self.flags_dict = VMPdata_dtype_from_colIDs(column_types)
|
||||
self.data = np.frombuffer(main_data, dtype=self.dtype)
|
||||
dtypes, self.flags_dict = VMPdata_dtype_from_colIDs(
|
||||
column_types, error_on_unknown_column=error_on_unknown_column
|
||||
)
|
||||
|
||||
unknown_cols = []
|
||||
# Iteratively work through the unknown columns and try to read them
|
||||
if not error_on_unknown_column:
|
||||
for col, _ in dtypes:
|
||||
if col.startswith("unknown_colID"):
|
||||
unknown_cols.append(col)
|
||||
if len(unknown_cols) > 3:
|
||||
raise RuntimeError(
|
||||
"Too many unknown columns to attempt to read combinatorially: %s"
|
||||
% unknown_cols
|
||||
)
|
||||
|
||||
if unknown_cols:
|
||||
# create a list of all possible combinations of dtypes
|
||||
# for the unknown columns
|
||||
from itertools import product
|
||||
perms = product(UNKNOWN_COLUMN_TYPE_HIERARCHY, repeat=len(unknown_cols))
|
||||
for perm in perms:
|
||||
for unknown_col_ind, c in enumerate(unknown_cols):
|
||||
for ind, (col, _) in enumerate(dtypes):
|
||||
if c == col:
|
||||
dtypes[ind] = (col, perm[unknown_col_ind])
|
||||
|
||||
try:
|
||||
self.dtype = np.dtype(dtypes)
|
||||
self.data = np.frombuffer(main_data, dtype=self.dtype)
|
||||
break
|
||||
except ValueError:
|
||||
continue
|
||||
else:
|
||||
raise RuntimeError(
|
||||
"Unable to read data for unknown columns %s with any of the common dtypes %s",
|
||||
unknown_cols,
|
||||
UNKNOWN_COLUMN_TYPE_HIERARCHY
|
||||
)
|
||||
|
||||
else:
|
||||
self.dtype = np.dtype(dtypes)
|
||||
self.data = np.frombuffer(main_data, dtype=self.dtype)
|
||||
|
||||
assert self.data.shape[0] == n_data_points
|
||||
|
||||
# No idea what these 'column types' mean or even if they are actually
|
||||
@@ -542,12 +760,17 @@ class MPRfile:
|
||||
if maybe_loop_module:
|
||||
(loop_module,) = maybe_loop_module
|
||||
if loop_module["version"] == 0:
|
||||
self.loop_index = np.fromstring(loop_module["data"][4:], dtype="<u4")
|
||||
self.loop_index = np.frombuffer(loop_module["data"][4:], dtype="<u4")
|
||||
self.loop_index = np.trim_zeros(self.loop_index, "b")
|
||||
else:
|
||||
raise ValueError(
|
||||
"Unrecognised version for data module: %d" % data_module["version"]
|
||||
)
|
||||
else:
|
||||
if os.path.isfile(loop_file):
|
||||
self.loop_index = loop_from_file(loop_file)
|
||||
if self.loop_index[-1] < n_data_points:
|
||||
self.loop_index = np.append(self.loop_index, n_data_points)
|
||||
|
||||
if maybe_log_module:
|
||||
(log_module,) = maybe_log_module
|
||||
@@ -591,6 +814,10 @@ class MPRfile:
|
||||
+ " End date: %s\n" % self.enddate
|
||||
+ " Timestamp: %s\n" % self.timestamp
|
||||
)
|
||||
else:
|
||||
if os.path.isfile(log_file):
|
||||
self.timestamp = timestamp_from_file(log_file)
|
||||
self.enddate = None
|
||||
|
||||
def get_flag(self, flagname):
|
||||
if flagname in self.flags_dict:
|
||||
|
||||
+21
-3
@@ -439,7 +439,8 @@ CREATE VIEW IF NOT EXISTS Capacity_View
|
||||
def mdb_get_data_text(s3db, filename, table):
|
||||
print("Reading %s..." % table)
|
||||
insert_pattern = re.compile(
|
||||
r'INSERT INTO "\w+" \([^)]+?\) VALUES \(("[^"]*"|[^")])+?\);\n', re.IGNORECASE
|
||||
r"""INSERT INTO "\w+" \([^)]+?\) VALUES (\((('[^']*')|"[^"]*"|[^')])+?\),?\s*)+;\n""",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
try:
|
||||
# Initialize values to avoid NameError in except clause
|
||||
@@ -570,13 +571,25 @@ def mdb_get_version(filename):
|
||||
return version_tuple
|
||||
|
||||
|
||||
def convert_arbin_to_sqlite(input_file, output_file):
|
||||
def convert_arbin_to_sqlite(input_file, output_file=None):
|
||||
"""Read data from an Arbin .res data file and write to a sqlite file.
|
||||
|
||||
Any data currently in the sqlite file will be erased!
|
||||
Any data currently in an sqlite file at `output_file` will be erased!
|
||||
|
||||
Parameters:
|
||||
input_file (str): The path to the Arbin .res file to read from.
|
||||
output_file (str or None): The path to the sqlite file to write to; if None,
|
||||
return a `sqlite3.Connection` into an in-memory database.
|
||||
|
||||
Returns:
|
||||
None or sqlite3.Connection
|
||||
|
||||
"""
|
||||
arbin_version = mdb_get_version(input_file)
|
||||
|
||||
if output_file is None:
|
||||
output_file = ":memory:"
|
||||
|
||||
s3db = sqlite3.connect(output_file)
|
||||
|
||||
tables_to_convert = copy(mdb_tables)
|
||||
@@ -601,6 +614,11 @@ def convert_arbin_to_sqlite(input_file, output_file):
|
||||
print("Vacuuming database...")
|
||||
s3db.executescript("VACUUM; ANALYZE;")
|
||||
|
||||
if output_file == ":memory:":
|
||||
return s3db
|
||||
|
||||
s3db.close()
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
parser = argparse.ArgumentParser(
|
||||
|
||||
@@ -12,11 +12,11 @@ with open(os.path.join(os.path.dirname(__file__), "README.md")) as f:
|
||||
|
||||
setup(
|
||||
name="galvani",
|
||||
version="0.3.0",
|
||||
version="0.5.0",
|
||||
description="Open and process battery charger log data files",
|
||||
long_description=readme,
|
||||
long_description_content_type="text/markdown",
|
||||
url="https://github.com/echemdata/galvani",
|
||||
url="https://codeberg.org/echemdata/galvani",
|
||||
author="Chris Kerr",
|
||||
author_email="chris.kerr@mykolab.ch",
|
||||
license="GPLv3+",
|
||||
|
||||
@@ -53,6 +53,16 @@ def test_convert_Arbin_to_sqlite_function(testdata_dir, tmpdir, basename):
|
||||
csr.fetchone()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("basename", ["arbin1", "UM34_Test005E"])
|
||||
def test_convert_Arbin_to_sqlite_function_in_memory(testdata_dir, tmpdir, basename):
|
||||
"""Convert an Arbin file to an in-memory SQLite database."""
|
||||
res_file = os.path.join(testdata_dir, basename + ".res")
|
||||
conn = None
|
||||
with res2sqlite.convert_arbin_to_sqlite(res_file) as conn:
|
||||
csr = conn.execute("SELECT * FROM Channel_Normal_Table;")
|
||||
csr.fetchone()
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not have_mdbtools, reason="Reading the Arbin file requires MDBTools"
|
||||
)
|
||||
|
||||
+127
-2
@@ -9,7 +9,7 @@ import re
|
||||
from datetime import date, datetime
|
||||
|
||||
import numpy as np
|
||||
from numpy.testing import assert_array_almost_equal, assert_array_equal
|
||||
from numpy.testing import assert_array_almost_equal, assert_array_equal, assert_allclose
|
||||
import pytest
|
||||
|
||||
from galvani import BioLogic, MPTfile, MPRfile
|
||||
@@ -99,7 +99,7 @@ def test_colID_to_dtype(colIDs, expected):
|
||||
return
|
||||
expected_dtype = np.dtype(expected)
|
||||
dtype, flags_dict = BioLogic.VMPdata_dtype_from_colIDs(colIDs)
|
||||
assert dtype == expected_dtype
|
||||
assert np.dtype(dtype) == expected_dtype
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -210,6 +210,95 @@ def assert_MPR_matches_MPT(mpr, mpt, comments):
|
||||
pass
|
||||
|
||||
|
||||
def assert_MPR_matches_MPT_v2(mpr, mpt, comments):
|
||||
"""
|
||||
Asserts that the fields in the MPR.data ar the same as in the MPT.
|
||||
|
||||
Modified from assert_MPR_matches_MPT. Automatically converts dtype from MPT data
|
||||
to dtype from MPR data before comparing the columns.
|
||||
|
||||
Special case for EIS_indicators: these fields are valid only at f<100kHz so their
|
||||
values are replaced by -1 or 0 at high frequency in the MPT file, this is not the
|
||||
case in the MPR data.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
mpr : MPRfile
|
||||
Data extracted with the MPRfile class.
|
||||
mpt : np.array
|
||||
Data extracted with MPTfile method.
|
||||
|
||||
Returns
|
||||
-------
|
||||
None.
|
||||
|
||||
"""
|
||||
|
||||
def assert_field_matches(fieldname):
|
||||
EIS_quality_indicators = [
|
||||
"THD Ewe/%",
|
||||
"NSD Ewe/%",
|
||||
"NSR Ewe/%",
|
||||
"|Ewe h2|/V",
|
||||
"|Ewe h3|/V",
|
||||
"|Ewe h4|/V",
|
||||
"|Ewe h5|/V",
|
||||
"|Ewe h6|/V",
|
||||
"|Ewe h7|/V",
|
||||
"THD I/%",
|
||||
"NSD I/%",
|
||||
"NSR I/%",
|
||||
"|I h2|/A",
|
||||
"|I h3|/A",
|
||||
"|I h4|/A",
|
||||
"|I h5|/A",
|
||||
"|I h6|/A",
|
||||
"|I h7|/A",
|
||||
]
|
||||
|
||||
if fieldname in EIS_quality_indicators: # EIS quality indicators only valid for f < 100kHz
|
||||
index_inf_100k = np.where(mpr.data["freq/Hz"] < 100000)[0]
|
||||
assert_allclose(
|
||||
mpr.data[index_inf_100k][fieldname],
|
||||
mpt[index_inf_100k][fieldname].astype(mpr.data[fieldname].dtype),
|
||||
)
|
||||
elif fieldname == "<Ewe>/V":
|
||||
assert_allclose(
|
||||
mpr.data[fieldname],
|
||||
mpt["Ewe/V"].astype(mpr.data[fieldname].dtype),
|
||||
)
|
||||
elif fieldname == "<I>/mA":
|
||||
assert_allclose(
|
||||
mpr.data[fieldname],
|
||||
mpt["I/mA"].astype(mpr.data[fieldname].dtype),
|
||||
)
|
||||
elif fieldname == "dq/mA.h":
|
||||
assert_allclose(
|
||||
mpr.data[fieldname],
|
||||
mpt["dQ/mA.h"].astype(mpr.data[fieldname].dtype),
|
||||
)
|
||||
else:
|
||||
assert_allclose(
|
||||
mpr.data[fieldname],
|
||||
mpt[fieldname].astype(mpr.data[fieldname].dtype),
|
||||
)
|
||||
|
||||
def assert_field_exact(fieldname):
|
||||
if fieldname in mpr.dtype.fields:
|
||||
assert_array_equal(mpr.data[fieldname], mpt[fieldname])
|
||||
|
||||
for key in mpr.flags_dict.keys():
|
||||
assert_array_equal(mpr.get_flag(key), mpt[key])
|
||||
|
||||
for d in mpr.dtype.descr[1:]:
|
||||
assert_field_matches(d[0])
|
||||
|
||||
try:
|
||||
assert timestamp_from_comments(comments) == mpr.timestamp.replace(microsecond=0)
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"basename",
|
||||
[
|
||||
@@ -252,3 +341,39 @@ def test_MPR6_matches_MPT6(testdata_dir):
|
||||
mpt, comments = MPTfile(os.path.join(testdata_dir, "bio_logic6.mpt"))
|
||||
mpr.data = mpr.data[:958] # .mpt file is incomplete
|
||||
assert_MPR_matches_MPT(mpr, mpt, comments)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"basename_v1150",
|
||||
["v1150_CA", "v1150_CP", "v1150_GCPL", "v1150_GEIS", "v1150_MB", "v1150_OCV", "v1150_PEIS"],
|
||||
)
|
||||
def test_MPR_matches_MPT_v1150(testdata_dir, basename_v1150):
|
||||
"""Check the MPR parser against the MPT parser.
|
||||
|
||||
Load a binary .mpr file and a text .mpt file which should contain
|
||||
exactly the same data. Check that the loaded data actually match.
|
||||
"""
|
||||
binpath = os.path.join(testdata_dir, "v1150", basename_v1150 + ".mpr")
|
||||
txtpath = os.path.join(testdata_dir, "v1150", basename_v1150 + ".mpt")
|
||||
mpr = MPRfile(binpath)
|
||||
mpt, comments = MPTfile(txtpath, encoding="latin1")
|
||||
assert_MPR_matches_MPT_v2(mpr, mpt, comments)
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Test data file is missing")
|
||||
def test_loop_from_file(testdata_dir):
|
||||
"""Check if the loop_index is correctly extracted from the _LOOP.txt file
|
||||
"""
|
||||
mpr = MPRfile(os.path.join(testdata_dir, "running", "running_OCV.mpr"))
|
||||
assert mpr.loop_index is not None, "No loop_index found"
|
||||
assert len(mpr.loop_index) == 4, "loop_index is not the right size"
|
||||
assert_array_equal(mpr.loop_index, [0, 4, 8, 11], "loop_index values are wrong")
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Test data file is missing")
|
||||
def test_timestamp_from_file(testdata_dir):
|
||||
"""Check if the loop_index is correctly extracted from the _LOOP.txt file
|
||||
"""
|
||||
mpr = MPRfile(os.path.join(testdata_dir, "running", "running_OCV.mpr"))
|
||||
assert hasattr(mpr, "timestamp"), "No timestamp found"
|
||||
assert mpr.timestamp.timestamp() == pytest.approx(1707299985.908), "timestamp value is wrong"
|
||||
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Reference in New Issue
Block a user