# Source file: lib-paveit-demo/src/paveit/labtest/citt.py
# (Python, executable file; 1054 lines, 29 KiB)

import io
import os
from csv import reader
import numpy as np
import pandas as pd
from bson import ObjectId
from paveit import calc_nu, fit_cos
from paveit.datamodels import CITTSiffness, CITTSiffnessResults
from paveit.io import read_geosys
from paveit.labtest import DataSineLoad
class CITTBase(DataSineLoad):
    """Shared evaluation logic for cyclic indirect tensile tests (CITT).

    The machine specific subclasses in this module supply ``_define_units``,
    ``update_parameter`` and ``_process_data``. This class selects the load
    cycles to analyse, fits a cosine to every analysed column, derives the
    stiffness and phase, and stores the results.
    """

    def _set_parameter(self):
        """Set the default column names, dtypes and analysis settings."""
        self._logger.debug('run _set_parameter')

        self.split_data_based_on_parameter = ['T', 'sigma', 'f']

        self.col_as_int = ['N']
        self.col_as_float = [
            'T', 'F', 's_piston', 's_hor_1', 'f', 's_hor_2'
        ]

        self.val_col_names = [
            'time', 'T', 'f', 'sigma', 'N', 'F', 's_hor_1', 's_hor_2',
            's_piston'
        ]

        self.columns_analyse = [
            'F', 's_hor_sum', 's_hor_1', 's_hor_2', 's_piston'
        ]

        self.round_values = [('T', 3), ('sigma', 3)]

        # Header names after standardization; check if exists
        self.val_header_names = ['speciment_height', 'speciment_diameter']

        self.number_of_load_cycles_for_analysis = 5

        # Dummy data, replaced by the machine configuration
        self.meta_names_of_parameter = {
            'sigma': ['Max. Spannung']
        }  # list of names

        # Dummy data, replaced by the machine configuration
        self.data_column_names = {
            'time': ['Time Series'],
            'F': ['Load Series'],
            's_hor_1': ['LVDT1 Series'],
            's_hor_2': ['LVDT2 Series'],
        }

    def _sel_df(self, df, num=5, shift=-1):
        """Return the rows of ``df`` belonging to the cycles to analyse.

        Args:
            df: DataFrame with at least the columns ``'N'`` (cycle number)
                and ``'f'`` (frequency).
            num: Number of load cycles required for the analysis.
            shift: Offset applied when the last recorded cycles are used;
                ``-1`` skips the final recorded cycle.

        Returns:
            The selected part of ``df``, or ``None`` when fewer than ``num``
            cycles exist or no selection rule applies.
        """
        cycles = df['N'].unique()
        n_cycles = len(cycles)

        # Case 1: not enough load cycles (also covers an empty frame)
        if n_cycles < num:
            return None
        if n_cycles == num:
            return df

        max_cycle = max(cycles)
        min_cycle = min(cycles)
        freq = float(df['f'].unique()[0])

        # Cycle window (first, last) per test frequency.
        # NOTE(review): the 10 Hz window spans six cycles while all others
        # span five - confirm against the test specification.
        windows = {
            10.0: (98, 103),
            5.0: (93, 97),
            3.0: (43, 47),
            1.0: (13, 17),
            0.3: (8, 12),
            0.1: (3, 7),
        }
        n_from, n_to = windows.get(freq, (None, None))

        # Case 4: unknown frequency. Must be checked before comparing the
        # window limits, comparing an int with None raises TypeError.
        if n_to is None:
            return None

        # Case 2: the file does not contain all load cycles, use the last
        # recorded ones instead
        if max_cycle < n_to:
            first = cycles[-num + shift]
            last = cycles[-1 + shift]
            return df[(df['N'] >= first) & (df['N'] <= last)]

        # Case 3: selection as defined in the window table
        if n_from >= min_cycle and n_to < max_cycle:
            return df[(df['N'] >= n_from) & (df['N'] <= n_to)]

        # Case 4: selection unknown
        return None

    def _fit_select_data(self):
        """Select the load cycles to analyse from the original data.

        ``self.data`` (a single DataFrame or a list of them) is replaced by
        a list with one entry per data set: the result of ``_sel_df`` when
        more than one cycle is analysed, otherwise the unmodified data set.
        ``self.max_N_in_data`` receives the largest cycle number of every
        data set, in the same order.
        """
        self._logger.debug('run _fit_select_data')

        num = self.number_of_load_cycles_for_analysis
        datasets = self.data if isinstance(self.data, list) else [self.data]

        self.max_N_in_data = []
        selected = []
        for d in datasets:
            # always recorded, _calc looks the value up by subset position
            self.max_N_in_data.append(d['N'].max())
            selected.append(self._sel_df(d, num=num) if num > 1 else d)

        # replace data
        self.data = selected

    @staticmethod
    def _citt_single_value(series):
        """Return the only distinct value of ``series`` as float.

        Raises:
            ValueError: if the series holds zero or several distinct values.
        """
        values = series.unique()
        if len(values) != 1:
            raise ValueError(
                f'expected exactly one unique value, got {len(values)}')
        return float(values[0])

    def _citt_fit_subset(self, idx_data, data):
        """Fit one selected subset and return its result record as dict."""
        self._logger.debug(f'run fit on subset {idx_data}')

        # let the time axis of the subset start at zero
        data.index = data.index - data.index[0]

        res_temp = {'idx': idx_data}
        x = data.index.values

        freq = np.round(self._citt_single_value(data['f']), 2)
        sigma = self._citt_single_value(data['sigma'])
        temperature = self._citt_single_value(data['T'])

        for col in self.columns_analyse:
            if col not in data.columns:
                continue

            y = data[col].values
            for key, value in fit_cos(x, y, freq=freq).items():
                res_temp[f'fit_{col}_{key}'] = value

            # per-cycle extreme values
            per_cycle = {'min': [], 'max': [], 'mean': [], 'diff': []}
            for _, data_cycle in data.groupby('N'):
                y_cycle = data_cycle[col].values
                low, high = y_cycle.min(), y_cycle.max()
                per_cycle['min'].append(low)
                per_cycle['max'].append(high)
                per_cycle['mean'].append(y_cycle.mean())
                per_cycle['diff'].append(high - low)

            for stat, values in per_cycle.items():
                res_temp[f'fit_{col}_cycle_{stat}'] = values
                res_temp[f'fit_{col}_{stat}'] = np.mean(values)
                res_temp[f'fit_{col}_{stat}_std'] = np.std(values)
                res_temp[f'fit_{col}_{stat}_diff_rel'] = (
                    np.max(values) - np.min(values)) / np.mean(values)

        # add more metadata
        res_temp['f_set'] = freq
        res_temp['sigma_set'] = sigma
        res_temp['T_set'] = temperature

        res_temp['N_from'] = data['N'].min()
        res_temp['N_to'] = data['N'].max()
        res_temp['N_tot'] = self.max_N_in_data[idx_data]

        res_temp['n_samples_per_cycle'] = int(
            len(data) / (res_temp['N_to'] - res_temp['N_from'] + 1))

        # stiffness
        deltaF = res_temp['fit_F_amp']
        nu = calc_nu(temperature)
        res_temp['nu'] = nu

        h = float(self.metadata['speciment_height'])
        deltaU = res_temp['fit_s_hor_sum_amp']

        res_temp['stiffness'] = (deltaF * (0.274 + nu)) / (h * deltaU)

        # TODO: revise and extend (ISSUE #2)
        res_temp['phase'] = (res_temp['fit_F_phase'] -
                             res_temp['fit_s_hor_sum_phase'])
        return res_temp

    def _calc(self):
        """Fit every selected subset and collect the results in ``self.fit``.

        Subsets that are ``None`` or hold fewer than 10 rows are skipped.
        A subset whose evaluation raises is logged and skipped as well, so
        one broken subset no longer prevents building the result table.
        ``self.fit`` becomes a DataFrame indexed by the subset position
        (``'idx'``); it is empty when no subset could be evaluated.
        """
        self._logger.info('run _calc CITT')

        records = []
        for idx_data, data in enumerate(self.data):
            if data is None or len(data) < 10:
                continue
            try:
                res_temp = self._citt_fit_subset(idx_data, data)
            except Exception as e:
                self._logger.exception(e)
                continue
            self._logger.debug(res_temp)
            records.append(res_temp)

        if not records:
            self._logger.warning(
                'fitting finished, no subset could be evaluated')
            self.fit = pd.DataFrame(columns=['stiffness'],
                                    index=pd.Index([], name='idx'))
            return

        self.fit = pd.DataFrame.from_records(records)
        self.fit = self.fit.reset_index(drop=True).set_index('idx')

        nsamples = len(self.fit)
        self._logger.info(f'fitting finished, add {nsamples} samples')
        self._logger.debug(self.fit['stiffness'])

    def save(self, task_id: ObjectId, meta: dict = None):
        """Save the fit results and the analysed raw data to mongodb.

        Args:
            task_id: Identifier stored with every result document.
            meta: Additional fields stored with every result document. The
                passed dict is copied and never modified.

        Raises:
            RuntimeError: if no fit results exist yet.
        """
        if not hasattr(self, 'fit'):
            raise RuntimeError('no fit results available, nothing to save')

        meta = dict(meta) if meta else {}
        meta['filehash'] = self.filehash
        meta['task_id'] = task_id

        for idx_fit, fit in self.fit.iterrows():
            data = self.data[idx_fit]

            # result document: fit values plus metadata
            data_dict = fit.to_dict()
            data_dict.update(meta)

            # remove 'fit_' from keys
            for key in list(data_dict.keys()):
                if key.startswith('fit_'):
                    data_dict[key[4:]] = data_dict.pop(key)

            f = CITTSiffnessResults(**data_dict).save()

            # required data, stored as plain lists like all other fields
            data_out = dict(
                time=list(data.index),
                F=list(data['F']),
                N=list(data['N']),
                s_hor_1=list(data['s_hor_1']),
                s_hor_2=list(data['s_hor_2']),
                s_hor_sum=list(data['s_hor_sum']),
            )

            self._logger.debug(f'columns data, {data.columns}')

            # add optional data
            for col in ['s_piston']:
                if col in data.columns:
                    self._logger.debug(f'add {col} to output data')
                    data_out[col] = list(data[col])

            outkeys = list(data_out.keys())
            self._logger.debug(f'write raw data to db, {outkeys}')
            CITTSiffness(result=f.id, **data_out).save()
