Merge pull request #40 from chatcannon/remove-python-2

Remove Python 2 support
2020-07-05 11:05:08 +03:00
committed by GitHub
7 changed files with 53 additions and 68 deletions

View File

@@ -3,26 +3,16 @@
__all__ = ['MPTfileCSV', 'MPTfile']
import sys
import re
import csv
from os import SEEK_SET
import time
from datetime import date, datetime, timedelta
from collections import defaultdict, OrderedDict
import functools
import numpy as np
if sys.version_info.major <= 2:
str3 = str
from string import maketrans
else:
str3 = functools.partial(str, encoding='ascii')
maketrans = bytes.maketrans
def fieldname_to_dtype(fieldname):
"""Converts a column header from the MPT file into a tuple of
canonical name and appropriate numpy dtype"""
@@ -49,13 +39,13 @@ def fieldname_to_dtype(fieldname):
raise ValueError("Invalid column header: %s" % fieldname)
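As the docstring above says, fieldname_to_dtype turns each raw column header from the .mpt text file into a (canonical name, numpy dtype) pair and raises ValueError for anything unrecognised. A minimal sketch of that idea (the two mappings below are illustrative, not the module's actual lookup table):

import numpy as np

def fieldname_to_dtype_sketch(fieldname):
    """Illustrative only: map a couple of EC-Lab column headers to (name, dtype)."""
    if fieldname == 'mode':
        return ('mode', np.uint8)
    elif fieldname in ('time/s', 'Ewe/V', 'I/mA'):
        # Most numeric columns keep their header and become 64-bit floats
        return (fieldname, np.float64)
    raise ValueError("Invalid column header: %s" % fieldname)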
def comma_converter(float_string):
"""Convert numbers to floats whether the decimal point is '.' or ','"""
trans_table = maketrans(b',', b'.')
return float(float_string.translate(trans_table))
def comma_converter(float_text):
"""Convert text to float whether the decimal point is '.' or ','"""
trans_table = bytes.maketrans(b',', b'.')
return float(float_text.translate(trans_table))
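The new comma_converter relies on bytes.maketrans and bytes.translate, which are Python-3-only spellings of what the old maketrans shim provided. A standalone check of the '.'/',' decimal handling (not part of the diff):

# bytes.maketrans builds a 256-byte translation table; translate applies it.
trans_table = bytes.maketrans(b',', b'.')

assert float(b'3.14'.translate(trans_table)) == 3.14
assert float(b'3,14'.translate(trans_table)) == 3.14   # comma decimal point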
def MPTfile(file_or_path):
def MPTfile(file_or_path, encoding='ascii'):
"""Opens .mpt files as numpy record arrays
Checks for the correct headings, skips any comments and returns a
@@ -71,8 +61,7 @@ def MPTfile(file_or_path):
if magic != b'EC-Lab ASCII FILE\r\n':
raise ValueError("Bad first line for EC-Lab file: '%s'" % magic)
# TODO use rb'string' here once Python 2 is no longer supported
nb_headers_match = re.match(b'Nb header lines : (\\d+)\\s*$',
nb_headers_match = re.match(rb'Nb header lines : (\d+)\s*$',
next(mpt_file))
nb_headers = int(nb_headers_match.group(1))
if nb_headers < 3:
@@ -82,7 +71,7 @@ def MPTfile(file_or_path):
# make three lines. Every additional line is a comment line.
comments = [next(mpt_file) for i in range(nb_headers - 3)]
fieldnames = str3(next(mpt_file)).strip().split('\t')
fieldnames = next(mpt_file).decode(encoding).strip().split('\t')
record_type = np.dtype(list(map(fieldname_to_dtype, fieldnames)))
# Must be able to parse files where commas are used for decimal points
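MPTfile now reads the file in binary mode, checks the magic line, extracts the header count with a raw byte-string regex, and decodes the column-name line with a caller-supplied encoding (defaulting to 'ascii'). A hedged sketch of that header-parsing flow on an in-memory file (the header contents here are invented for illustration):

import io
import re

# Fake preamble resembling an EC-Lab .mpt header (values invented).
raw = (b'EC-Lab ASCII FILE\r\n'
       b'Nb header lines : 3\r\n'
       b'mode\ttime/s\tEwe/V\r\n')

mpt_file = io.BytesIO(raw)
assert next(mpt_file) == b'EC-Lab ASCII FILE\r\n'

nb_headers_match = re.match(rb'Nb header lines : (\d+)\s*$', next(mpt_file))
nb_headers = int(nb_headers_match.group(1))          # -> 3

# With exactly 3 header lines there are no comment lines before the column row.
fieldnames = next(mpt_file).decode('ascii').strip().split('\t')
assert fieldnames == ['mode', 'time/s', 'Ewe/V']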
@@ -345,10 +334,7 @@ class MPRfile:
raise ValueError("Unrecognised version for data module: %d" %
data_module['version'])
if sys.version_info.major <= 2:
assert(all((b == '\x00' for b in remaining_headers)))
else:
assert(not any(remaining_headers))
assert(not any(remaining_headers))
self.dtype, self.flags_dict = VMPdata_dtype_from_colIDs(column_types)
self.data = np.frombuffer(main_data, dtype=self.dtype)
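VMPdata_dtype_from_colIDs builds a numpy structured dtype from the column IDs in the data module, and np.frombuffer then reinterprets the raw bytes as a record array without copying. A small self-contained illustration of that pattern (the field layout below is made up, not the VMP column encoding):

import numpy as np

# Invented two-column layout: one unsigned byte flag plus one little-endian double.
dtype = np.dtype([('flags', 'u1'), ('time/s', '<f8')])

raw = np.array([(1, 0.0), (1, 0.5)], dtype=dtype).tobytes()

data = np.frombuffer(raw, dtype=dtype)   # zero-copy, read-only view onto the bytes
assert data['time/s'][1] == 0.5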
@@ -361,9 +347,9 @@ class MPRfile:
self.npts = n_data_points
try:
tm = time.strptime(str3(settings_mod['date']), '%m/%d/%y')
tm = time.strptime(settings_mod['date'].decode('ascii'), '%m/%d/%y')
except ValueError:
tm = time.strptime(str3(settings_mod['date']), '%m-%d-%y')
tm = time.strptime(settings_mod['date'].decode('ascii'), '%m-%d-%y')
self.startdate = date(tm.tm_year, tm.tm_mon, tm.tm_mday)
if maybe_loop_module:
@@ -379,9 +365,9 @@ class MPRfile:
if maybe_log_module:
log_module, = maybe_log_module
try:
tm = time.strptime(str3(log_module['date']), '%m/%d/%y')
tm = time.strptime(log_module['date'].decode('ascii'), '%m/%d/%y')
except ValueError:
tm = time.strptime(str3(log_module['date']), '%m-%d-%y')
tm = time.strptime(log_module['date'].decode('ascii'), '%m-%d-%y')
self.enddate = date(tm.tm_year, tm.tm_mon, tm.tm_mday)
# There is a timestamp at either 465 or 469 bytes
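Both the settings and log modules store the acquisition date as a byte string that may use either '%m/%d/%y' or '%m-%d-%y'; the diff decodes it as ASCII and falls back between the two formats. A standalone sketch of that fallback (the helper name and sample values are illustrative):

import time
from datetime import date

def parse_module_date(raw):
    """Decode a b'MM/DD/YY' or b'MM-DD-YY' field into a datetime.date."""
    text = raw.decode('ascii')
    try:
        tm = time.strptime(text, '%m/%d/%y')
    except ValueError:
        tm = time.strptime(text, '%m-%d-%y')
    return date(tm.tm_year, tm.tm_mon, tm.tm_mday)

assert parse_module_date(b'07/05/20') == date(2020, 7, 5)
assert parse_module_date(b'07-05-20') == date(2020, 7, 5)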

View File

@@ -371,27 +371,28 @@ def mdb_get_data_text(s3db, filename, table):
r'INSERT INTO "\w+" \([^)]+?\) VALUES \(("[^"]*"|[^")])+?\);\n',
re.IGNORECASE
)
# TODO after dropping Python 2 support - use Popen as contextmanager
try:
mdb_sql = sp.Popen(['mdb-export', '-I', 'postgres', filename, table],
bufsize=-1, stdin=None, stdout=sp.PIPE,
universal_newlines=True)
# Initialize values to avoid NameError in except clause
mdb_output = ''
insert_match = None
with sp.Popen(['mdb-export', '-I', 'postgres', filename, table],
bufsize=-1, stdin=sp.DEVNULL, stdout=sp.PIPE,
universal_newlines=True) as mdb_sql:
mdb_output = mdb_sql.stdout.read()
while len(mdb_output) > 0:
insert_match = insert_pattern.match(mdb_output)
s3db.execute(insert_match.group())
mdb_output = mdb_output[insert_match.end():]
mdb_output += mdb_sql.stdout.read()
s3db.commit()
except OSError as e:
if e.errno == 2:
raise RuntimeError('Could not locate the `mdb-export` executable. '
'Check that mdbtools is properly installed.')
else:
raise
try:
# Initialize values to avoid NameError in except clause
mdb_output = ''
insert_match = None
mdb_output = mdb_sql.stdout.read()
while len(mdb_output) > 0:
insert_match = insert_pattern.match(mdb_output)
s3db.execute(insert_match.group())
mdb_output = mdb_output[insert_match.end():]
s3db.commit()
except BaseException:
print("Error while importing %s" % table)
if mdb_output:
@@ -399,38 +400,32 @@ def mdb_get_data_text(s3db, filename, table):
if insert_match:
print("insert_re match:", insert_match)
raise
finally:
mdb_sql.terminate()
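Using subprocess.Popen as a context manager (available only on Python 3) replaces the manual terminate() in the old finally block: leaving the with block waits for mdb-export and closes its pipes even when an exception escapes, and stdin=subprocess.DEVNULL makes explicit that nothing is written to the child. A minimal sketch of the same pattern, with a harmless POSIX 'echo' standing in for the real mdb-export command line:

import subprocess as sp

# 'echo' is a stand-in; the real code runs mdb-export with '-I postgres'.
try:
    with sp.Popen(['echo', 'INSERT INTO "t" (x) VALUES (1);'],
                  bufsize=-1, stdin=sp.DEVNULL, stdout=sp.PIPE,
                  universal_newlines=True) as proc:
        output = proc.stdout.read()
except OSError as e:
    if e.errno == 2:                      # ENOENT: executable not found
        raise RuntimeError('Could not locate the executable.')
    raise

print(output.strip())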
def mdb_get_data_numeric(s3db, filename, table):
print("Reading %s..." % table)
# TODO after dropping Python 2 support - use Popen as contextmanager
try:
mdb_sql = sp.Popen(['mdb-export', filename, table],
bufsize=-1, stdin=None, stdout=sp.PIPE,
universal_newlines=True)
with sp.Popen(['mdb-export', filename, table],
bufsize=-1, stdin=sp.DEVNULL, stdout=sp.PIPE,
universal_newlines=True) as mdb_sql:
mdb_csv = csv.reader(mdb_sql.stdout)
mdb_headers = next(mdb_csv)
quoted_headers = ['"%s"' % h for h in mdb_headers]
joined_headers = ', '.join(quoted_headers)
joined_placemarks = ', '.join(['?' for h in mdb_headers])
insert_stmt = 'INSERT INTO "{0}" ({1}) VALUES ({2});'.format(
table,
joined_headers,
joined_placemarks,
)
s3db.executemany(insert_stmt, mdb_csv)
s3db.commit()
except OSError as e:
if e.errno == 2:
raise RuntimeError('Could not locate the `mdb-export` executable. '
'Check that mdbtools is properly installed.')
else:
raise
try:
mdb_csv = csv.reader(mdb_sql.stdout)
mdb_headers = next(mdb_csv)
quoted_headers = ['"%s"' % h for h in mdb_headers]
joined_headers = ', '.join(quoted_headers)
joined_placemarks = ', '.join(['?' for h in mdb_headers])
insert_stmt = 'INSERT INTO "{0}" ({1}) VALUES ({2});'.format(
table,
joined_headers,
joined_placemarks,
)
s3db.executemany(insert_stmt, mdb_csv)
s3db.commit()
finally:
mdb_sql.terminate()
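mdb_get_data_numeric streams the CSV that mdb-export writes, quotes the header names, and hands the remaining rows straight to executemany with '?' placeholders so sqlite3 handles the escaping. A self-contained sketch of that insert path using an in-memory CSV and database (table and column names invented):

import csv
import io
import sqlite3

# Stand-in for mdb_sql.stdout: a small CSV with a header row (values invented).
fake_csv = io.StringIO('id,capacity\n1,3.2\n2,3.1\n')

s3db = sqlite3.connect(':memory:')
s3db.execute('CREATE TABLE "cells" ("id", "capacity");')

mdb_csv = csv.reader(fake_csv)
mdb_headers = next(mdb_csv)
joined_headers = ', '.join('"%s"' % h for h in mdb_headers)
joined_placemarks = ', '.join('?' for h in mdb_headers)
insert_stmt = 'INSERT INTO "{0}" ({1}) VALUES ({2});'.format(
    'cells', joined_headers, joined_placemarks)

s3db.executemany(insert_stmt, mdb_csv)    # csv.reader yields the data rows lazily
s3db.commit()
assert s3db.execute('SELECT COUNT(*) FROM "cells"').fetchone()[0] == 2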
def mdb_get_data(s3db, filename, table):