# lib-paveit-demo/src/paveit/labtest/base.py
# coding: utf-8
import io
import logging
import numpy as np
import pandas as pd
from paveit.analysis import fit_cos
from paveit.functions import calc_nu
from paveit.helper import calc_hash_of_bytes, get_minio_client_processing


class DataSineLoad:
    """
    Base class for lab tests with sinusoidal loading.

    `run()` executes the processing pipeline: parse the raw file,
    standardize metadata and column names, validate, convert units,
    split the data into load chunks and fit the results.
    """

    def __init__(self,
                 filename: str,
                 metadata: dict,
                 logger=None,
                 debug: bool = False,
                 data: io.BytesIO | None = None):
        self.filename = filename
        self.metadata = metadata
        if isinstance(data, io.BytesIO):
            self.data = data
        self.debug = debug
        if logger is None:
            self._logger = logging.getLogger(__name__)
        else:
            self._logger = logger
        self._logger.info(
            f'filename s3: {self.filename}, metadata: {self.metadata}')
        self._pre_run()

    def _set_parameter(self):
        self._logger.debug('run _set_parameter')
        self.split_data_based_on_parameter = ['T', 'sigma', 'f']
        self.col_as_int = ['N']
        self.col_as_float = [
            'T', 'F', 's_piston', 's_hor_1', 'f', 's_hor_2'
        ]
        self.val_col_names = [
            'time', 'T', 'f', 'sigma', 'N', 'F', 's_hor_1', 's_hor_2',
            's_piston'
        ]
        self.columns_analyse = [
            'F', 's_hor_sum', 's_hor_1', 's_hor_2', 's_piston'
        ]
        self.round_values = [('T', 3)]
        # header names after standardization; existence is checked in
        # _validate_meta
        self.val_header_names = ['speciment_height', 'speciment_diameter']
        self.number_of_load_cycles_for_analysis = 5
        # list of possible names per parameter in the raw metadata
        self.meta_names_of_parameter = {
            'sigma': ['Max. Spannung']
        }
        self.data_column_names = {
            'time': ['Time Series'],
            'F': ['Load Series'],
            's_hor_1': ['LVDT1 Series'],
            's_hor_2': ['LVDT2 Series'],
        }

    def update_parameter(self):
        """Update the standard parameters from self._set_parameter().

        Hook for subclasses; the base implementation does nothing.
        """
        pass
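
    # A minimal sketch of how a subclass might use this hook (hypothetical
    # subclass name and value; kept as a comment so the base class is
    # unchanged):
    #
    #     class MySineTest(DataSineLoad):
    #         def update_parameter(self):
    #             # analyse more load cycles than the default of 5
    #             self.number_of_load_cycles_for_analysis = 10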

    def _define_units(self):
        self.unit_s = 1          # displacement: mm
        self.unit_F = 1          # force: N
        self.unit_t = 1 / 1000.  # time: ms -> s

    def _connect_to_s3(self):
        self._logger.debug('run _connect_to_s3')
        self.__minioClient = get_minio_client_processing()

    def _read_from_s3_to_bytesio(self):
        self._logger.debug('run _read_from_s3_to_bytesio')
        self._connect_to_s3()
        response = self.__minioClient.get_object('processing',
                                                 self.filename)
        try:
            self.data = response.data
        finally:
            # release the connection even if reading fails
            response.close()
            response.release_conn()
        self.data = io.BytesIO(self.data)
        self._logger.debug('read data from s3')

    def _calc_hash_of_bytesio(self):
        self._logger.debug('run _calc_hash_of_bytesio')
        self.filehash = calc_hash_of_bytes(self.data)
        self.data.seek(0)
        self._logger.debug(f'Hash of file: {self.filehash}')

    def _define_data_models(self):
        pass

    def _data_in_db(self):
        nsamples = self._datamodel.objects(filehash=self.filehash).count()
        self.file_in_db = nsamples > 0

    def _process_data(self):
        """Convert self.data (BytesIO) to a pandas.DataFrame and update
        self.metadata with information from the file."""
        self._logger.debug('convert bytes to pandas.DataFrame')
        self.data = pd.read_csv(self.data, encoding='utf-8')

    def _meta_to_float(self):
        for key, d in self.metadata.items():
            try:
                # remove units, e.g. '20,0 °C' -> '20,0'
                for unit in ['°C', 'Hz']:
                    if unit in d:
                        d = d.split(unit)[0].strip()
                # decimal comma -> decimal point
                self.metadata[key] = float(d.replace(',', '.'))
            except (TypeError, ValueError, AttributeError):
                # keep non-numeric entries unchanged
                pass

    def _standardize_data(self):
        self._logger.debug('run _standardize_data')
        colnames = list(self.data.columns)
        for par, names in self.data_column_names.items():
            for name in names:
                colnames = [sub.replace(name, par) for sub in colnames]
        self.data.columns = colnames
        self._logger.debug(f'columns: {colnames}')
        if self.debug:
            print(self.data.head())
        self._logger.debug(f'standardize_data: {self.data.columns}')

    def _standardize_meta(self):
        self._logger.debug('run _standardize_meta')
        # remove Windows line endings ("\r\n") and whitespace from the keys
        for col in list(self.metadata.keys()):
            col_mod = col.replace('\r\n', '')
            col_mod = col_mod.strip()
            if col != col_mod:
                self.metadata[col_mod] = self.metadata.pop(col)
        # map raw metadata names to standardized parameter names
        for par, names in self.meta_names_of_parameter.items():
            for name in names:
                if name in self.metadata:
                    self.metadata[par] = self.metadata.pop(name)
                    break
        # strip string values
        for key in self.metadata.keys():
            try:
                self.metadata[key] = self.metadata[key].strip()
            except AttributeError:
                # non-string values have no strip()
                pass
        self._logger.debug(f'meta (stand.): {self.metadata}')

    def _modify_meta(self):
        pass

    def _validate_data(self):
        self._logger.debug('run _validate_data')
        for name in self.val_col_names:
            if name not in self.data.columns:
                # fall back to a constant column from the metadata
                if name in self.metadata.keys():
                    self._logger.warning(
                        f'add {name} from metadata to data')
                    self.data[name] = self.metadata[name]
                else:
                    self._logger.error(f'{name} not in data')
                    raise KeyError(f'{name} not in data')
        self._logger.debug(f'validate_data: {self.data.columns}')

    def _validate_meta(self):
        self._logger.debug('run _validate_meta')
        for name in self.val_header_names:
            if name not in self.metadata:
                self._logger.error(f'{name} not found')
                raise KeyError(f'{name} not found in metadata')

    def _post_string_to_float(self):
        sel = self.data.select_dtypes(include=['object'])
        if sel.empty:
            return
        for col in sel.columns:
            try:
                # decimal comma -> decimal point, then convert
                self.data[col] = pd.to_numeric(
                    self.data[col].str.replace(',', '.'))
            except (ValueError, TypeError):
                # leave genuinely non-numeric columns unchanged
                pass

    def _post_apply_units(self):
        for col in [
                's_hor_sum', 's_hor_1', 's_hor_2', 's_vert_sum', 's_vert_1',
                's_vert_2', 's_piston', 'extension',
        ]:
            if col in self.data.columns:
                self.data[col] = self.data[col].mul(self.unit_s)
        for col in ['F']:
            self.data[col] = self.data[col].mul(self.unit_F)
        for col in ['time']:
            self.data[col] = self.data[col].mul(self.unit_t)
        # unit_freq is only defined by some subclasses
        try:
            self.data['f'] = self.data['f'].mul(self.unit_freq)
        except AttributeError:
            pass
        return True

    def _post_round_values(self):
        for par, digits in self.round_values:
            if par in self.data.columns:
                self.data[par] = self.data[par].round(digits)

    def _post_select_important_columns(self):
        # TODO: add more columns, check datamodel
        self.data = self.data[self.val_col_names]

    def _post_calc_missing_values(self):
        cols = self.data.columns
        if 's_hor_sum' not in cols:
            if ('s_hor_1' in cols) and ('s_hor_2' in cols):
                self.data['s_hor_sum'] = self.data[
                    ['s_hor_1', 's_hor_2']].sum(axis=1)
        if 's_vert_sum' not in cols:
            if ('s_vert_1' in cols) and ('s_vert_2' in cols):
                self.data['s_vert_sum'] = self.data[
                    ['s_vert_1', 's_vert_2']].sum(axis=1)

    def _post_opt_data(self):
        # set dtypes
        for col in self.col_as_int:
            self.data[col] = self.data[col].astype('int')
        for col in self.col_as_float:
            try:
                self.data[col] = self.data[col].astype('float')
            except (ValueError, TypeError):
                pass
        # set index
        self.data = self.data.set_index('time')
        return True

    def _fit_split_data(self):
        self._logger.debug('run _fit_split_data')
        data_gp = self.data.groupby(self.split_data_based_on_parameter)
        data_list = []
        for idx, d in data_gp:
            if d.empty:
                continue
            if any(d['f'] <= 0.0):
                continue
            d = d.copy()
            # reset the load cycle counter N to start at 1
            d['N'] = d['N'] - d['N'].iloc[0] + 1
            # detect gaps in the time index (> 4x the mean time step)
            idx_diff = np.diff(d.index)
            dt_mean = idx_diff.mean()
            gaps = idx_diff > (4 * dt_mean)
            has_gaps = any(gaps)
            if not has_gaps:
                data_list.append(d)
            else:
                # FIX: GAP FINDING - gaps are currently ignored
                data_list.append(d)
                """
                print('has gaps')
                print(gaps)
                idx_gaps = (np.where(gaps)[0] - 1)[0]
                print(idx_gaps)
                data_list.append(d.iloc[0:idx_gaps])
                """
        if len(data_list) == 0:
            self._logger.error('no valid data chunks found')
            raise ValueError('no valid data chunks after splitting')
        self.num_tests = len(data_list)
        self.data = data_list
        nchunks = len(self.data)
        self._logger.debug(f'data split into {nchunks} chunks')

    def _fit_select_data(self):
        """
        Select N load cycles from the original data, either
        (a) based on the window from TP-Asphalt, or
        (b) the last N cycles.
        DUMMY FUNCTION - implemented by subclasses.
        """
        pass
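
    # A minimal sketch of option (b) as a comment (assumes self.data is the
    # list of chunks produced by _fit_split_data and 'N' is the load cycle
    # counter):
    #
    #     n = self.number_of_load_cycles_for_analysis
    #     self.data = [d[d['N'] > d['N'].max() - n] for d in self.data]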

    def _calc(self):
        """
        Calculate results.
        DUMMY FUNCTION - implemented by subclasses.
        """
        self._logger.info('run _calc base')

    def save(self):
        """
        Save results to the database.
        DUMMY FUNCTION - implemented by subclasses.
        """
        pass

    def _pre_run(self):
        # data can be passed in directly; otherwise fetch it from S3
        if not hasattr(self, 'data'):
            self._read_from_s3_to_bytesio()
        self._calc_hash_of_bytesio()
        self._define_data_models()
        #self._data_in_db()
        self._set_parameter()
        self.update_parameter()
        self._define_units()

    def run(self):
        self._logger.info('run task')
        self._process_data()
        self._meta_to_float()
        self._standardize_meta()
        self._standardize_data()
        self._modify_meta()
        self._validate_meta()
        self._validate_data()
        self._post_string_to_float()
        self._post_select_important_columns()
        self._post_apply_units()
        self._post_round_values()
        self._post_calc_missing_values()
        self._post_opt_data()
        self._fit_split_data()
        self._fit_select_data()
        self._calc()
        #self._logger.info(f"results: {self.fit['E']}")
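

# A minimal usage sketch (hypothetical file contents and metadata keys; in
# production the class is constructed with a filename that is fetched from
# the 'processing' bucket instead of an in-memory BytesIO):
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    csv = io.BytesIO(
        b'Time Series,Load Series,LVDT1 Series,LVDT2 Series,s_piston,T,f,N\n'
        b'0,100.0,0.010,0.012,0.05,20.0,10.0,1\n'
        b'50,101.0,0.011,0.013,0.06,20.0,10.0,2\n'
        b'100,100.5,0.012,0.014,0.07,20.0,10.0,3\n')
    meta = {
        'Max. Spannung': '0,2',  # mapped to 'sigma' by _standardize_meta
        'speciment_height': '40,0',
        'speciment_diameter': '100,0',
    }
    test = DataSineLoad('example.csv', metadata=meta, debug=True, data=csv)
    test.run()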