class CITT_TUDresdenWille(CITTBase):
    """CITT evaluation for files parsed with ``read_geosys`` (data id '039')."""

    def _define_units(self):
        """Set the scale factors for displacement, force and time."""
        self.unit_s = 1 / 1000.  #mm
        self.unit_F = 1.0  #N
        self.unit_t = 1.  #s

    def update_parameter(self):
        """Register the metadata and column names used by this file format."""
        # every entry is a list of accepted names
        self.meta_names_of_parameter = dict(
            sigma=['Oberspannung'],
            f=['Frequenz'],
            T=['Prüftemperatur'],
            t=['Zeit'],
            speciment_diameter=[
                'Probekörberdurchmesser', 'Probekörberbreite'
            ],
            speciment_height=['Probekörperhöhe'],
        )

        self.data_column_names = dict(
            time=['Zeit'],
            F=['Kraft'],
            N=['Zyklenzähler'],
            s_hor_1=['Horizontalweg vorn', 'Weg Probe'],
            s_hor_2=['Horizontalweg hinten', 'Weg 2'],
            s_piston=['Kolbenweg'],
        )

    def _process_data(self):
        """Parse the raw file and store the data and metadata on the instance."""
        parsed = read_geosys(self.data,
                             '039',
                             metadata_ids=['001', '003', '011', '019'])
        meta, data = parsed

        # store on the instance
        self.data = data.reset_index()
        self.metadata.update(meta)

        # log infos
        self._logger.debug(f'metadata: {self.metadata}')
        self._logger.debug(f'data: {self.data.head()}')
        print(data.head())
