Datenmodelle angepasst, einige Firmen in CITT übernommen

This commit is contained in:
Markus Clauß
2023-03-02 17:31:39 +01:00
parent e5c9f6904c
commit 1bbb560f31
14 changed files with 1421 additions and 189 deletions

View File

@@ -4,101 +4,374 @@ from csv import reader
import numpy as np
import pandas as pd
from bson import ObjectId
from paveit import calc_nu, fit_cos
from paveit.datamodels import CITTSiffness, CITTSiffnessResults
from paveit.io import read_geosys
from paveit.labtest import DataSineLoad
class CITTBase(DataSineLoad):
def _calc(self):
return (self.df.mean().mean(), self.df.max().max())
def _sel_df(self, df, num=5, shift=-1):
N = df['N'].unique()
n_N = len(N)
max_N = max(N)
min_N = min(N)
class CITT_KIT(DataSineLoad):
freq = float(df['f'].unique()[0])
# define cycles to select
if freq == 10.0:
Nfrom = 98
Nto = 103
elif freq == 5.0:
Nfrom = 93
Nto = 97
elif freq == 3.0:
Nfrom = 43
Nto = 47
elif freq == 1.0:
Nfrom = 13
Nto = 17
elif freq == 0.3:
Nfrom = 8
Nto = 12
elif freq == 0.1:
Nfrom = 3
Nto = 7
else:
Nfrom = None
Nto = None
# Fall 1: nur num Lastwechsel
if n_N < num:
df_sel = None
elif n_N == num:
df_sel = df
# Fall 2: nicht alle LW in Datei
elif (max_N < Nto) & (n_N > num):
df_sel = df[(df['N'] >= N[-num + shift])
& (df['N'] <= N[-1 + shift])]
# Fall 3: Auswahl wie oben definiert
elif (Nfrom >= min_N) & (Nto < max_N):
df_sel = df[(df['N'] >= Nfrom) & (df['N'] <= Nto)]
# Fall 4: Auswahl unbekannt
else:
df_sel = None
return df_sel
def _fit_select_data(self):
"""
select N load cycles from original data
(a): Based on window of TP-Asphalt
(b) last N cycles
"""
self._logger.debug('run _fit_select_data')
self.max_N_in_data = []
if not isinstance(self.data, list):
if self.number_of_load_cycles_for_analysis > 1:
self.max_N_in_data.append(self.data['N'].max())
df_sel = [
self._sel_df(self.data,
num=self.number_of_load_cycles_for_analysis)
]
else:
df_sel = [self.data]
else:
df_sel = []
for d in self.data:
self.max_N_in_data.append(d['N'].max())
if self.number_of_load_cycles_for_analysis > 1:
d_sel = self._sel_df(
d, num=self.number_of_load_cycles_for_analysis)
else:
d_sel = d
df_sel.append(d_sel)
# replace data
self.data = df_sel
def _calc(self):
return (self.df.mean().mean(), self.df.max().max())
self._logger.info('run _calc CITT')
print('run CITT')
self.fit = []
for idx_data, data in enumerate(self.data):
if data is None: continue
if len(data) < 10: continue
try:
data.index = data.index - data.index[0]
res_temp = {}
res_temp['idx'] = idx_data
x = data.index.values
freq = np.round(float(data['f'].unique()), 2)
sigma = float(data['sigma'].unique())
temperature = float(data['T'].unique())
for idxcol, col in enumerate(self.columns_analyse):
if not col in data.columns: continue
y = data[col].values
res = fit_cos(x, y, freq=freq)
for key, value in res.items():
res_temp[f'fit_{col}_{key}'] = value
res_temp[f'fit_{col}_max'] = max(y)
res_temp[f'fit_{col}_min'] = min(y)
# add more metadata
res_temp['f_set'] = freq
res_temp['sigma_set'] = sigma
res_temp['T_set'] = temperature
res_temp['N_from'] = data['N'].min()
res_temp['N_to'] = data['N'].max()
res_temp['N_tot'] = self.max_N_in_data[idx_data]
res_temp['n_samples_per_cycle'] = int(
len(data) / (res_temp['N_to'] - res_temp['N_from'] + 1))
## Stiffness
deltaF = res_temp['fit_F_amp']
nu = calc_nu(temperature)
res_temp['nu'] = nu
h = float(self.metadata['speciment_height'])
deltaU = res_temp['fit_s_hor_sum_amp']
res_temp['stiffness'] = (deltaF * (0.274 + nu)) / (h * deltaU)
# TODO: Überarbeiten und erweitern (ISSUE #2)
res_temp['phase'] = res_temp['fit_F_phase'] - res_temp[
'fit_s_hor_sum_phase']
except:
res_temp = None
self.fit.append(res_temp)
self.fit = pd.DataFrame.from_records(self.fit)
self.fit = self.fit.reset_index(drop=True).set_index('idx')
#self.fit = self.fit.set_index(['T', 'f', 'sigma'])
nsamples = len(self.fit)
self._logger.info(f'fitting finished, add {nsamples} samples')
self._logger.debug(self.fit['stiffness'])
def save(self,
org_id: ObjectId,
project_id: ObjectId,
material_id: ObjectId,
user_id: ObjectId,
meta: dict = {},
wp_id: ObjectId | None = None):
"""
save results to mongodb
"""
if not hasattr(self, 'fit'):
raise
# precheck data and results
assert len(self.data) == len(self.fit)
for idx_fit, fit in self.fit.iterrows():
data = self.data[idx_fit]
meta['filehash'] = self.filehash
meta['org_id'] = org_id
meta['project_id'] = project_id
meta['material'] = material_id
meta['user_id'] = user_id
#check if result in db
#n = CITTSiffness.objects(**meta).count()
#print(n)
# write data
data_dict = fit.to_dict()
data_dict.update(meta)
# remove 'fit_' from keys:
for key in list(data_dict.keys()):
if key.startswith('fit_'):
data_dict[key[4:]] = data_dict[key]
data_dict.pop(key)
# rename fields
def rename_field(d, old, new):
d[new] = d[old]
d.pop(old)
f = CITTSiffnessResults(**data_dict).save()
# required data
data_out = dict(
time=data.index,
F=list(data['F']),
N=list(data['N']),
s_hor_1=list(data['s_hor_1']),
s_hor_2=list(data['s_hor_2']),
s_hor_sum=list(data['s_hor_sum']),
)
# add optional datas
for col in ['s_piston']:
if col in data.columns:
data_out[col] = list(data[col])
g = CITTSiffness(result=f.id, **data_out).save()
class CITT_TUDresden(CITTBase):
def _which_machine(self):
"""
check the file and try to get the machine from the data
"""
self._machine = 'TIRA'
def _define_units(self):
if self._machine == 'TIRA':
self.unit_s = 1 #mm
self.unit_F = 1. #N
self.unit_t = 1. #s
def update_parameter(self):
if self._machine == 'TIRA':
self.meta_names_of_parameter = {
'sigma': ['Oberspannung'],
'f': ['Sollfrequnz'],
'T': ['Solltemperatur'],
#'Nfrom': ['Erster Aufzeichnungslastwechsel', 'Start Cycle'],
#'Nto': ['Letzer Aufzeichnungslastwechsel', 'Last Cycle'],
't': ['Zeit'],
'speciment_diameter': ['PK-Durchmesser'],
'speciment_height': ['PK-Höhe'],
} #list of names
self.data_column_names = {
'time': ['Zeit'],
'F': ['Kraft'],
'N': ['Lastwechsel'],
's_hor_1': ['IWA_1 2 mm'],
's_hor_2': ['IWA_2 2 mm'],
's_piston': ['Kolbenweg'],
}
def _process_data(self):
self._logger.debug('convert bytes to pandas.DataFrame')
self.data.seek(0)
with io.TextIOWrapper(self.data, encoding='latin-1') as read_obj:
csv_reader = reader(read_obj, delimiter=';')
# -----------------------------------------------------------------------------------------------
# TIRA
# -----------------------------------------------------------------------------------------------
if self._machine == 'TIRA':
encoding = 'latin-1'
skiprows = 29
hasunits = True
splitsign = ':;'
read = False
# metadata from file
meta = {}
data = []
temp = []
self.data.seek(0)
f = self.data.readlines()
count = 0
for idx_row, row in enumerate(csv_reader):
if row == ['*****']:
for line in f:
count += 1
if read == False:
read = True
else:
read = False
data.append(temp)
line = line.decode(encoding)
temp = []
#remove whitespace
linesplit = line.strip()
linesplit = linesplit.split(splitsign)
continue
if len(linesplit) == 2:
if read:
key = linesplit[0].strip()
value = linesplit[1].split(';')[0].strip()
row = [r.replace(',', '.') for r in row]
meta[key] = value
temp.append(row)
if count >= skiprows:
break
#convert to pandas
# data
self.data.seek(0)
data = pd.read_csv(self.data,
encoding=encoding,
header=None,
skiprows=skiprows,
decimal=',',
thousands='.',
sep=';')
res = []
data = data.iloc[2:]
freqs = [10.0, 5.0, 1.0, 0.1, 10.0]
## add header to df
self.data.seek(0)
f = self.data.readlines()
count = 0
for idx_data, d in enumerate(data):
for line in f:
t = pd.DataFrame(d[3:])
t.columns = d[1]
line = line.decode(encoding)
freq = freqs[idx_data]
t['f'] = freq
count += 1
for col in t.columns:
t[col] = pd.to_numeric(t[col])
if count >= skiprows:
break
# add cycle number
dt = 1. / freq
#clean data
data = data.dropna(axis=1)
Nmax = int(np.ceil(t['ZEIT'].max() / dt))
# add header from file
head = line.split(';')
N = np.zeros_like(t['ZEIT'])
for i in range(Nmax):
if i == 0:
tmin = 0
tmax = dt
else:
tmax = (i + 1) * dt
tmin = (i) * dt
idx = t[(t['ZEIT'] >= tmin) & (t['ZEIT'] < tmax)].index
N[idx] = i
t['N'] = N
res.append(t)
#remove second 10 Hz
res = pd.concat(res[:-1])
res['T'] = self.temperature
#res = res.sort_values(['f', 'ZEIT'])
data.columns = head
data.columns = [l.strip() for l in data.columns]
#define in class
self.data = res.reset_index()
self.data = data
self.metadata.update(meta)
# log infos
self._logger.info(self.metadata)
self._logger.info(self.data.head())
class CITT_PTMDortmund(DataSineLoad):
class CITT_PTMDortmund(CITTBase):
def _define_units(self):
@@ -212,10 +485,7 @@ class CITT_PTMDortmund(DataSineLoad):
time_idx = None
assert time_idx is not None
temp['N'] = 0
#BUG: Ist in Messdatei falsch definiert und wird von PTM angepasst. '''
#for cycle in range(Nfrom, Nto+1):
temp['N'] = 1
dt = 1.0 / frequency_test
@@ -230,8 +500,8 @@ class CITT_PTMDortmund(DataSineLoad):
#filter data
idx = temp[(time_idx >= tmin) & (time_idx < tmax)].index
#set cycle number
temp.loc[idx, 'N'] = cycle
#set cycle number, cycle starts with 1
temp.loc[idx, 'N'] = cycle + 1
cycle += 1
@@ -275,4 +545,202 @@ class CITT_PTMDortmund(DataSineLoad):
# log infos
self._logger.info(self.metadata)
self._logger.info(self.data.head())
self._logger.info(self.data.head())
class CITT_UniSiegen(CITTBase):
def _define_units(self):
self.unit_s = 1 / 1000. #mm
self.unit_F = 1.0 #N
self.unit_t = 1. #s
def update_parameter(self):
self.meta_names_of_parameter = {
'sigma': ['Oberspannung'],
'f': ['Frequenz'],
'T': ['Prüftemperatur'],
#'Nfrom': ['Erster Aufzeichnungslastwechsel', 'Start Cycle'],
'Nto': ['Maximale Zyklenanzahl'],
't': ['Zeit'],
} #list of names
self.data_column_names = {
'time': ['Zeit'],
'F': ['Kraft'],
's_hor_1': ['Radialweg vorn'],
's_hor_2': ['Radialweg hinten'],
's_piston': ['Kolbenweg'],
'N': ['Zyklenzähler'],
}
def _process_data(self):
meta, data = read_geosys(self.data, '015', metadata_ids=['003'])
#define in class
self.data = data.reset_index()
self.metadata.update(meta)
# log infos
self._logger.info(self.metadata)
self._logger.info(self.data.head())
class CITT_BASt(CITTBase):
def _define_units(self):
self.unit_s = 1 / 1000. #mm
self.unit_F = 1.0 #N
self.unit_t = 1. #s
def update_parameter(self):
self.meta_names_of_parameter = {
'sigma': ['Oberspannung'],
'f': ['Versuchsart'],
'T': ['Prüftemperatur\r\n'],
#'Nfrom': ['Erster Aufzeichnungslastwechsel', 'Start Cycle'],
'Nto': ['Maximale Zyklenanzahl'],
't': ['Zeit'],
} #list of names
self.data_column_names = {
'time': ['Zeit'],
'F': ['Vertikalkraft'],
's_hor_1': ['Horizontalweg IWA vorn'],
's_hor_2': ['Horizontalweg IWA hinten'],
's_piston': ['Kolbenposition'],
'N': ['Zyklenzähler'],
}
def _process_data(self):
meta, data = read_geosys(self.data,
'047',
metadata_ids=['003', '005', '015'])
#define in class
self.data = data.reset_index()
self.metadata.update(meta)
# log infos
self._logger.info(self.metadata)
self._logger.info(self.data.head())
def _modify_meta(self):
s = self.metadata['f']
s = s.split('Hz')[0].split('Steifigkeit')[-1].strip().replace(',', '.')
self.metadata['f'] = float(s)
s = self.metadata['T']
s = s.split('°C')[0].strip().replace(',', '.')
self.metadata['T'] = float(s)
class CITT_LaborHart(CITTBase):
def _define_units(self):
self.unit_s = 1.0 #mm
self.unit_F = 1.0 #N
self.unit_t = 1. / 1000.0 #s
def update_parameter(self):
self.meta_names_of_parameter = {
'sigma': ['Oberspannung'],
'T': ['Solltemperatur'],
't': ['TIME'],
'speciment_diameter': ['Probendurchmesser'],
'speciment_height': ['Probenhöhe'],
} #list of names
self.data_column_names = {
'time': ['TIME'],
'f': ['FREQUENZ'],
'F': ['Load'],
's_hor_1': ['SENSOR 4'],
's_hor_2': ['SENSOR Extension'],
's_piston': ['Position'],
'N': ['Impulsnummer'],
}
def _process_data(self):
meta = {}
splitsign = ':;'
encoding = 'latin-1'
skiprows = 14
self.data.seek(0)
f = self.data.readlines()
count = 0
for line in f:
count += 1
#remove whitespace
line = line.decode(encoding)
linesplit = line.strip()
linesplit = linesplit.split(splitsign)
if len(linesplit) == 2:
meta[linesplit[0]] = linesplit[1]
if count >= skiprows:
break
# data
self.data.seek(0)
data = pd.read_csv(self.data,
encoding=encoding,
skiprows=skiprows,
decimal=',',
sep=';')
## add header to df
self.data.seek(0)
f = self.data.readlines()
count = 0
for line in f:
count += 1
if count >= skiprows:
break
line = line.decode(encoding)
head = line.split(';')
data.columns = head
# FIX: Sigma nicht in Metadaten oder Messdaten enthalten
sigma = float(
os.path.split(self.filename)[-1].split('MPa')[0].strip().replace(
',', '.'))
meta['sigma'] = sigma
#clean data
data = data.dropna(axis=1)
#remove whitespace
data.columns = [c.strip() for c in data.columns]
#define in class
self.data = data
self.metadata.update(meta)
# log infos
self._logger.info(self.metadata)
self._logger.info(self.data.head())