Created function to process the list of column types and return a numpy dtype

This commit is contained in:
Chris Kerr
2013-12-10 09:44:36 +00:00
parent 509b75a81f
commit 6a63196a67

View File

@@ -8,6 +8,7 @@ import csv
from os import SEEK_SET, SEEK_CUR
import time
from datetime import date
from collections import OrderedDict
import numpy as np
@@ -129,19 +130,28 @@ VMPmodule_hdr = np.dtype([('shortname', 'S10'),
('length', '<u4'),
('version', '<u4'),
('date', 'S8')])
VMPdata_dtype = np.dtype([('flags', 'u1'),
("time/s", '<f8'),
("control/V/mA", '<f4'),
("Ewe/V", '<f4'),
("dQ/mA.h", '<f8'),
("P/W", '<f4')])
VMPdata_dtype_extra = np.dtype([('flags', '<u2'),
('blank', 'a1'),
("time/s", '<f8'),
("control/V/mA", '<f4'),
("Ewe/V", '<f4'),
("dQ/mA.h", '<f8'),
("P/W", '<f4')])
def VMPdata_dtype_from_colIDs(colIDs):
dtype_dict = OrderedDict()
for colID in colIDs:
if colID in (1, 2, 3, 21, 31, 65):
dtype_dict['flags'] = 'u1'
elif colID in (131,):
dtype_dict['flags2'] = '<u2'
elif colID == 4:
dtype_dict['time/s'] = '<f8'
elif colID == 5:
dtype_dict['control/V/mA'] = '<f4'
elif colID == 6:
dtype_dict['Ewe/V'] = '<f4'
elif colID == 7:
dtype_dict['dQ/mA.h'] = '<f8'
elif colID == 70:
dtype_dict['P/W'] = '<f4'
else:
raise NotImplementedError("column type %d not implemented" % colID)
return np.dtype(list(dtype_dict.items()))
def read_VMP_modules(fileobj, read_module_data=True):
@@ -210,59 +220,43 @@ class MPRfile:
self.modules = modules
settings_mod, = (m for m in modules if m['shortname'] == b'VMP Set ')
data_module, = (m for m in modules if m['shortname'] == b'VMP data ')
log_module, = (m for m in modules if m['shortname'] == b'VMP LOG ')
maybe_log_module = [m for m in modules if m['shortname'] == b'VMP LOG ']
n_data_points = np.fromstring(data_module['data'][:4], dtype='<u4')
## not actually sure if this is the number of columns or if it means
## something else:
n_columns = int(data_module['data'][4])
if data_module['version'] == 0:
## There is 100 bytes of data before the main array starts
assert(n_columns == 11)
column_types = np.fromstring(data_module['data'][5:], dtype='u1',
count=n_columns)
np.testing.assert_array_equal(column_types,
[1, 2, 3, 21, 31, 65,
4, 5, 6, 7, 70])
assert(data_module['length'] - 100 ==
VMPdata_dtype.itemsize * n_data_points)
self.data = np.frombuffer(data_module['data'],
dtype=VMPdata_dtype,
offset=100)
remaining_headers = data_module['data'][5 + n_columns:100]
main_data = data_module['data'][100:]
elif data_module['version'] == 2:
## There is 405 bytes of data before the main array starts
column_types = np.fromstring(data_module['data'][5:], dtype='<u2',
count=n_columns)
if n_columns == 11:
np.testing.assert_array_equal(column_types,
[1, 2, 3, 21, 31, 65,
4, 5, 6, 7, 70])
assert(data_module['length'] - 405 ==
VMPdata_dtype.itemsize * n_data_points)
self.data = np.frombuffer(data_module['data'],
dtype=VMPdata_dtype,
offset=405)
elif n_columns == 12:
np.testing.assert_array_equal(column_types,
[1, 2, 3, 21, 31, 65,
131, 4, 5, 6, 7, 70])
assert(data_module['length'] - 405 ==
VMPdata_dtype_extra.itemsize * n_data_points)
self.data = np.frombuffer(data_module['data'],
dtype=VMPdata_dtype_extra,
offset=405)
assert(np.all(self.data['blank'] == b'\x00'))
else:
raise ValueError("Cannot deal with n_columns = %d" % n_columns)
## There is 405 bytes of data before the main array starts
remaining_headers = data_module['data'][5 + 2 * n_columns:405]
main_data = data_module['data'][405:]
else:
raise ValueError("Unrecognised version for data module: %d" %
data_module['version'])
assert(not any(remaining_headers))
self.dtype = VMPdata_dtype_from_colIDs(column_types)
self.data = np.fromstring(main_data, dtype=self.dtype)
assert(self.data.shape[0] == n_data_points)
## No idea what these 'column types' mean or even if they are actually
## column types at all
self.version = int(data_module['version'])
self.cols = column_types
self.npts = n_data_points
tm = time.strptime(str(settings_mod['date'], encoding='ascii'),
'%m/%d/%y')
self.startdate = date(tm.tm_year, tm.tm_mon, tm.tm_mday)
if maybe_log_module:
log_module, = maybe_log_module
tm = time.strptime(str(log_module['date'], encoding='ascii'),
'%m/%d/%y')
self.enddate = date(tm.tm_year, tm.tm_mon, tm.tm_mday)