class CITT_TUDresdenTira(CITTBase):
    """CITT evaluation for semicolon separated text files of the TIRA machine."""

    def _define_units(self):
        """Set the scale factors for displacement, force and time."""
        self.unit_s = 1  #mm
        self.unit_F = 1.  #N
        self.unit_t = 1.  #s

    def update_parameter(self):
        """Register the metadata and column names used by this file format."""
        self.meta_names_of_parameter = {
            'sigma': ['Oberspannung'],
            'f': ['Sollfrequnz'],
            'T': ['Solltemperatur'],
            #'Nfrom': ['Erster Aufzeichnungslastwechsel', 'Start Cycle'],
            #'Nto': ['Letzer Aufzeichnungslastwechsel', 'Last Cycle'],
            't': ['Zeit'],
            'speciment_diameter': ['PK-Durchmesser', 'Probekörberbreite'],
            'speciment_height': ['PK-Höhe', 'Probekörperhöhe'],
        }  # list of names

        self.data_column_names = {
            'time': ['Zeit'],
            'F': ['Kraft'],
            'N': ['Lastwechsel'],
            's_hor_1': ['IWA_1 2 mm'],
            's_hor_2': ['IWA_2 2 mm'],
            's_piston': ['Kolbenweg'],
        }

    def _process_data(self):
        """Parse the uploaded TIRA file into ``self.data`` / ``self.metadata``.

        The first 29 lines form the file header: lines of the form
        ``key:;value`` become metadata and line 29 holds the column names.
        The table below is read with pandas; its first two rows are dropped
        before the column names are applied.
        """
        # NOTE(review): the original guarded the parser with
        # `self._machine == 'TIRA'`; it is assumed here that the guard
        # covered the whole method - confirm against the upstream file.
        if self._machine != 'TIRA':
            return

        encoding = 'latin-1'
        skiprows = 29
        splitsign = ':;'

        # read the header lines once, they hold metadata and column names
        self.data.seek(0)
        head_lines = []
        for raw in self.data:
            head_lines.append(raw.decode(encoding))
            if len(head_lines) >= skiprows:
                break

        # metadata from file
        meta = {}
        for text in head_lines:
            parts = text.strip().split(splitsign)
            if len(parts) == 2:
                key = parts[0].strip()
                meta[key] = parts[1].split(';')[0].strip()

        # data
        self.data.seek(0)
        data = pd.read_csv(self.data,
                           encoding=encoding,
                           header=None,
                           skiprows=skiprows,
                           decimal=',',
                           thousands='.',
                           sep=';')
        data = data.iloc[2:]

        # clean data
        data = data.dropna(axis=1)

        # add header from file, the last header line holds the column names
        data.columns = [name.strip() for name in head_lines[-1].split(';')]

        # define in class
        self.data = data
        self.metadata.update(meta)

        # log infos
        self._logger.info(self.metadata)
        self._logger.info(self.data.head())
