Reformatted res2sqlite.py with black 23.12.1

This commit is contained in:
2024-01-20 21:57:14 +02:00
parent dee8af3a86
commit 239db97c69

View File

@@ -16,43 +16,43 @@ from copy import copy
# $ mdb-schema <result.res> oracle
mdb_tables = [
'Version_Table',
'Global_Table',
'Resume_Table',
'Channel_Normal_Table',
'Channel_Statistic_Table',
'Auxiliary_Table',
'Event_Table',
'Smart_Battery_Info_Table',
'Smart_Battery_Data_Table',
"Version_Table",
"Global_Table",
"Resume_Table",
"Channel_Normal_Table",
"Channel_Statistic_Table",
"Auxiliary_Table",
"Event_Table",
"Smart_Battery_Info_Table",
"Smart_Battery_Data_Table",
]
mdb_5_23_tables = [
'MCell_Aci_Data_Table',
'Aux_Global_Data_Table',
'Smart_Battery_Clock_Stretch_Table',
"MCell_Aci_Data_Table",
"Aux_Global_Data_Table",
"Smart_Battery_Clock_Stretch_Table",
]
mdb_5_26_tables = [
'Can_BMS_Info_Table',
'Can_BMS_Data_Table',
"Can_BMS_Info_Table",
"Can_BMS_Data_Table",
]
mdb_tables_text = {
'Version_Table',
'Global_Table',
'Event_Table',
'Smart_Battery_Info_Table',
'Can_BMS_Info_Table',
"Version_Table",
"Global_Table",
"Event_Table",
"Smart_Battery_Info_Table",
"Can_BMS_Info_Table",
}
mdb_tables_numeric = {
'Resume_Table',
'Channel_Normal_Table',
'Channel_Statistic_Table',
'Auxiliary_Table',
'Smart_Battery_Data_Table',
'MCell_Aci_Data_Table',
'Aux_Global_Data_Table',
'Smart_Battery_Clock_Stretch_Table',
'Can_BMS_Data_Table',
"Resume_Table",
"Channel_Normal_Table",
"Channel_Statistic_Table",
"Auxiliary_Table",
"Smart_Battery_Data_Table",
"MCell_Aci_Data_Table",
"Aux_Global_Data_Table",
"Smart_Battery_Clock_Stretch_Table",
"Can_BMS_Data_Table",
}
mdb_create_scripts = {
@@ -271,7 +271,7 @@ CREATE TABLE Smart_Battery_Data_Table
REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
); """,
# The following tables are not present in version 1.14, but are in 5.23
'MCell_Aci_Data_Table': """
"MCell_Aci_Data_Table": """
CREATE TABLE MCell_Aci_Data_Table
(
Test_ID INTEGER,
@@ -285,7 +285,7 @@ CREATE TABLE MCell_Aci_Data_Table
FOREIGN KEY (Test_ID, Data_Point)
REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
);""",
'Aux_Global_Data_Table': """
"Aux_Global_Data_Table": """
CREATE TABLE Aux_Global_Data_Table
(
Channel_Index INTEGER,
@@ -295,7 +295,7 @@ CREATE TABLE Aux_Global_Data_Table
Unit TEXT,
PRIMARY KEY (Channel_Index, Auxiliary_Index, Data_Type)
);""",
'Smart_Battery_Clock_Stretch_Table': """
"Smart_Battery_Clock_Stretch_Table": """
CREATE TABLE Smart_Battery_Clock_Stretch_Table
(
Test_ID INTEGER,
@@ -344,7 +344,7 @@ CREATE TABLE Smart_Battery_Clock_Stretch_Table
REFERENCES Channel_Normal_Table (Test_ID, Data_Point)
);""",
# The following tables are not present in version 5.23, but are in 5.26
'Can_BMS_Info_Table': """
"Can_BMS_Info_Table": """
CREATE TABLE "Can_BMS_Info_Table"
(
Channel_Index INTEGER PRIMARY KEY,
@@ -352,7 +352,7 @@ CREATE TABLE "Can_BMS_Info_Table"
CAN_Configuration TEXT
);
""",
'Can_BMS_Data_Table': """
"Can_BMS_Data_Table": """
CREATE TABLE "Can_BMS_Data_Table"
(
Test_ID INTEGER,
@@ -371,7 +371,8 @@ mdb_create_indices = {
CREATE UNIQUE INDEX data_point_index ON Channel_Normal_Table (Test_ID, Data_Point);
CREATE INDEX voltage_index ON Channel_Normal_Table (Test_ID, Voltage);
CREATE INDEX test_time_index ON Channel_Normal_Table (Test_ID, Test_Time);
"""}
"""
}
helper_table_script = """
CREATE TEMPORARY TABLE capacity_helper(
@@ -438,29 +439,33 @@ CREATE VIEW IF NOT EXISTS Capacity_View
def mdb_get_data_text(s3db, filename, table):
print("Reading %s..." % table)
insert_pattern = re.compile(
r'INSERT INTO "\w+" \([^)]+?\) VALUES \(("[^"]*"|[^")])+?\);\n',
re.IGNORECASE
r'INSERT INTO "\w+" \([^)]+?\) VALUES \(("[^"]*"|[^")])+?\);\n', re.IGNORECASE
)
try:
# Initialize values to avoid NameError in except clause
mdb_output = ''
mdb_output = ""
insert_match = None
with sp.Popen(['mdb-export', '-I', 'postgres', filename, table],
bufsize=-1, stdin=sp.DEVNULL, stdout=sp.PIPE,
universal_newlines=True) as mdb_sql:
with sp.Popen(
["mdb-export", "-I", "postgres", filename, table],
bufsize=-1,
stdin=sp.DEVNULL,
stdout=sp.PIPE,
universal_newlines=True,
) as mdb_sql:
mdb_output = mdb_sql.stdout.read()
while len(mdb_output) > 0:
insert_match = insert_pattern.match(mdb_output)
s3db.execute(insert_match.group())
mdb_output = mdb_output[insert_match.end():]
mdb_output = mdb_output[insert_match.end() :]
mdb_output += mdb_sql.stdout.read()
s3db.commit()
except OSError as e:
if e.errno == 2:
raise RuntimeError('Could not locate the `mdb-export` executable. '
'Check that mdbtools is properly installed.')
raise RuntimeError(
"Could not locate the `mdb-export` executable. "
"Check that mdbtools is properly installed."
)
else:
raise
except BaseException:
@@ -475,14 +480,18 @@ def mdb_get_data_text(s3db, filename, table):
def mdb_get_data_numeric(s3db, filename, table):
print("Reading %s..." % table)
try:
with sp.Popen(['mdb-export', filename, table],
bufsize=-1, stdin=sp.DEVNULL, stdout=sp.PIPE,
universal_newlines=True) as mdb_sql:
with sp.Popen(
["mdb-export", filename, table],
bufsize=-1,
stdin=sp.DEVNULL,
stdout=sp.PIPE,
universal_newlines=True,
) as mdb_sql:
mdb_csv = csv.reader(mdb_sql.stdout)
mdb_headers = next(mdb_csv)
quoted_headers = ['"%s"' % h for h in mdb_headers]
joined_headers = ', '.join(quoted_headers)
joined_placemarks = ', '.join(['?' for h in mdb_headers])
joined_headers = ", ".join(quoted_headers)
joined_placemarks = ", ".join(["?" for h in mdb_headers])
insert_stmt = 'INSERT INTO "{0}" ({1}) VALUES ({2});'.format(
table,
joined_headers,
@@ -492,8 +501,10 @@ def mdb_get_data_numeric(s3db, filename, table):
s3db.commit()
except OSError as e:
if e.errno == 2:
raise RuntimeError('Could not locate the `mdb-export` executable. '
'Check that mdbtools is properly installed.')
raise RuntimeError(
"Could not locate the `mdb-export` executable. "
"Check that mdbtools is properly installed."
)
else:
raise
@@ -504,7 +515,9 @@ def mdb_get_data(s3db, filename, table):
elif table in mdb_tables_numeric:
mdb_get_data_numeric(s3db, filename, table)
else:
raise ValueError("'%s' is in neither mdb_tables_text nor mdb_tables_numeric" % table)
raise ValueError(
"'%s' is in neither mdb_tables_text nor mdb_tables_numeric" % table
)
def mdb_get_version(filename):
@@ -514,9 +527,13 @@ def mdb_get_version(filename):
"""
print("Reading version number...")
try:
with sp.Popen(['mdb-export', filename, 'Version_Table'],
bufsize=-1, stdin=sp.DEVNULL, stdout=sp.PIPE,
universal_newlines=True) as mdb_sql:
with sp.Popen(
["mdb-export", filename, "Version_Table"],
bufsize=-1,
stdin=sp.DEVNULL,
stdout=sp.PIPE,
universal_newlines=True,
) as mdb_sql:
mdb_csv = csv.reader(mdb_sql.stdout)
mdb_headers = next(mdb_csv)
mdb_values = next(mdb_csv)
@@ -525,23 +542,31 @@ def mdb_get_version(filename):
except StopIteration:
pass
else:
raise ValueError('Version_Table of %s lists multiple versions' % filename)
raise ValueError(
"Version_Table of %s lists multiple versions" % filename
)
except OSError as e:
if e.errno == 2:
raise RuntimeError('Could not locate the `mdb-export` executable. '
'Check that mdbtools is properly installed.')
raise RuntimeError(
"Could not locate the `mdb-export` executable. "
"Check that mdbtools is properly installed."
)
else:
raise
if 'Version_Schema_Field' not in mdb_headers:
raise ValueError('Version_Table of %s does not contain a Version_Schema_Field column'
% filename)
if "Version_Schema_Field" not in mdb_headers:
raise ValueError(
"Version_Table of %s does not contain a Version_Schema_Field column"
% filename
)
version_fields = dict(zip(mdb_headers, mdb_values))
version_text = version_fields['Version_Schema_Field']
version_match = re.fullmatch('Results File ([.0-9]+)', version_text)
version_text = version_fields["Version_Schema_Field"]
version_match = re.fullmatch("Results File ([.0-9]+)", version_text)
if not version_match:
raise ValueError('File version "%s" did not match expected format' % version_text)
raise ValueError(
'File version "%s" did not match expected format' % version_text
)
version_string = version_match.group(1)
version_tuple = tuple(map(int, version_string.split('.')))
version_tuple = tuple(map(int, version_string.split(".")))
return version_tuple
@@ -581,12 +606,14 @@ def main(argv=None):
parser = argparse.ArgumentParser(
description="Convert Arbin .res files to sqlite3 databases using mdb-export",
)
parser.add_argument('input_file', type=str) # need file name to pass to sp.Popen
parser.add_argument('output_file', type=str) # need file name to pass to sqlite3.connect
parser.add_argument("input_file", type=str) # need file name to pass to sp.Popen
parser.add_argument(
"output_file", type=str
) # need file name to pass to sqlite3.connect
args = parser.parse_args(argv)
convert_arbin_to_sqlite(args.input_file, args.output_file)
if __name__ == '__main__':
if __name__ == "__main__":
main()