17 Commits

d4a5f444b1  Merge branch 'JhonFlash3008-loop-from-file'  2025-07-30 15:26:14 +03:00
            Closes https://codeberg.org/echemdata/galvani/pulls/124
            See also: 8a9d475222

d81bf829bb  Skip tests where the data file is missing  2025-07-30 15:24:22 +03:00

d77aa1555b  Refactor tests  2025-07-30 15:24:22 +03:00

0d684af470  Add loop_from_file and timestamp_from_file functions  (Jonathan Schillings)  2025-07-30 15:24:22 +03:00
            These extract loop_index and timestamp from the temporary _LOOP.txt
            and .mpl files during MPRfile initialization. Unit tests were added,
            but the test files could not be uploaded because the Git LFS quota
            was exceeded.
            Edited by Chris Kerr to fix flake8 warnings and resolve review
            comments from the original PR https://github.com/echemdata/galvani/pull/102

baec8934b8  Merge remote-tracking branch 'cb-ml-evs/ml-evs/col-182-and-warning-only-mode'  2025-07-30 14:36:59 +03:00
            Closes PR https://codeberg.org/echemdata/galvani/pulls/123
            See also GitHub PR https://github.com/echemdata/galvani/pull/124

ccaa66b206  Convert to np.dtype in test  (Matthew Evans)  2025-06-13 18:24:42 +01:00

a59f263c2b  Revert to defaulting to raising an error on unknown cols  (Matthew Evans)  2025-06-13 18:23:24 +01:00

30d6098aa0  Linting  (Matthew Evans)  2025-06-13 18:16:00 +01:00

2c90a2b038  Temporarily enable the new feature by default  (Matthew Evans)  2025-06-13 16:45:58 +01:00

5a207dbf5e  Add guard for combinatorially exploring more than 3 unknown column data types  (Matthew Evans)  2025-06-13 16:29:21 +01:00

7964dc85db  Add mode to attempt to read files with unknown columns and only warn  (Matthew Evans)  2025-06-13 16:22:30 +01:00

569a5f2a9c  Add step time/s field for column 182  (Matthew Evans)  2025-06-13 15:48:19 +01:00

b6143e4b05  Merge branch 'move-to-codeberg'  2025-03-23 14:59:50 +02:00

4efec58374  Remove warning about Git LFS bandwidth limits  2025-03-23 14:51:52 +02:00

627387f9c4  Update URLs to point to CodeBerg repo  2025-03-23 14:51:52 +02:00

12b4badc31  Merge remote-tracking branch 'github/master'  2025-03-23 14:40:23 +02:00

5ed03ed20c  Bump version to 0.5.0  2025-03-23 08:55:44 +02:00
5 changed files with 196 additions and 33 deletions
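The two headline changes in this batch are a warning-only mode for unknown data columns and support for reading still-running experiments. A minimal usage sketch of the new surface, assuming illustrative file names (not from the test data):

```python
from galvani.BioLogic import MPRfile

# New keyword: warn and guess dtypes for unknown column IDs instead of raising.
mpr = MPRfile("measurement.mpr", error_on_unknown_column=False)

# For a still-running experiment opened by path, loop indexes and the start
# timestamp are picked up from the temporary sibling files, if they exist
# (measurement_LOOP.txt and measurement.mpl).
print(mpr.loop_index, getattr(mpr, "timestamp", None))
```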

View File

@@ -1,7 +1,7 @@
 Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
 Upstream-Name: Galvani
 Upstream-Contact: Christopher Kerr <chris.kerr@mykolab.ch>
-Source: https://github.com/echemdata/galvani
+Source: https://codeberg.org/echemdata/galvani
 
 Files: tests/testdata/*
 Copyright: 2010-2014 Christopher Kerr <chris.kerr@mykolab.ch>

View File

@@ -47,36 +47,20 @@ The latest galvani releases can be installed from [PyPI](https://pypi.org/projec
 pip install galvani
 ```
 
-The latest development version can be installed with `pip` directly from GitHub (see note about git-lfs below):
+The latest development version can be installed with `pip` directly from GitHub:
 
 ```shell
-GIT_LFS_SKIP_SMUDGE=1 pip install git+https://github.com/echemdata/galvani
+pip install git+https://codeberg.org/echemdata/galvani
 ```
 
 ## Development installation and contributing
 
-> [!WARNING]
->
-> This project uses Git Large File Storage (LFS) to store its test files,
-> however the LFS quota provided by GitHub is frequently exceeded.
-> This means that anyone cloning the repository with LFS installed will get
-> failures unless they set the `GIT_LFS_SKIP_SMUDGE=1` environment variable when
-> cloning.
-> The full test data from the last release can always be obtained by
-> downloading the GitHub release archives (tar or zip), at
-> https://github.com/echemdata/galvani/releases/latest
->
-> If you wish to add test files, please ensure they are as small as possible,
-> and take care that your tests work locally without the need for the LFS files.
-> Ideally, you could commit them to your fork when making a PR, and then they
-> can be converted to LFS files as part of the review.
-
 If you wish to contribute to galvani, please clone the repository and install the testing dependencies:
 
 ```shell
-git clone git@github.com:echemdata/galvani
+git clone git@codeberg.org:echemdata/galvani
 cd galvani
 pip install -e .\[tests\]
 ```
 
-Code can be contributed back via [GitHub pull requests](https://github.com/echemdata/galvani/pulls) and new features or bugs can be discussed in the [issue tracker](https://github.com/echemdata/galvani/issues).
+Code can be contributed back via [pull requests](https://codeberg.org/echemdata/galvani/pulls) and new features or bugs can be discussed in the [issue tracker](https://codeberg.org/echemdata/galvani/issues).

View File

@@ -10,12 +10,16 @@ __all__ = ["MPTfileCSV", "MPTfile"]
 import re
 import csv
 from os import SEEK_SET
+import os.path
 import time
 from datetime import date, datetime, timedelta
 from collections import defaultdict, OrderedDict
+import warnings
 
 import numpy as np
 
+UNKNOWN_COLUMN_TYPE_HIERARCHY = ("<f8", "<f4", "<u4", "<u2", "<u1")
+
 
 def fieldname_to_dtype(fieldname):
     """Converts a column header from the MPT file into a tuple of
@@ -316,6 +320,7 @@ VMPdata_colID_dtype_map = {
     174: ("<Ewe>/V", "<f4"),
     178: ("(Q-Qo)/C", "<f4"),
     179: ("dQ/C", "<f4"),
+    182: ("step time/s", "<f8"),
     211: ("Q charge/discharge/mA.h", "<f8"),
     212: ("half cycle", "<u4"),
     213: ("z cycle", "<u4"),
@@ -427,18 +432,23 @@ def parse_BioLogic_date(date_text):
     return date(tm.tm_year, tm.tm_mon, tm.tm_mday)
 
 
-def VMPdata_dtype_from_colIDs(colIDs):
+def VMPdata_dtype_from_colIDs(colIDs, error_on_unknown_column: bool = True):
     """Get a numpy record type from a list of column ID numbers.
 
     The binary layout of the data in the MPR file is described by the sequence
     of column ID numbers in the file header. This function converts that
-    sequence into a numpy dtype which can then be used to load data from the
+    sequence into a list of dtype tuples which can then be used to load data from the
     file with np.frombuffer().
 
     Some column IDs refer to small values which are packed into a single byte.
     The second return value is a dict describing the bit masks with which to
     extract these columns from the flags byte.
+
+    If error_on_unknown_column is True, an error will be raised if an unknown
+    column ID is encountered. If it is False, a warning will be emitted and
+    attempts will be made to read the column with a few different dtypes.
     """
     type_list = []
     field_name_counts = defaultdict(int)
@@ -468,11 +478,19 @@ def VMPdata_dtype_from_colIDs(colIDs):
             unique_field_name = field_name
         type_list.append((unique_field_name, field_type))
     else:
-        raise NotImplementedError(
-            "Column ID {cid} after column {prev} "
-            "is unknown".format(cid=colID, prev=type_list[-1][0])
-        )
-
-    return np.dtype(type_list), flags_dict
+        if error_on_unknown_column:
+            raise NotImplementedError(
+                "Column ID {cid} after column {prev} is unknown".format(
+                    cid=colID, prev=type_list[-1][0]
+                )
+            )
+        warnings.warn(
+            "Unknown column ID %d -- will attempt to read as common dtypes" % colID
+        )
+        type_list.append(("unknown_colID_%d" % colID, UNKNOWN_COLUMN_TYPE_HIERARCHY[0]))
+
+    return type_list, flags_dict
 
 
 def read_VMP_modules(fileobj, read_module_data=True):
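Since the function now returns a plain list of (name, dtype) tuples rather than a finished np.dtype, callers build the dtype themselves. A minimal sketch, using column IDs that appear in the map above (174 and 182):

```python
import numpy as np

from galvani import BioLogic

# 174 -> ("<Ewe>/V", "<f4") and 182 -> ("step time/s", "<f8")
# per VMPdata_colID_dtype_map above.
type_list, flags_dict = BioLogic.VMPdata_dtype_from_colIDs([174, 182])
dtype = np.dtype(type_list)
# data = np.frombuffer(raw_module_bytes, dtype=dtype)  # bytes from the data module
```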
@@ -524,6 +542,85 @@
         fileobj.seek(hdr_dict["offset"] + hdr_dict["length"], SEEK_SET)
 
 
+def loop_from_file(file: str, encoding: str = "latin1"):
+    """
+    When an experiment is still running and it includes loops, a _LOOP.txt
+    file is temporarily created to progressively store the indexes of new
+    loops. This function reads that file and creates the loop_index array
+    for MPRfile initialization.
+
+    Parameters
+    ----------
+    file : str
+        Path of the loop file.
+    encoding : str, optional
+        Encoding of the text file. The default is "latin1".
+
+    Raises
+    ------
+    ValueError
+        If the file does not start with "VMP EXPERIMENT LOOP INDEXES".
+
+    Returns
+    -------
+    loop_index : np.array
+        Indexes of the data points that start a new loop.
+    """
+    with open(file, "r", encoding=encoding) as f:
+        line = f.readline().strip()
+        if line != LOOP_MAGIC:
+            raise ValueError("Invalid magic for LOOP.txt file")
+        loop_index = np.array([int(line) for line in f], dtype="u4")
+    return loop_index
+
+
+def timestamp_from_file(file: str, encoding: str = "latin1"):
+    """
+    When an experiment is still running, a .mpl file is temporarily created
+    to store information that will be added to the log module and appended
+    to the data module of the .mpr file at the end of the experiment.
+    This function reads that file and extracts the experiment start date and
+    time as a timestamp for MPRfile initialization.
+
+    Parameters
+    ----------
+    file : str
+        Path of the log file.
+    encoding : str, optional
+        Encoding of the text file. The default is "latin1".
+
+    Raises
+    ------
+    ValueError
+        If the file does not start with "EC-Lab LOG FILE" or "BT-Lab LOG FILE".
+
+    Returns
+    -------
+    timestamp : datetime
+        Date and time of the start of data acquisition.
+    """
+    with open(file, "r", encoding=encoding) as f:
+        line = f.readline().strip()
+        if line not in LOG_MAGIC:
+            raise ValueError("Invalid magic for .mpl file")
+        log = f.read()
+    start = tuple(
+        map(
+            int,
+            re.findall(
+                r"Acquisition started on : (\d+)\/(\d+)\/(\d+) (\d+):(\d+):(\d+)\.(\d+)",
+                log,
+            )[0],
+        )
+    )
+    return datetime(
+        int(start[2]), start[0], start[1], start[3], start[4], start[5], start[6] * 1000
+    )
+
+
+# A tuple, so that "line not in LOG_MAGIC" is an exact-match membership test
+LOG_MAGIC = ("EC-Lab LOG FILE", "BT-Lab LOG FILE")
+
+LOOP_MAGIC = "VMP EXPERIMENT LOOP INDEXES"
+
 MPR_MAGIC = b"BIO-LOGIC MODULAR FILE\x1a".ljust(48) + b"\x00\x00\x00\x00"
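The _LOOP.txt layout is implied by the parser: a magic first line, then one integer index per line. A round-trip sketch, with made-up file content and path:

```python
from galvani.BioLogic import loop_from_file

# Hypothetical file as the instrument would write it mid-experiment:
with open("experiment_LOOP.txt", "w", encoding="latin1") as f:
    f.write("VMP EXPERIMENT LOOP INDEXES\n0\n4\n8\n")

loop_index = loop_from_file("experiment_LOOP.txt")
print(loop_index)  # [0 4 8], dtype uint32
```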
@@ -543,10 +640,22 @@
     enddate - The date when the experiment finished
     """
 
-    def __init__(self, file_or_path):
+    def __init__(self, file_or_path, error_on_unknown_column: bool = True):
+        """Pass an EC-lab .mpr file to be parsed.
+
+        Parameters:
+            file_or_path: Either the open file data or a path to it.
+            error_on_unknown_column: Whether or not to raise an error if an
+                unknown column ID is encountered. If False, a warning will be
+                emitted and the column will be added as 'unknown_<colID>', with
+                an attempt to read it with a few different dtypes.
+        """
         self.loop_index = None
         if isinstance(file_or_path, str):
             mpr_file = open(file_or_path, "rb")
+            loop_file = file_or_path[:-4] + "_LOOP.txt"  # loop file for running experiment
+            log_file = file_or_path[:-1] + "l"  # log file for running experiment
         else:
             mpr_file = file_or_path
         magic = mpr_file.read(len(MPR_MAGIC))
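The sibling-file paths are derived by plain string slicing, so they only make sense for paths that really end in ".mpr"; a worked example with an illustrative path:

```python
path = "data/experiment.mpr"
loop_file = path[:-4] + "_LOOP.txt"  # "data/experiment_LOOP.txt"
log_file = path[:-1] + "l"           # "data/experiment.mpl"
```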
@@ -595,8 +704,50 @@
         assert not any(remaining_headers)
 
-        self.dtype, self.flags_dict = VMPdata_dtype_from_colIDs(column_types)
-        self.data = np.frombuffer(main_data, dtype=self.dtype)
+        dtypes, self.flags_dict = VMPdata_dtype_from_colIDs(
+            column_types, error_on_unknown_column=error_on_unknown_column
+        )
+
+        unknown_cols = []
+        # Iteratively work through the unknown columns and try to read them
+        if not error_on_unknown_column:
+            for col, _ in dtypes:
+                if col.startswith("unknown_colID"):
+                    unknown_cols.append(col)
+            if len(unknown_cols) > 3:
+                raise RuntimeError(
+                    "Too many unknown columns to attempt to read combinatorially: %s"
+                    % unknown_cols
+                )
+
+        if unknown_cols:
+            # create a list of all possible combinations of dtypes
+            # for the unknown columns
+            from itertools import product
+
+            perms = product(UNKNOWN_COLUMN_TYPE_HIERARCHY, repeat=len(unknown_cols))
+            for perm in perms:
+                for unknown_col_ind, c in enumerate(unknown_cols):
+                    for ind, (col, _) in enumerate(dtypes):
+                        if c == col:
+                            dtypes[ind] = (col, perm[unknown_col_ind])
+                try:
+                    self.dtype = np.dtype(dtypes)
+                    self.data = np.frombuffer(main_data, dtype=self.dtype)
+                    break
+                except ValueError:
+                    continue
+            else:
+                raise RuntimeError(
+                    "Unable to read data for unknown columns %s with any of the common dtypes %s"
+                    % (unknown_cols, UNKNOWN_COLUMN_TYPE_HIERARCHY)
+                )
+        else:
+            self.dtype = np.dtype(dtypes)
+            self.data = np.frombuffer(main_data, dtype=self.dtype)
 
         assert self.data.shape[0] == n_data_points
 
         # No idea what these 'column types' mean or even if they are actually
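With five candidate dtypes, this fallback tries up to 5^n layouts for n unknown columns, which is why the guard caps n at 3 (at most 5³ = 125 attempts). A layout is accepted as soon as np.frombuffer() stops raising ValueError, i.e. when the record size divides the data buffer, so the first size-compatible guess wins and the decoded values remain best-effort. A small sketch of that arithmetic:

```python
from itertools import product

UNKNOWN_COLUMN_TYPE_HIERARCHY = ("<f8", "<f4", "<u4", "<u2", "<u1")

# Two unknown columns -> 5**2 == 25 candidate assignments, widest types first.
perms = list(product(UNKNOWN_COLUMN_TYPE_HIERARCHY, repeat=2))
assert len(perms) == 25
print(perms[0], perms[-1])  # ('<f8', '<f8') ... ('<u1', '<u1')
```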
@@ -615,6 +766,11 @@
             raise ValueError(
                 "Unrecognised version for data module: %d" % data_module["version"]
             )
+        else:
+            if os.path.isfile(loop_file):
+                self.loop_index = loop_from_file(loop_file)
+                if self.loop_index[-1] < n_data_points:
+                    self.loop_index = np.append(self.loop_index, n_data_points)
 
         if maybe_log_module:
             (log_module,) = maybe_log_module
@@ -658,6 +814,10 @@
                 + " End date: %s\n" % self.enddate
                 + " Timestamp: %s\n" % self.timestamp
             )
+        else:
+            if os.path.isfile(log_file):
+                self.timestamp = timestamp_from_file(log_file)
+            self.enddate = None
 
     def get_flag(self, flagname):
         if flagname in self.flags_dict:

View File

@@ -12,11 +12,11 @@ with open(os.path.join(os.path.dirname(__file__), "README.md")) as f:
 
 setup(
     name="galvani",
-    version="0.4.1",
+    version="0.5.0",
     description="Open and process battery charger log data files",
     long_description=readme,
     long_description_content_type="text/markdown",
-    url="https://github.com/echemdata/galvani",
+    url="https://codeberg.org/echemdata/galvani",
     author="Chris Kerr",
     author_email="chris.kerr@mykolab.ch",
     license="GPLv3+",

View File

@@ -99,7 +99,7 @@ def test_colID_to_dtype(colIDs, expected):
         return
     expected_dtype = np.dtype(expected)
     dtype, flags_dict = BioLogic.VMPdata_dtype_from_colIDs(colIDs)
-    assert dtype == expected_dtype
+    assert np.dtype(dtype) == expected_dtype
 
 
 @pytest.mark.parametrize(
@@ -358,3 +358,22 @@ def test_MPR_matches_MPT_v1150(testdata_dir, basename_v1150):
     mpr = MPRfile(binpath)
     mpt, comments = MPTfile(txtpath, encoding="latin1")
     assert_MPR_matches_MPT_v2(mpr, mpt, comments)
+
+
+@pytest.mark.skip(reason="Test data file is missing")
+def test_loop_from_file(testdata_dir):
+    """Check that the loop_index is correctly extracted from the _LOOP.txt file."""
+    mpr = MPRfile(os.path.join(testdata_dir, "running", "running_OCV.mpr"))
+    assert mpr.loop_index is not None, "No loop_index found"
+    assert len(mpr.loop_index) == 4, "loop_index is not the right size"
+    assert_array_equal(mpr.loop_index, [0, 4, 8, 11], "loop_index values are wrong")
+
+
+@pytest.mark.skip(reason="Test data file is missing")
+def test_timestamp_from_file(testdata_dir):
+    """Check that the timestamp is correctly extracted from the .mpl file."""
+    mpr = MPRfile(os.path.join(testdata_dir, "running", "running_OCV.mpr"))
+    assert hasattr(mpr, "timestamp"), "No timestamp found"
+    assert mpr.timestamp.timestamp() == pytest.approx(1707299985.908), "timestamp value is wrong"
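Because the real "running" test data could not be uploaded (LFS quota exceeded, per the commit message), a self-contained variant of the loop test could synthesize its input with pytest's tmp_path and run without any LFS files. A sketch with made-up loop indexes:

```python
import numpy as np
from numpy.testing import assert_array_equal

from galvani.BioLogic import loop_from_file


def test_loop_from_file_synthetic(tmp_path):
    """Parse a synthetic _LOOP.txt so the test needs no LFS data files."""
    loop_file = tmp_path / "synthetic_LOOP.txt"
    loop_file.write_text("VMP EXPERIMENT LOOP INDEXES\n0\n4\n8\n", encoding="latin1")
    loop_index = loop_from_file(str(loop_file))
    assert loop_index.dtype == np.dtype("u4")
    assert_array_equal(loop_index, [0, 4, 8])
```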