class CITT_PTMDortmund(CITTBase):
    """CITT evaluation for Excel workbooks with one sheet per load step."""

    def _define_units(self):
        """Set the scale factors for displacement, force and time."""
        self.unit_s = 1  #mm
        self.unit_F = 1000.  #N
        self.unit_t = 1.  #s

    def update_parameter(self):
        """Register the metadata and column names used by this file format."""
        self.meta_names_of_parameter = {
            'sigma': ['Max. Spannung', 'Max Stress'],
            'f': ['Frequenz', 'Frequency'],
            'T': ['Versuchstemperatur', 'Target Test Temperature'],
            'Nfrom': ['Erster Aufzeichnungslastwechsel', 'Start Cycle'],
            'Nto': ['Letzer Aufzeichnungslastwechsel', 'Last Cycle'],
            't': ['Zeitfolgen', 'Time Series'],
            'speciment_diameter': ['Durchmesser (mm)', 'Diameter (mm)'],
            'speciment_height': ['Länge (mm)', 'Length (mm)'],
        }  # list of names

        self.data_column_names = {
            'time': ['Time Series'],
            'F': ['Load Series'],
            's_hor_1': ['LVDT1 Series'],
            's_hor_2': ['LVDT2 Series'],
        }

    @staticmethod
    def _ptm_meta_value(meta, names, convert):
        """Return ``convert(meta[name])`` for the first usable ``name``.

        Raises:
            AssertionError: if no name is present with a convertible value.
                The type matches the former ``assert`` statements, but it is
                raised explicitly so the check survives ``python -O``.
        """
        for name in names:
            try:
                return convert(meta[name])
            except (KeyError, TypeError, ValueError, OverflowError):
                continue
        raise AssertionError(
            f'no usable metadata entry found for any of {names}')

    def _process_data(self):
        """Read every sheet of the workbook and merge them into one table.

        Per sheet the measurement table (below row 97) and the metadata
        block (rows 1-80, first and third column) are read. ``sigma``,
        ``f`` and ``T`` from the metadata are added as columns and a cycle
        number ``N`` is derived from the time column and the frequency.
        ``self.metadata`` is updated with the metadata of the last sheet.

        Raises:
            AssertionError: if a required metadata entry or the time column
                is missing in a sheet.
        """
        res = []

        # open the workbook once and reuse it for every sheet
        xl = pd.ExcelFile(self.data)

        for sheetid in range(len(xl.sheet_names)):
            temp = pd.read_excel(xl, sheet_name=sheetid, skiprows=97)
            temp = temp.drop(index=0)

            # convert data to numerical data
            for col in temp.columns:
                temp[col] = pd.to_numeric(temp[col])

            # read metadata from file
            meta = pd.read_excel(xl, sheet_name=sheetid, skiprows=1, nrows=80)
            meta = meta[meta.columns[[0, 2]]]
            meta = meta.set_index(meta.columns[0])
            meta = meta.dropna(axis=0)
            meta = meta[meta.columns[0]].to_dict()

            # remove whitespace in dict keys
            meta = {
                x.strip(): v
                for x, v in meta.items() if isinstance(x, str)
            }

            # add metadata to dataframe
            frequency_test = None
            for par in ['sigma', 'f', 'T']:
                v = self._ptm_meta_value(
                    meta, self.meta_names_of_parameter[par],
                    lambda raw: np.round(float(raw), 5))
                if par == 'f':
                    v = np.round(v, 2)
                    frequency_test = v
                temp[par] = v

            # the recorded cycle range must be present; the values are only
            # validated here
            self._ptm_meta_value(meta, self.meta_names_of_parameter['Nfrom'],
                                 int)
            self._ptm_meta_value(meta, self.meta_names_of_parameter['Nto'],
                                 int)

            # locate the time column
            time_idx = None
            for name in self.meta_names_of_parameter['t']:
                if name in temp.columns:
                    time_idx = temp[name].values
                    break
            if time_idx is None:
                raise AssertionError('no time column found in sheet '
                                     f'{sheetid}')

            # add cycle number to dataframe, cycles start with 1
            # NOTE(review): a sample lying exactly on the final cycle
            # boundary is not matched by any window and keeps N = 1 -
            # confirm whether that is intended.
            temp['N'] = 1

            dt = 1.0 / frequency_test
            tmax = dt
            max_timeindex = max(time_idx)

            cycle = 0
            while tmax < max_timeindex:
                # time window
                tmin = cycle * dt
                tmax = (cycle + 1) * dt

                # filter data and set the cycle number
                idx = temp[(time_idx >= tmin) & (time_idx < tmax)].index
                temp.loc[idx, 'N'] = cycle + 1

                cycle += 1

            # specimen dimensions must be present; the values are only
            # validated here
            self._ptm_meta_value(
                meta, self.meta_names_of_parameter['speciment_diameter'],
                float)
            self._ptm_meta_value(
                meta, self.meta_names_of_parameter['speciment_height'], float)

            # append data to final dataframe
            res.append(temp)

        # concat all parts to single dataframe
        res = pd.concat(res)

        # define in class
        self.data = res.reset_index()
        self.metadata.update(meta)

        # log infos
        self._logger.info(self.metadata)
        self._logger.info(self.data.head())
