Base Model für CITT erstellt, PTM Dortmund ergänzt, Tests hinzugefügt

This commit is contained in:
Markus Clauß
2023-02-28 13:56:11 +01:00
parent b248a7e9b1
commit e861dbf10e
17 changed files with 917 additions and 103 deletions

View File

@@ -1,90 +1,404 @@
# coding: utf-8
import io
import logging
import numpy as np
import pandas as pd
from paveit.analysis import fit_cos
from paveit.functions import calc_nu
from paveit.helper import calc_hash_of_bytes, get_minio_client_processing
from worker import app, logger
class DataSineLoad():
"""
Base class for lab tests with sine load
"""
def __init__(self, filename:str , metadata: dict):
def __init__(self,
filename: str,
metadata: dict,
archive: bool = True,
debug: bool = False,
data: None | io.BytesIO = None):
self.filename = filename
self.metadata = metadata
if isinstance(data, io.BytesIO):
self.data = data
self.archive_data = archive
self.debug = debug
self._logger = logging.getLogger(__name__)
self._logger.info(
f'filename s3: {self.filename}, metadata: {self.metadata}')
self._pre_run()
def _set_parameter(self):
self.split_data_based_on_parameter = ['T', 'sigma', 'f']
self.col_as_int = ['N']
self.col_as_float = ['T', 'F', 's_piston', 's_hor_1', 'f', 's_hor_1', 's_hor_2']
self.val_col_names = ['time', 'T', 'f', 'sigma', 'N', 'F', 's_hor_1', 's_hor_2']
self._logger = logger
self._logger.info(f'filename s3: {self.filename}, metadata: {self.metadata}')
self.columns_analyse = ['F','s_hor_sum','s_hor_1','s_hor_2','s_piston']
# Header names after standardization; check if exists
self.val_header_names = ['speciment_height', 'speciment_diameter']
self.number_of_load_cycles_for_analysis = 5
self.meta_names_of_parameter = {
'sigma': ['Max. Spannung']
} #list of names
self.data_column_names = {
'time': ['Time Series'],
'F': ['Load Series'],
's_hor_1': ['LVDT1 Series'],
's_hor_2': ['LVDT2 Series'],
}
def update_parameter():
""" update standard prameter from function self._set_parameter()"""
pass
def _define_units(self):
self.unit_s = 1 #mm
self.unit_F = 1 #N
self.unit_t = 1 / 1000. #s
def _connect_to_s3(self):
self._logger.info('connect to db')
self.__minioClient = get_minio_client_processing()
def _read_from_s3_to_bytesio(self):
self._logger.info('read bytes')
try:
self._connect_to_s3()
response = self.__minioClient.get_object('processing', self.filename)
self.data = response.data
response = self.__minioClient.get_object('processing',
self.filename)
self.data = response.data
finally:
response.close()
response.release_conn()
self.data = io.BytesIO(self.data)
def _calc_hash_of_bytesio(self):
self.filehash = calc_hash_of_bytes(self.data)
self.data.seek(0)
self._logger.debug(f'Hash of file: {self.filehash}')
def _process_data(self):
""" convert self.data (BytesIO) to pandas.DataFrame, update
self.metadata with informations from file """
self._logger.debug('convert bytes to pandas.DataFrame')
encoding = 'utf-8'
self.data = pd.read_csv(self.data, encoding=encoding)
def _standardize_data(self):
colnames = list(self.data.columns)
for par, names in self.data_column_names.items():
for name in names:
colnames = [sub.replace(name, par) for sub in colnames]
self.data.columns = colnames
print(self.data.head(5))
def _bytes_to_df(self):
self._logger.debug('convert bytes to pandas.DataFrame')
encoding='utf-8'
self.df = pd.read_csv(self.data, encoding=encoding)
def _standardize_meta(self):
for par, names in self.meta_names_of_parameter.items():
for name in names:
if name in self.metadata:
self.metadata[par] = self.metadata[name]
self.metadata.pop(name)
break
def _validate_data(self):
def _calc(self):
self._logger.debug('calc data')
return self.df.mean().mean()
for name in self.val_col_names:
if not name in self.data.columns:
raise
def _validate_meta(self):
for name in self.val_header_names:
if not name in self.metadata:
raise
def _post_apply_units(self):
for col in ['s_hor_sum', 's_hor_1', 's_hor_2']:
if col in self.data.columns:
self.data[col] = self.data[col].mul(self.unit_s)
for col in ['F']:
self.data[col] = self.data[col].mul(self.unit_F)
for col in ['time']:
self.data[col] = self.data[col].mul(self.unit_t)
return True
def _post_select_importent_columns(self):
# TODO: add more columns, check datamodel
self.data = self.data[self.val_col_names]
def _post_calc_missiong_values(self):
cols = self.data.columns
if not 's_hor_sum' in cols:
self.data['s_hor_sum'] = self.data[['s_hor_1',
's_hor_2']].sum(axis=1)
def _post_opt_data(self):
#set dtypes:
for col in self.col_as_int:
self.data[col] = self.data[col].astype('int')
for col in self.col_as_float:
try:
self.data[col] = self.data[col].astype('float')
except:
pass
#set index
self.data = self.data.set_index('time')
return True
def _fit_split_data(self):
data_gp = self.data.groupby(self.split_data_based_on_parameter)
data_list = []
for idx, d in data_gp:
idx_diff = np.diff(d.index)
dt_mean = idx_diff.mean()
gaps = idx_diff > (4 * dt_mean)
has_gaps = any(gaps)
if has_gaps == False:
data_list.append(d)
else:
#FIX: GAP FINDING
data_list.append(d)
"""
print('has gaps')
print(gaps)
idx_gaps = (np.where(gaps)[0] - 1)[0]
print(idx_gaps)
data_list.append(d.iloc[0:idx_gaps])
"""
#add self.
if len(data_list) == 0:
self.num_tests = 0
self.data = data_list[0]
else:
self.num_tests = len(data_list)
self.data = data_list
#break
def _fit_select_data(self):
"""
select N load cycles from original data
(a): Based on window of TP-Asphalt
(b) last N cycles
"""
def sel_df(df, num=5):
N = df['N'].unique()
freq = float(df['f'].unique()[0])
# define cycles to select
if freq == 10.0:
Nfrom = 98
Nto = 103
elif freq == 5.0:
Nfrom = 93
Nto = 97
elif freq == 3.0:
Nfrom = 43
Nto = 47
elif freq == 1.0:
Nfrom = 13
Nto = 17
elif freq == 0.3:
Nfrom = 8
Nto = 12
elif freq == 0.1:
Nfrom = 3
Nto = 7
else:
Nfrom = None
Nto = None
# Fall 1: nicht alle LW in Datei
if (max(N) < Nto) & (len(N) >= num):
df_sel = df[(df['N'] >= N[-num]) & (df['N'] <= N[-1])]
# Fall 2:
else:
if Nfrom != None:
if len(N) > Nto - Nfrom:
df_sel = df[(df['N'] >= Nfrom) & (df['N'] <= Nto)]
return df_sel
if not isinstance(self.data, list):
if self.number_of_load_cycles_for_analysis > 1:
df_sel = [
sel_df(self.data,
num=self.number_of_load_cycles_for_analysis)
]
else:
df_sel = [self.data]
else:
df_sel = []
for d in self.data:
if self.number_of_load_cycles_for_analysis > 1:
d_sel = sel_df(d,num=self.number_of_load_cycles_for_analysis)
else:
d_sel = d
df_sel.append(d_sel)
# replace data
self.data = df_sel
def _calc(self):
print(len(self.data))
self.fit = []
for idx_data, data in enumerate(self.data):
if data is None: continue
if len(data) < 10: continue
data.index = data.index - data.index[0]
res_temp = {}
x = data.index.values
freq = np.round(float(data['f'].unique()),2)
sigma = float(data['sigma'].unique())
temperature = float(data['T'].unique())
for idxcol, col in enumerate(self.columns_analyse):
if not col in data.columns: continue
y = data[col].values
res = fit_cos(x,y, freq=freq)
for key, value in res.items():
res_temp[f'fit_{col}_{key}'] = value
res_temp[f'fit_{col}_max'] = max(y)
res_temp[f'fit_{col}_min'] = min(y)
res_temp['f'] = freq
res_temp['sigma'] = sigma
res_temp['T'] = temperature
## Stiffness
deltaF = res_temp['fit_F_amp']
nu = calc_nu(temperature)
res_temp['nu'] = nu
h = float(self.metadata['speciment_height'])
deltaU = res_temp['fit_s_hor_sum_amp']
res_temp['E'] = (deltaF * (0.274 + nu)) / (h * deltaU)
self.fit.append(res_temp)
self.fit = pd.DataFrame.from_records(self.fit)
self.fit = self.fit.set_index(['T', 'f', 'sigma'])
print(self.fit)
def _archive_binary_data(self):
self._logger.debug('send file to archive')
app.send_task('ArchiveFile', args=[self.filename,
self.metadata,
self.filehash,
'org',
'citt'
],
queue='archive'
)
self._logger.debug('send file to archive')
app.send_task(
'ArchiveFile',
args=[self.filename, self.metadata, self.filehash, 'org', 'citt'],
queue='archive')
def _pre_run(self):
if not hasattr(self, 'data'):
self._read_from_s3_to_bytesio()
self._calc_hash_of_bytesio()
self._set_parameter()
self.update_parameter()
self._define_units()
def run(self):
self._logger.info('run task')
self._read_from_s3_to_bytesio()
self._calc_hash_of_bytesio()
self._bytes_to_df()
res = self._calc()
self._logger.debug(f'results: {res}')
self._archive_binary_data()
return res
self._process_data()
self._standardize_data()
self._standardize_meta()
self._validate_data()
self._validate_meta()
self._post_select_importent_columns()
self._post_apply_units()
self._post_calc_missiong_values()
self._post_opt_data()
self._fit_split_data()
self._fit_select_data()
self._calc()
#self._logger.debug(f'results: {res}')
#if self.archive_data:
# self._archive_binary_data()
#return res