mirror of
https://github.com/echemdata/galvani.git
synced 2025-12-14 09:15:34 +00:00
Merge pull request #30 from chatcannon/test-and-fix-Arbin
Fix Arbin reading code
This commit is contained in:
@@ -3,6 +3,7 @@ language: python
|
|||||||
cache:
|
cache:
|
||||||
directories:
|
directories:
|
||||||
- .tox
|
- .tox
|
||||||
|
- .pytest_cache
|
||||||
- tests/testdata
|
- tests/testdata
|
||||||
python:
|
python:
|
||||||
- "2.7"
|
- "2.7"
|
||||||
|
|||||||
@@ -371,10 +371,21 @@ def mdb_get_data_text(s3db, filename, table):
|
|||||||
r'INSERT INTO "\w+" \([^)]+?\) VALUES \(("[^"]*"|[^")])+?\);\n',
|
r'INSERT INTO "\w+" \([^)]+?\) VALUES \(("[^"]*"|[^")])+?\);\n',
|
||||||
re.IGNORECASE
|
re.IGNORECASE
|
||||||
)
|
)
|
||||||
|
# TODO after dropping Python 2 support - use Popen as contextmanager
|
||||||
try:
|
try:
|
||||||
mdb_sql = sp.Popen(['mdb-export', '-I', 'postgres', filename, table],
|
mdb_sql = sp.Popen(['mdb-export', '-I', 'postgres', filename, table],
|
||||||
bufsize=-1, stdin=None, stdout=sp.PIPE,
|
bufsize=-1, stdin=None, stdout=sp.PIPE,
|
||||||
universal_newlines=True)
|
universal_newlines=True)
|
||||||
|
except OSError as e:
|
||||||
|
if e.errno == 2:
|
||||||
|
raise RuntimeError('Could not locate the `mdb-export` executable. '
|
||||||
|
'Check that mdbtools is properly installed.')
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
try:
|
||||||
|
# Initialize values to avoid NameError in except clause
|
||||||
|
mdb_output = ''
|
||||||
|
insert_match = None
|
||||||
mdb_output = mdb_sql.stdout.read()
|
mdb_output = mdb_sql.stdout.read()
|
||||||
while len(mdb_output) > 0:
|
while len(mdb_output) > 0:
|
||||||
insert_match = insert_pattern.match(mdb_output)
|
insert_match = insert_pattern.match(mdb_output)
|
||||||
@@ -383,7 +394,8 @@ def mdb_get_data_text(s3db, filename, table):
|
|||||||
s3db.commit()
|
s3db.commit()
|
||||||
except BaseException:
|
except BaseException:
|
||||||
print("Error while importing %s" % table)
|
print("Error while importing %s" % table)
|
||||||
print("Remaining mdb-export output:", mdb_output)
|
if mdb_output:
|
||||||
|
print("Remaining mdb-export output:", mdb_output)
|
||||||
if insert_match:
|
if insert_match:
|
||||||
print("insert_re match:", insert_match)
|
print("insert_re match:", insert_match)
|
||||||
raise
|
raise
|
||||||
@@ -393,10 +405,18 @@ def mdb_get_data_text(s3db, filename, table):
|
|||||||
|
|
||||||
def mdb_get_data_numeric(s3db, filename, table):
|
def mdb_get_data_numeric(s3db, filename, table):
|
||||||
print("Reading %s..." % table)
|
print("Reading %s..." % table)
|
||||||
|
# TODO after dropping Python 2 support - use Popen as contextmanager
|
||||||
try:
|
try:
|
||||||
mdb_sql = sp.Popen(['mdb-export', filename, table],
|
mdb_sql = sp.Popen(['mdb-export', filename, table],
|
||||||
bufsize=-1, stdin=None, stdout=sp.PIPE,
|
bufsize=-1, stdin=None, stdout=sp.PIPE,
|
||||||
universal_newlines=True)
|
universal_newlines=True)
|
||||||
|
except OSError as e:
|
||||||
|
if e.errno == 2:
|
||||||
|
raise RuntimeError('Could not locate the `mdb-export` executable. '
|
||||||
|
'Check that mdbtools is properly installed.')
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
try:
|
||||||
mdb_csv = csv.reader(mdb_sql.stdout)
|
mdb_csv = csv.reader(mdb_sql.stdout)
|
||||||
mdb_headers = next(mdb_csv)
|
mdb_headers = next(mdb_csv)
|
||||||
quoted_headers = ['"%s"' % h for h in mdb_headers]
|
quoted_headers = ['"%s"' % h for h in mdb_headers]
|
||||||
|
|||||||
11
tests/conftest.py
Normal file
11
tests/conftest.py
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
"""Helpers for pytest tests."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='session')
|
||||||
|
def testdata_dir():
|
||||||
|
"""Path to the testdata directory."""
|
||||||
|
return os.path.join(os.path.dirname(__file__), 'testdata')
|
||||||
56
tests/test_Arbin.py
Normal file
56
tests/test_Arbin.py
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
"""Tests for loading Arbin .res files."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sqlite3
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from galvani import res2sqlite
|
||||||
|
|
||||||
|
|
||||||
|
# TODO - change to subprocess.DEVNULL when python 2 support is removed
|
||||||
|
have_mdbtools = (subprocess.call(['which', 'mdb-export'], stdout=None) == 0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_res2sqlite_help():
|
||||||
|
"""Test running `res2sqlite --help`.
|
||||||
|
|
||||||
|
This should work even when mdbtools is not installed.
|
||||||
|
"""
|
||||||
|
help_output = subprocess.check_output(['res2sqlite', '--help'])
|
||||||
|
assert b'Convert Arbin .res files to sqlite3 databases' in help_output
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(have_mdbtools, reason='This tests the failure when mdbtools is not installed')
|
||||||
|
def test_convert_Arbin_no_mdbtools(testdata_dir, tmpdir):
|
||||||
|
"""Checks that the conversion fails with an appropriate error message."""
|
||||||
|
res_file = os.path.join(testdata_dir, 'arbin1.res')
|
||||||
|
sqlite_file = os.path.join(str(tmpdir), 'arbin1.s3db')
|
||||||
|
with pytest.raises(RuntimeError, match="Could not locate the `mdb-export` executable."):
|
||||||
|
res2sqlite.convert_arbin_to_sqlite(res_file, sqlite_file)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(not have_mdbtools, reason='Reading the Arbin file requires MDBTools')
|
||||||
|
@pytest.mark.parametrize('basename', ['arbin1'])
|
||||||
|
def test_convert_Arbin_to_sqlite_function(testdata_dir, tmpdir, basename):
|
||||||
|
"""Convert an Arbin file to SQLite using the functional interface."""
|
||||||
|
res_file = os.path.join(testdata_dir, basename + '.res')
|
||||||
|
sqlite_file = os.path.join(str(tmpdir), basename + '.s3db')
|
||||||
|
res2sqlite.convert_arbin_to_sqlite(res_file, sqlite_file)
|
||||||
|
assert os.path.isfile(sqlite_file)
|
||||||
|
with sqlite3.connect(sqlite_file) as conn:
|
||||||
|
csr = conn.execute('SELECT * FROM Channel_Normal_Table;')
|
||||||
|
csr.fetchone()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(not have_mdbtools, reason='Reading the Arbin file requires MDBTools')
|
||||||
|
def test_convert_cmdline(testdata_dir, tmpdir):
|
||||||
|
"""Checks that the conversion fails with an appropriate error message."""
|
||||||
|
res_file = os.path.join(testdata_dir, 'arbin1.res')
|
||||||
|
sqlite_file = os.path.join(str(tmpdir), 'arbin1.s3db')
|
||||||
|
subprocess.check_call(['res2sqlite', res_file, sqlite_file])
|
||||||
|
assert os.path.isfile(sqlite_file)
|
||||||
|
with sqlite3.connect(sqlite_file) as conn:
|
||||||
|
csr = conn.execute('SELECT * FROM Channel_Normal_Table;')
|
||||||
|
csr.fetchone()
|
||||||
@@ -11,10 +11,8 @@ import pytest
|
|||||||
from galvani import BioLogic, MPTfile, MPRfile
|
from galvani import BioLogic, MPTfile, MPRfile
|
||||||
from galvani.BioLogic import MPTfileCSV, str3 # not exported
|
from galvani.BioLogic import MPTfileCSV, str3 # not exported
|
||||||
|
|
||||||
testdata_dir = os.path.join(os.path.dirname(__file__), 'testdata')
|
|
||||||
|
|
||||||
|
def test_open_MPT(testdata_dir):
|
||||||
def test_open_MPT():
|
|
||||||
mpt1, comments = MPTfile(os.path.join(testdata_dir, 'bio_logic1.mpt'))
|
mpt1, comments = MPTfile(os.path.join(testdata_dir, 'bio_logic1.mpt'))
|
||||||
assert comments == []
|
assert comments == []
|
||||||
assert mpt1.dtype.names == (
|
assert mpt1.dtype.names == (
|
||||||
@@ -24,12 +22,12 @@ def test_open_MPT():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_open_MPT_fails_for_bad_file():
|
def test_open_MPT_fails_for_bad_file(testdata_dir):
|
||||||
with pytest.raises(ValueError, match='Bad first line'):
|
with pytest.raises(ValueError, match='Bad first line'):
|
||||||
MPTfile(os.path.join(testdata_dir, 'bio_logic1.mpr'))
|
MPTfile(os.path.join(testdata_dir, 'bio_logic1.mpr'))
|
||||||
|
|
||||||
|
|
||||||
def test_open_MPT_csv():
|
def test_open_MPT_csv(testdata_dir):
|
||||||
mpt1, comments = MPTfileCSV(os.path.join(testdata_dir, 'bio_logic1.mpt'))
|
mpt1, comments = MPTfileCSV(os.path.join(testdata_dir, 'bio_logic1.mpt'))
|
||||||
assert comments == []
|
assert comments == []
|
||||||
assert mpt1.fieldnames == [
|
assert mpt1.fieldnames == [
|
||||||
@@ -39,7 +37,7 @@ def test_open_MPT_csv():
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def test_open_MPT_csv_fails_for_bad_file():
|
def test_open_MPT_csv_fails_for_bad_file(testdata_dir):
|
||||||
with pytest.raises((ValueError, UnicodeDecodeError)):
|
with pytest.raises((ValueError, UnicodeDecodeError)):
|
||||||
MPTfileCSV(os.path.join(testdata_dir, 'bio_logic1.mpr'))
|
MPTfileCSV(os.path.join(testdata_dir, 'bio_logic1.mpr'))
|
||||||
|
|
||||||
@@ -86,7 +84,7 @@ def test_colID_to_dtype(colIDs, expected):
|
|||||||
# C019P-0ppb-A_C01.mpr stores the date in a different format
|
# C019P-0ppb-A_C01.mpr stores the date in a different format
|
||||||
('C019P-0ppb-A_C01.mpr', '2019-03-14', '2019-03-14'),
|
('C019P-0ppb-A_C01.mpr', '2019-03-14', '2019-03-14'),
|
||||||
])
|
])
|
||||||
def test_MPR_dates(filename, startdate, enddate):
|
def test_MPR_dates(testdata_dir, filename, startdate, enddate):
|
||||||
"""Check that the start and end dates in .mpr files are read correctly."""
|
"""Check that the start and end dates in .mpr files are read correctly."""
|
||||||
mpr = MPRfile(os.path.join(testdata_dir, filename))
|
mpr = MPRfile(os.path.join(testdata_dir, filename))
|
||||||
assert mpr.startdate.strftime('%Y-%m-%d') == startdate
|
assert mpr.startdate.strftime('%Y-%m-%d') == startdate
|
||||||
@@ -96,7 +94,7 @@ def test_MPR_dates(filename, startdate, enddate):
|
|||||||
assert not hasattr(mpr, 'enddate')
|
assert not hasattr(mpr, 'enddate')
|
||||||
|
|
||||||
|
|
||||||
def test_open_MPR_fails_for_bad_file():
|
def test_open_MPR_fails_for_bad_file(testdata_dir):
|
||||||
with pytest.raises(ValueError, match='Invalid magic for .mpr file'):
|
with pytest.raises(ValueError, match='Invalid magic for .mpr file'):
|
||||||
MPRfile(os.path.join(testdata_dir, 'arbin1.res'))
|
MPRfile(os.path.join(testdata_dir, 'arbin1.res'))
|
||||||
|
|
||||||
@@ -165,7 +163,7 @@ def assert_MPR_matches_MPT(mpr, mpt, comments):
|
|||||||
'CV_C01',
|
'CV_C01',
|
||||||
'121_CA_455nm_6V_30min_C01',
|
'121_CA_455nm_6V_30min_C01',
|
||||||
])
|
])
|
||||||
def test_MPR_matches_MPT(basename):
|
def test_MPR_matches_MPT(testdata_dir, basename):
|
||||||
"""Check the MPR parser against the MPT parser.
|
"""Check the MPR parser against the MPT parser.
|
||||||
|
|
||||||
Load a binary .mpr file and a text .mpt file which should contain
|
Load a binary .mpr file and a text .mpt file which should contain
|
||||||
@@ -178,7 +176,7 @@ def test_MPR_matches_MPT(basename):
|
|||||||
assert_MPR_matches_MPT(mpr, mpt, comments)
|
assert_MPR_matches_MPT(mpr, mpt, comments)
|
||||||
|
|
||||||
|
|
||||||
def test_MPR5_matches_MPT5():
|
def test_MPR5_matches_MPT5(testdata_dir):
|
||||||
mpr = MPRfile(os.path.join(testdata_dir, 'bio_logic5.mpr'))
|
mpr = MPRfile(os.path.join(testdata_dir, 'bio_logic5.mpr'))
|
||||||
mpt, comments = MPTfile((re.sub(b'\tXXX\t', b'\t0\t', line) for line in
|
mpt, comments = MPTfile((re.sub(b'\tXXX\t', b'\t0\t', line) for line in
|
||||||
open(os.path.join(testdata_dir, 'bio_logic5.mpt'),
|
open(os.path.join(testdata_dir, 'bio_logic5.mpt'),
|
||||||
@@ -186,7 +184,7 @@ def test_MPR5_matches_MPT5():
|
|||||||
assert_MPR_matches_MPT(mpr, mpt, comments)
|
assert_MPR_matches_MPT(mpr, mpt, comments)
|
||||||
|
|
||||||
|
|
||||||
def test_MPR6_matches_MPT6():
|
def test_MPR6_matches_MPT6(testdata_dir):
|
||||||
mpr = MPRfile(os.path.join(testdata_dir, 'bio_logic6.mpr'))
|
mpr = MPRfile(os.path.join(testdata_dir, 'bio_logic6.mpr'))
|
||||||
mpt, comments = MPTfile(os.path.join(testdata_dir, 'bio_logic6.mpt'))
|
mpt, comments = MPTfile(os.path.join(testdata_dir, 'bio_logic6.mpt'))
|
||||||
mpr.data = mpr.data[:958] # .mpt file is incomplete
|
mpr.data = mpr.data[:958] # .mpt file is incomplete
|
||||||
|
|||||||
Reference in New Issue
Block a user