class CITT_UniSiegen(CITTBase):
    """CITT evaluation for files parsed with ``read_geosys`` (data id '015')."""

    def _define_units(self):
        """Set the scale factors for displacement, force and time."""
        self.unit_s = 1 / 1000.  #mm
        self.unit_F = 1.0  #N
        self.unit_t = 1.  #s

    def update_parameter(self):
        """Register the metadata and column names used by this file format."""
        # every entry is a list of accepted names
        self.meta_names_of_parameter = dict(
            sigma=['Oberspannung'],
            f=['Frequenz'],
            T=['Prüftemperatur'],
            #Nfrom=['Erster Aufzeichnungslastwechsel', 'Start Cycle'],
            Nto=['Maximale Zyklenanzahl'],
            t=['Zeit'],
        )

        self.data_column_names = dict(
            time=['Zeit'],
            F=['Kraft'],
            s_hor_1=['Radialweg vorn'],
            s_hor_2=['Radialweg hinten'],
            s_piston=['Kolbenweg'],
            N=['Zyklenzähler'],
        )

    def _process_data(self):
        """Parse the raw file and store the data and metadata on the instance."""
        parsed = read_geosys(self.data, '015', metadata_ids=['003'])
        meta, data = parsed

        # store on the instance
        self.data = data.reset_index()
        self.metadata.update(meta)

        # log infos
        for info in (self.metadata, self.data.head()):
            self._logger.info(info)
class CITT_BASt(CITTBase):
    """CITT evaluation for files parsed with ``read_geosys`` (data id '047')."""

    def _define_units(self):
        """Set the scale factors for displacement, force and time."""
        self.unit_s = 1 / 1000.  #mm
        self.unit_F = 1.0  #N
        self.unit_t = 1.  #s

    def update_parameter(self):
        """Register the metadata and column names used by this file format."""
        # every entry is a list of accepted names
        self.meta_names_of_parameter = dict(
            sigma=['Oberspannung'],
            f=['Versuchsart'],
            T=['Prüftemperatur\r\n'],
            #Nfrom=['Erster Aufzeichnungslastwechsel', 'Start Cycle'],
            Nto=['Maximale Zyklenanzahl'],
            t=['Zeit'],
        )

        self.data_column_names = dict(
            time=['Zeit'],
            F=['Vertikalkraft'],
            s_hor_1=['Horizontalweg IWA vorn'],
            s_hor_2=['Horizontalweg IWA hinten'],
            s_piston=['Kolbenposition'],
            N=['Zyklenzähler'],
        )

    def _process_data(self):
        """Parse the raw file and store the data and metadata on the instance."""
        parsed = read_geosys(self.data,
                             '047',
                             metadata_ids=['003', '005', '015'])
        meta, data = parsed

        # store on the instance
        self.data = data.reset_index()
        self.metadata.update(meta)

        # log infos
        for info in (self.metadata, self.data.head()):
            self._logger.info(info)

    def _modify_meta(self):
        """Convert the textual 'f' and 'T' metadata entries to floats.

        For 'f' the text between 'Steifigkeit' and 'Hz' is used, for 'T'
        the text before '°C'; decimal commas are replaced by points.
        """
        freq_text = self.metadata['f'].split('Hz')[0]
        freq_text = freq_text.split('Steifigkeit')[-1].strip()
        self.metadata['f'] = float(freq_text.replace(',', '.'))

        temp_text = self.metadata['T'].split('°C')[0].strip()
        self.metadata['T'] = float(temp_text.replace(',', '.'))
class CITT_LaborHart(CITTBase):
    """CITT evaluation for semicolon separated files with a 14 line header."""

    def _define_units(self):
        """Set the scale factors for displacement, force and time."""
        self.unit_s = 1.0  #mm
        self.unit_F = 1.0  #N
        self.unit_t = 1. / 1000.0  #s

    def update_parameter(self):
        """Register the metadata and column names used by this file format."""
        self.meta_names_of_parameter = {
            'sigma': ['Oberspannung'],
            'T': ['Solltemperatur'],
            't': ['TIME'],
            'speciment_diameter': ['Probendurchmesser'],
            'speciment_height': ['Probenhöhe'],
        }  # list of names

        self.data_column_names = {
            'time': ['TIME'],
            'f': ['FREQUENZ'],
            'F': ['Load'],
            's_hor_1': ['SENSOR 4'],
            's_hor_2': ['SENSOR Extension'],
            's_piston': ['Position'],
            'N': ['Impulsnummer'],
        }

    def _process_data(self):
        """Parse the uploaded file into ``self.data`` / ``self.metadata``.

        Header lines of the form ``key:;value`` become metadata and line 14
        supplies the column names of the table read with pandas. When
        ``self.metadata`` has no ``'sigma'`` entry, the value is taken from
        the part of the file name in front of ``'MPa'``.
        """
        splitsign = ':;'
        encoding = 'latin-1'
        skiprows = 14

        # read the header lines once, they hold metadata and column names
        self.data.seek(0)
        head_lines = []
        for raw in self.data:
            head_lines.append(raw.decode(encoding))
            if len(head_lines) >= skiprows:
                break

        meta = {}
        for text in head_lines:
            parts = text.strip().split(splitsign)
            if len(parts) == 2:
                meta[parts[0]] = parts[1]

        # data
        self.data.seek(0)
        data = pd.read_csv(self.data,
                           encoding=encoding,
                           skiprows=skiprows,
                           decimal=',',
                           sep=';')

        # add header to df, the last header line holds the column names
        data.columns = head_lines[-1].split(';')

        # FIX: sigma is neither part of the metadata nor of the measured data
        self._logger.debug(meta)
        if "sigma" not in self.metadata:
            file_name = os.path.split(self.filename)[-1]
            meta['sigma'] = float(
                file_name.split('MPa')[0].strip().replace(',', '.'))

        # clean data
        data = data.dropna(axis=1)

        # remove whitespace
        data.columns = [c.strip() for c in data.columns]

        # define in class
        self.data = data
        self.metadata.update(meta)

        # log infos
        self._logger.info(self.metadata)
        self._logger.info(self.data.head())
class CITT_BAGKoeln(CITTBase):
    """CITT evaluation for semicolon separated files with a 14 line header."""

    def _define_units(self):
        """Set the scale factors for displacement, force and time."""
        self.unit_s = 1.0  #mm
        self.unit_F = 1.0  #N
        self.unit_t = 1. / 1000.0  #s

    def update_parameter(self):
        """Register the metadata and column names used by this file format."""
        self.meta_names_of_parameter = {
            'sigma': ['Oberspannung'],
            'T': ['Solltemperatur'],
            't': ['TIME'],
            'speciment_diameter': ['Probendurchmesser'],
            'speciment_height': ['Probenhöhe'],
        }  # list of names

        self.data_column_names = {
            'time': ['TIME'],
            'f': ['FREQUENZ'],
            'F': ['Load'],
            's_hor_1': ['SENSOR 4'],
            's_hor_2': ['SENSOR Extension'],
            's_piston': ['Position'],
            'N': ['Impulsnummer'],
        }

    def _process_data(self):
        """Parse the uploaded file into ``self.data`` / ``self.metadata``.

        Header lines of the form ``key:;value`` become metadata and line 14
        supplies the column names of the table read with pandas. ``sigma``
        is always taken from the part of the file name in front of
        ``'MPa'``.
        """
        splitsign = ':;'
        encoding = 'latin-1'
        skiprows = 14

        # read the header lines once, they hold metadata and column names
        self.data.seek(0)
        head_lines = []
        for raw in self.data:
            head_lines.append(raw.decode(encoding))
            if len(head_lines) >= skiprows:
                break

        meta = {}
        for text in head_lines:
            parts = text.strip().split(splitsign)
            if len(parts) == 2:
                meta[parts[0]] = parts[1]

        # data
        self.data.seek(0)
        data = pd.read_csv(self.data,
                           encoding=encoding,
                           skiprows=skiprows,
                           decimal=',',
                           sep=';')

        # add header to df, the last header line holds the column names
        data.columns = head_lines[-1].split(';')

        # FIX: sigma is neither part of the metadata nor of the measured data
        file_name = os.path.split(self.filename)[-1]
        meta['sigma'] = float(
            file_name.split('MPa')[0].strip().replace(',', '.'))

        # clean data
        data = data.dropna(axis=1)

        # remove whitespace
        data.columns = [c.strip() for c in data.columns]

        # define in class
        self.data = data
        self.metadata.update(meta)

        # log infos
        self._logger.info(self.metadata)
        self._logger.info(self.data.head())
class CITT_Braunschweig(CITTBase):
    """CITT evaluation for semicolon separated files without frequency column.

    The frequency is derived from the cycle number; stress and temperature
    are taken from ``self.metadata``.
    """

    def _define_units(self):
        """Set the scale factors for displacement, force and time."""
        self.unit_s = 1.0  #mm
        self.unit_F = -1000.0  #N
        self.unit_t = 1.0

    def update_parameter(self):
        """Register the metadata and column names used by this file format."""
        self.meta_names_of_parameter = {
            't': ['t [s]'],
        }  # list of names

        self.data_column_names = {
            'time': ['t [s]'],
            'f': ['FREQUENZ_x10'],
            'F': ['F act [kN]'],
            's_hor_1': ['r 1A [mm]'],
            's_hor_2': ['r 1B [mm]'],
            's_piston': ['s piston [mm]'],
        }

    def _process_data(self):
        """Parse the uploaded file and store the result in ``self.data``.

        Column names come from the row below the first 14 lines, the values
        from line 21 onwards. A frequency column ``'f'`` is derived from the
        cycle number ``'N'``, rows whose frequency exceeds 10 are dropped,
        ``'sigma'`` and ``'T'`` are filled from ``self.metadata`` (keys
        ``'sigma'`` and ``'temperature'``) and the ``'Date Time'`` column is
        removed.
        """
        self._logger.info(self.metadata)

        encoding = 'latin-1'

        # column names
        self.data.seek(0)
        head = pd.read_csv(self.data,
                           skiprows=14,
                           sep=';',
                           nrows=1,
                           encoding=encoding,
                           decimal=',',
                           thousands='.',
                           skip_blank_lines=True).columns
        head = [h.strip() for h in head]

        # measured values
        self.data.seek(0)
        data = pd.read_csv(self.data,
                           skiprows=20,
                           sep=';',
                           encoding=encoding,
                           decimal=',',
                           thousands='.',
                           header=None,
                           skip_blank_lines=True)
        data.columns = head

        # add frequency: [start, ncycles] per frequency. Cycle numbers are
        # assigned consecutively in the order listed, so only the number of
        # cycles determines the mapping (the start value is not used).
        sel = {0.1: [0, 11], 1.0: [11, 20], 5.0: [30, 100], 10: [130, 111]}

        N_f_dict = {}
        for freq, (_start, n_cycles) in sel.items():
            first = len(N_f_dict)
            N_f_dict.update((first + k, freq) for k in range(n_cycles))

        data['f'] = data['N'].replace(N_f_dict)

        # cycle numbers without a mapped frequency keep their own value in
        # 'f'; the filter removes those above 10. The explicit copy avoids
        # assigning columns to a slice of the original frame below.
        data = data[data['f'] <= 10.0].copy()

        # sigma and temperature are supplied with the metadata
        data['sigma'] = self.metadata['sigma']
        data['T'] = self.metadata['temperature']

        # clean data
        data = data.drop(columns=['Date Time'])

        # define in class
        self.data = data

        # log infos
        self._logger.info(self.metadata)
        self._logger.info(self.data.head())