Auswertung Schersteifigkeit Labor Hart ergänzt

This commit is contained in:
Markus Clauß
2023-03-03 12:53:03 +01:00
parent 1bbb560f31
commit e1dd4c7c00
6 changed files with 482 additions and 594 deletions

View File

@@ -27,9 +27,9 @@ class RawData(Document):
class DataSheartest(RawData):
#results
result_id = LazyReferenceField(DynamicShearTest,
required=True,
reverse_delete_rule=CASCADE)
result = LazyReferenceField(DynamicShearTest,
required=True,
reverse_delete_rule=CASCADE)
# data
time = ListField(FloatField())
@@ -39,6 +39,8 @@ class DataSheartest(RawData):
s_vert_2 = ListField(FloatField())
s_vert_sum = ListField(FloatField(), required=False)
s_piston = ListField(FloatField(), required=False)
s_hor_1 = ListField(FloatField(), required=False)
s_hor_2 = ListField(FloatField(), required=False)
class CITTSiffness(RawData):

View File

@@ -48,6 +48,8 @@ class DynamicShearTest(Document):
False,
'auto_create_index':
True,
"db_alias":
'dblabtests',
'collection':
'sheartest',
'indexes': [
@@ -65,43 +67,83 @@ class DynamicShearTest(Document):
class DynamicShearTestExtension(DynamicShearTest):
#metadata
f = FloatField(required=True)
f_set = FloatField(required=True)
sigma_normal = FloatField(required=True)
T = FloatField(required=True)
T_set = FloatField(required=True)
extension = FloatField(required=True)
stiffness = FloatField(required=True)
bruch = BooleanField(required=True)
N_from = IntField()
N_to = IntField()
N_tot = IntField()
n_samples_per_cycle = IntField()
G = FloatField(required=True)
broken = BooleanField(required=True)
#fit parameter
## required parameters
## F
fit_amp_F = FloatField(required=True)
fit_freq_F = FloatField(required=True)
fit_phase_F = FloatField(required=True)
fit_offset_F = FloatField(required=True)
fit_slope_F = FloatField(required=True)
F_amp = FloatField(required=True)
F_freq = FloatField(required=True)
F_phase = FloatField(required=True)
F_offset = FloatField(required=True)
F_slope = FloatField(required=True)
F_r2 = FloatField(required=True)
F_max = FloatField(required=True)
F_min = FloatField(required=True)
## S1
fit_amp_s_vert_1 = FloatField(required=True)
fit_freq_s_vert_1 = FloatField(required=True)
fit_phase_s_vert_1 = FloatField(required=True)
fit_offset_s_vert_1 = FloatField(required=True)
fit_slope_s_vert_1 = FloatField(required=True)
r2_s_vert_1 = FloatField(required=True)
s_vert_1_amp = FloatField(required=True)
s_vert_1_freq = FloatField(required=True)
s_vert_1_phase = FloatField(required=True)
s_vert_1_offset = FloatField(required=True)
s_vert_1_slope = FloatField(required=True)
s_vert_1_r2 = FloatField(required=True)
s_vert_1_max = FloatField(required=True)
s_vert_1_min = FloatField(required=True)
## S2
fit_amp_s_vert_2 = FloatField(required=True)
fit_freq_s_vert_2 = FloatField(required=True)
fit_phase_s_vert_2 = FloatField(required=True)
fit_offset_s_vert_2 = FloatField(required=True)
fit_slope_s_vert_2 = FloatField(required=True)
r2_s_vert_2 = FloatField(required=True)
## S-Sum
fit_amp_s_vert_sum = FloatField(required=True)
fit_freq_s_vert_sum = FloatField(required=True)
fit_phase_s_vert_sum = FloatField(required=True)
fit_offset_s_vert_sum = FloatField(required=True)
fit_slope_s_vert_sum = FloatField(required=True)
r2_s_vert_sum = FloatField(required=True)
## r2
r2_F = FloatField(required=True)
r2_s_vert_1 = FloatField(required=True)
r2_s_vert_2 = FloatField(required=True)
r2_s_vert_sum = FloatField(required=True)
s_vert_2_amp = FloatField(required=True)
s_vert_2_freq = FloatField(required=True)
s_vert_2_phase = FloatField(required=True)
s_vert_2_offset = FloatField(required=True)
s_vert_2_slope = FloatField(required=True)
s_vert_2_r2 = FloatField(required=True)
s_vert_2_max = FloatField(required=True)
s_vert_2_min = FloatField(required=True)
## optional parameters
s_vert_sum_amp = FloatField(required=False)
s_vert_sum_freq = FloatField(required=False)
s_vert_sum_phase = FloatField(required=False)
s_vert_sum_offset = FloatField(required=False)
s_vert_sum_slope = FloatField(required=False)
s_vert_sum_r2 = FloatField(required=False)
s_vert_sum_max = FloatField(required=False)
s_vert_sum_min = FloatField(required=False)
s_hor_sum_amp = FloatField(required=False)
s_hor_sum_freq = FloatField(required=False)
s_hor_sum_phase = FloatField(required=False)
s_hor_sum_offset = FloatField(required=False)
s_hor_sum_slope = FloatField(required=False)
s_hor_sum_r2 = FloatField(required=False)
s_hor_sum_max = FloatField(required=False)
s_hor_sum_min = FloatField(required=False)
s_hor_1_amp = FloatField(required=False)
s_hor_1_freq = FloatField(required=False)
s_hor_1_phase = FloatField(required=False)
s_hor_1_offset = FloatField(required=False)
s_hor_1_slope = FloatField(required=False)
s_hor_1_r2 = FloatField(required=False)
s_hor_1_max = FloatField(required=False)
s_hor_1_min = FloatField(required=False)
## S2
s_hor_2_amp = FloatField(required=False)
s_hor_2_freq = FloatField(required=False)
s_hor_2_phase = FloatField(required=False)
s_hor_2_offset = FloatField(required=False)
s_hor_2_slope = FloatField(required=False)
s_hor_2_r2 = FloatField(required=False)
s_hor_2_max = FloatField(required=False)
s_hor_2_min = FloatField(required=False)

View File

@@ -14,6 +14,7 @@ class Organisation(Document):
}})
labtest_citt = StringField(required=False)
labtest_shear_extension = StringField(required=False)
meta = {
'allow_inheritance': True,

View File

@@ -64,6 +64,8 @@ class DataSineLoad():
'F', 's_hor_sum', 's_hor_1', 's_hor_2', 's_piston'
]
self.round_values = [('T', 3)]
# Header names after standardization; check if exists
self.val_header_names = ['speciment_height', 'speciment_diameter']
@@ -197,7 +199,10 @@ class DataSineLoad():
def _post_apply_units(self):
for col in ['s_hor_sum', 's_hor_1', 's_hor_2']:
for col in [
's_hor_sum', 's_hor_1', 's_hor_2', 's_vert_sum', 's_vert_1',
's_vert_2'
]:
if col in self.data.columns:
self.data[col] = self.data[col].mul(self.unit_s)
@@ -209,6 +214,13 @@ class DataSineLoad():
return True
def _post_round_values(self):
for par, digits in self.round_values:
if par in self.data.columns:
self.data[par] = self.data[par].round(digits)
def _post_select_importent_columns(self):
# TODO: add more columns, check datamodel
@@ -223,6 +235,10 @@ class DataSineLoad():
self.data['s_hor_sum'] = self.data[['s_hor_1',
's_hor_2']].sum(axis=1)
if not 's_vert_sum' in cols:
self.data['s_vert_sum'] = self.data[['s_vert_1',
's_vert_2']].sum(axis=1)
def _post_opt_data(self):
#set dtypes:
for col in self.col_as_int:
@@ -342,6 +358,7 @@ class DataSineLoad():
self._post_string_to_float()
self._post_select_importent_columns()
self._post_apply_units()
self._post_round_values()
self._post_calc_missiong_values()
self._post_opt_data()

View File

@@ -133,6 +133,7 @@ class CITTBase(DataSineLoad):
for idxcol, col in enumerate(self.columns_analyse):
if not col in data.columns: continue
y = data[col].values
res = fit_cos(x, y, freq=freq)
@@ -744,3 +745,105 @@ class CITT_LaborHart(CITTBase):
# log infos
self._logger.info(self.metadata)
self._logger.info(self.data.head())
class CITT_BAGKoeln(CITTBase):
def _define_units(self):
self.unit_s = 1.0 #mm
self.unit_F = 1.0 #N
self.unit_t = 1. / 1000.0 #s
def update_parameter(self):
self.meta_names_of_parameter = {
'sigma': ['Oberspannung'],
'T': ['Solltemperatur'],
't': ['TIME'],
'speciment_diameter': ['Probendurchmesser'],
'speciment_height': ['Probenhöhe'],
} #list of names
self.data_column_names = {
'time': ['TIME'],
'f': ['FREQUENZ'],
'F': ['Load'],
's_hor_1': ['SENSOR 4'],
's_hor_2': ['SENSOR Extension'],
's_piston': ['Position'],
'N': ['Impulsnummer'],
}
def _process_data(self):
meta = {}
splitsign = ':;'
encoding = 'latin-1'
skiprows = 14
self.data.seek(0)
f = self.data.readlines()
count = 0
for line in f:
count += 1
#remove whitespace
line = line.decode(encoding)
linesplit = line.strip()
linesplit = linesplit.split(splitsign)
if len(linesplit) == 2:
meta[linesplit[0]] = linesplit[1]
if count >= skiprows:
break
# data
self.data.seek(0)
data = pd.read_csv(self.data,
encoding=encoding,
skiprows=skiprows,
decimal=',',
sep=';')
## add header to df
self.data.seek(0)
f = self.data.readlines()
count = 0
for line in f:
count += 1
if count >= skiprows:
break
line = line.decode(encoding)
head = line.split(';')
data.columns = head
# FIX: Sigma nicht in Metadaten oder Messdaten enthalten
sigma = float(
os.path.split(self.filename)[-1].split('MPa')[0].strip().replace(
',', '.'))
meta['sigma'] = sigma
#clean data
data = data.dropna(axis=1)
#remove whitespace
data.columns = [c.strip() for c in data.columns]
#define in class
self.data = data
self.metadata.update(meta)
# log infos
self._logger.info(self.metadata)
self._logger.info(self.data.head())

View File

@@ -1,16 +1,15 @@
import io
import logging
import os
import lmfit as lm
import numpy as np
import pandas as pd
from paveit_worker.libs.labtests.base import DataSineLoad
#import scipy.fft as sfft
#from pytestpavement.labtests.base import DataSineLoad
#from pytestpavement.models.data import DataSheartest
#from pytestpavement.models.sheartest import DynamicShearTestExtension
from bson import ObjectId
from paveit import calc_nu, fit_cos
from paveit.datamodels import DataSheartest, DynamicShearTestExtension
from paveit.io import read_geosys
from paveit.labtest import DataSineLoad
class ShearTest(DataSineLoad):
@@ -19,256 +18,220 @@ class ShearTest(DataSineLoad):
"""
def __init__(self,
fname: str,
filename: str,
metadata: dict,
logger=None,
debug: bool = False,
gap_width: float = 1.0,
roundtemperature: bool = True,
archive_file=False,
s3_params: dict = {}):
data: None | io.BytesIO = None):
self.filename = filename
self.metadata = metadata
#set parameter
self.gap_width = gap_width
self.debug = debug
self.file = fname
self.roundtemperature = roundtemperature
self.archive_file = archive_file
self.s3_params = s3_params
# process file
self._run()
if isinstance(data, io.BytesIO):
self.data = data
def plot_fited_data(self, opath=None, pkname=None, r2min=0.99):
ylabel_dict = {
'F': 'Kraft in N',
's_vert_sum': 'norm. mittlerer Scherweg\n $S_{mittel}$ in mm',
's_piston': 'norm. Kolbenweg\n in mm',
's_vert_1': 'Scherweg\n $S_1$ in mm',
's_vert_2': 'Scherweg\n $S_2$ in mm'
}
columns_analyse = [
'F',
's_vert_sum',
's_vert_1',
's_vert_2',
's_piston',
]
if not (opath is None) & (pkname is None):
showplot = False
opath = os.path.join(opath, pkname, 'raw_data')
if not os.path.exists(opath):
os.makedirs(opath)
self.debug = debug
if logger == None:
self._logger = logging.getLogger(__name__)
else:
showplot = True
self._logger = logger
for i, fit in self.fit.iterrows():
self._logger.info(
f'filename s3: {self.filename}, metadata: {self.metadata}')
if not any([fit['r2_F'] < r2min, fit['r2_s_vert_sum'] < r2min]):
continue
self._pre_run()
data = self.data[int(fit['idx_data'])]
def _sel_df(self, df, num=5, shift=-1):
if data is None:
continue
N = df['N'].unique()
n_N = len(N)
max_N = max(N)
min_N = min(N)
freq = data['f'].unique()[0]
sigma = data['sigma_normal'].unique()[0]
s = data['extension'].unique()[0]
T = data['T'].unique()[0]
# Fall 1: nur num Lastwechsel
if n_N < num - shift:
df_sel = None
elif n_N == num - shift:
df_sel = df
fig, axs = plt.subplots(len(columns_analyse),
1,
figsize=(8, len(columns_analyse) * 2),
sharex=True)
for idxcol, col in enumerate(columns_analyse):
x, y = data.index, data[col]
#add fit
f = self.fit.iloc[i]
parfit = {}
for k in ['amp', 'freq', 'phase', 'offset', 'slope']:
parfit[k] = f[f'fit_{k}_{col}']
yreg = fit_cos_eval(x, parfit)
if col in ['s_piston', 's_vert_sum']:
y = y - np.mean(y)
yreg = yreg - np.mean(yreg)
plt.sca(axs[idxcol])
plt.plot(x, y, label='Messdaten')
r2 = np.round(f[f'r2_{col}'], 3)
plt.plot(x,
yreg,
alpha=0.7,
label=f'Regression ($R^2 = {r2}$)')
if not ('F' in col):
s = f['extension']
parline = dict(lw=0.4,
ls='--',
color='lightgrey',
alpha=0.4,
label='Bereich des zul. Scherweges')
plt.axhspan(-s, s, **parline)
if idxcol == len(columns_analyse) - 1:
plt.xlabel('Zeit in s')
plt.ylabel(ylabel_dict[col])
plt.legend()
plt.tight_layout()
if showplot:
plt.show()
break
else:
ofile = f'{T}deg_{sigma}MPa_{freq}Hz_{s}mm'.replace('.', 'x')
ofile = os.path.join(opath, ofile + '.pdf')
plt.savefig(ofile)
plt.close()
class ShearTestExtension(ShearTest):
def runfit(self):
self._fit_data()
def file_in_db(self):
n = DynamicShearTestExtension.objects(filehash=self.filehash).count()
if n > 0:
return True
# Fall 2: nicht alle LW in Datei
else:
return False
df_sel = df[(df['N'] >= N[-num + shift])
& (df['N'] <= N[-1 + shift])]
def save(self, material1, material2, bounding, meta: dict):
for i, fit in self.fit.iterrows():
data = self.data[int(fit['idx_data'])]
#check if data in db
n = DynamicShearTestExtension.objects(
f=fit['f'],
sigma_normal=fit['sigma_normal'],
T=fit['T'],
extension=fit['extension'],
material1=material1,
material2=material2,
bounding=bounding,
filehash=self.filehash,
).count()
if n > 0: continue
# save fit
values = {}
for col in ['F', 's_vert_1', 's_vert_2', 's_vert_sum']:
values[f'fit_amp_{col}'] = fit[f'fit_amp_{col}']
values[f'fit_freq_{col}'] = fit[f'fit_freq_{col}']
values[f'fit_phase_{col}'] = fit[f'fit_phase_{col}']
values[f'fit_offset_{col}'] = fit[f'fit_offset_{col}']
values[f'fit_slope_{col}'] = fit[f'fit_slope_{col}']
values[f'r2_{col}'] = fit[f'r2_{col}']
values.update(meta)
try:
r = DynamicShearTestExtension(
#metadata
f=fit['f'],
sigma_normal=fit['sigma_normal'],
T=fit['T'],
extension=fit['extension'],
filehash=self.filehash,
material1=material1,
material2=material2,
bounding=bounding,
#results
stiffness=fit['G'],
#
**values).save()
#save raw data
rdata = DataSheartest(
result_id=r.id,
time=data.index.values,
F=data['F'].values,
N=data['N'].values,
s_vert_1=data['s_vert_1'].values,
s_vert_2=data['s_vert_2'].values,
s_vert_sum=data['s_vert_sum'].values,
s_piston=data['s_piston'].values,
).save()
except:
print('error saving data')
raise
rdata.delete()
if self.archive_file:
mclient = MinioClient(self.s3_params['S3_URL'],
self.s3_params['S3_ACCESS_KEY'],
self.s3_params['S3_SECRET_KEY'],
bucket=str(meta['org_id']))
extension = os.path.splitext(self.file)[-1]
ofilename = self.filehash + extension
outpath = 'sheartest'
metadata_s3 = {
'project_id': str(meta['project_id']),
'user_id': str(meta['user_id']),
'filename': os.path.split(self.file)[-1],
'speciment': meta['speciment_name']
}
mclient.compress_and_upload_file(self.file,
ofilename,
outpath=outpath,
content_type="application/raw",
metadata=metadata_s3)
return df_sel
def _set_parameter(self):
self._logger.debug('run _set_parameter')
self.split_data_based_on_parameter = [
'T', 'sigma_normal', 'f', 'extension'
]
self.col_as_int = ['N']
self.col_as_float = ['T', 'F', 'f', 's_vert_sum']
self.col_as_float = ['T', 'F', 's_piston', 's_hor_1', 's_hor_2']
self.val_col_names = ['time', 'T', 'f', 'N', 'F', 's_vert_sum']
# Header names after standardization; check if exists
self.val_header_names = ['speciment_diameter']
self.val_col_names = [
'time', 'T', 'f', 'sigma_normal', 'extension', 'N', 'F', 's_hor_1',
's_hor_2', 's_vert_1', 's_vert_2'
]
self.round_values = [('T', 1), ('sigma_normal', 1), ('f', 1)]
self.columns_analyse = [
'F', 's_vert_sum', 's_vert_1', 's_vert_2', 's_piston'
'F', 's_vert_sum', 's_vert_1', 's_vert_2', 's_hor_1', 's_hor_2',
's_hor_sum', 's_piston'
]
# Header names after standardization; check if exists
self.val_header_names = [
'speciment_height', 'speciment_diameter', 'broken'
]
self.number_of_load_cycles_for_analysis = 5
def _calc_missiong_values(self):
self.meta_names_of_parameter = {} #list of names
cols = self.data.columns
self.data_column_names = {
'time': ['Time Series'],
'F': ['Load Series'],
's_hor_1': ['LVDT1 Series'],
's_hor_2': ['LVDT2 Series'],
}
for c in ['vert']:
if not f's_{c}_sum' in cols:
self.data[f's_{c}_sum'] = self.data[[f's_{c}_1', f's_{c}_2'
]].sum(axis=1).div(2.0)
def _fit_data(self):
class ShearTestExtension(ShearTest):
def save(
self,
org_id: ObjectId,
project_id: ObjectId,
material_1_id: ObjectId,
material_2_id: ObjectId,
material_boundary_id: ObjectId,
user_id: ObjectId,
meta: dict = {},
wp_id: ObjectId | None = None,
broken: bool = False,
gap_width: float = 1.0, #mm
):
"""
save results to mongodb
"""
if not hasattr(self, 'fit'):
raise
# precheck data and results
# assert len(self.data) == len(self.fit)
for idx_fit, fit in self.fit.iterrows():
data = self.data[idx_fit]
meta['filehash'] = self.filehash
meta['org_id'] = org_id
meta['project_id'] = project_id
meta['workpackage_id'] = wp_id
meta['user_id'] = user_id
meta['material1'] = material_1_id
meta['material2'] = material_2_id
meta['bounding'] = material_boundary_id
#check if result in db
#n = CITTSiffness.objects(**meta).count()
#print(n)
# write data
data_dict = fit.to_dict()
data_dict.update(meta)
# remove 'fit_' from keys:
for key in list(data_dict.keys()):
if key.startswith('fit_'):
data_dict[key[4:]] = data_dict[key]
data_dict.pop(key)
# rename fields
def rename_field(d, old, new):
d[new] = d[old]
d.pop(old)
f = DynamicShearTestExtension(**data_dict).save()
# required data
data_out = dict(
time=data.index,
F=list(data['F']),
N=list(data['N']),
s_vert_1=list(data['s_hor_1']),
s_vert_2=list(data['s_hor_2']),
s_vert_sum=list(data['s_hor_sum']),
)
# add optional datas
for col in ['s_piston', 's_hor_1', 's_hor_2']:
if col in data.columns:
data_out[col] = list(data[col])
g = DataSheartest(result=f.id, **data_out).save()
def _fit_select_data(self):
"""
select N load cycles from original data
(a): Based on window of TP-Asphalt
(b) last N cycles
"""
self._logger.debug('run _fit_select_data')
self.max_N_in_data = []
if not isinstance(self.data, list):
if self.number_of_load_cycles_for_analysis > 1:
self.max_N_in_data.append(self.data['N'].max())
df_sel = [
self._sel_df(self.data,
num=self.number_of_load_cycles_for_analysis)
]
else:
df_sel = [self.data]
else:
df_sel = []
for d in self.data:
self.max_N_in_data.append(d['N'].max())
if self.number_of_load_cycles_for_analysis > 1:
d_sel = self._sel_df(
d, num=self.number_of_load_cycles_for_analysis)
else:
d_sel = d
df_sel.append(d_sel)
# replace data
self.data = df_sel
def _calc(self):
"""
Calculate Results
"""
self._logger.info('run _calc base')
print('run BASE')
self.fit = []
@@ -278,406 +241,166 @@ class ShearTestExtension(ShearTest):
data.index = data.index - data.index[0]
res = {}
res['idx_data'] = int(idx_data)
res_temp = {}
res_temp['idx'] = idx_data
# Fitting
freq = float(np.round(data['f'].mean(), 4))
if (self.debug):
sigma_normal = np.round(data['sigma_normal'].mean(), 3)
T = np.round(data['T'].mean(), 3)
freq = data['f'].mean()
sigma_normal = data['sigma_normal'].mean()
T = data['T'].mean()
extension = data['extension'].mean()
x = data.index.values
for idxcol, col in enumerate(self.columns_analyse):
if not col in data.columns: continue
x = data.index.values
y = data[col].values
# Fourier Transformation
"""
dt = np.diff(x).mean() #mean sampling rate
n = len(x)
res = fit_cos(x, y, freq=freq)
res[f'psd_{col}'] = sfft.rfft(y) #compute the FFT
res[f'freq_{col}'] = sfft.rfftfreq(n, dt)
"""
for key, value in res.items():
res_temp[f'fit_{col}_{key}'] = value
res_fit = fit_cos(x, y, freq=freq, constfreq=True)
res_temp[f'fit_{col}_max'] = max(y)
res_temp[f'fit_{col}_min'] = min(y)
res[f'r2_{col}'] = res_fit['r2']
# add more metadata
res_temp['f_set'] = freq
res_temp['sigma_normal'] = sigma_normal
res_temp['T_set'] = T
res_temp['extension'] = extension
res_temp['broken'] = self.metadata['broken']
res[f'fit_amp_{col}'] = res_fit['amp']
res[f'fit_freq_{col}'] = res_fit['freq']
res[f'fit_phase_{col}'] = res_fit['phase']
res[f'fit_offset_{col}'] = res_fit['offset']
res[f'fit_slope_{col}'] = res_fit['slope']
res_temp['N_from'] = int(data['N'].min())
res_temp['N_to'] = int(data['N'].max())
res_temp['N_tot'] = int(self.max_N_in_data[idx_data])
res_temp['n_samples_per_cycle'] = int(
len(data) / (res_temp['N_to'] - res_temp['N_from'] + 1))
## Schersteifigkeit berechnen
deltaF = res['fit_amp_F']
deltaS = res['fit_amp_s_vert_sum']
deltaF = res_temp['fit_F_amp']
deltaS = res_temp['fit_s_vert_sum_amp']
A = np.pi * self.meta['speciment_diameter']**2 / 4
A = np.pi * self.metadata['speciment_diameter']**2 / 4
tau = deltaF / A
gamma = deltaS / self.gap_width
res['G'] = tau / gamma
res_temp['G'] = tau / gamma
#metadaten
for c in ['T', 'extension', 'sigma_normal', 'f']:
res[c] = data[c][0]
#for c in ['T', 'extension', 'sigma_normal', 'f']:
# res_temp[c] = res_temp[c][0]
self.fit.append(res)
self.fit.append(res_temp)
if (self.debug) & (len(self.fit) > 5):
break
self.fit = pd.DataFrame.from_records(self.fit)
def plot_results(self, opath=None, pkname=None, r2min=0.96):
if not (opath is None) & (pkname is None):
showplot = False
self.fit = self.fit.reset_index(drop=True).set_index('idx')
opath = os.path.join(opath, pkname)
if not os.path.exists(opath):
os.makedirs(opath)
else:
showplot = True
dfplot = self.fit.copy()
for col in ['extension', 'fit_amp_s_vert_sum']:
dfplot[col] = dfplot[col].mul(1000)
fig, ax = plt.subplots()
xticks = list(dfplot['extension'].unique())
df = dfplot
df = df[(df['r2_F'] >= r2min) & (df['r2_s_vert_sum'] >= r2min)]
sns.scatterplot(
data=df,
x='fit_amp_s_vert_sum',
y='G',
hue='T',
ax=ax,
alpha=0.7,
#size=150,
size="G",
sizes=(50, 160),
edgecolor='k',
palette='muted',
zorder=10)
df = dfplot
df = df[(df['r2_F'] < r2min) & (df['r2_s_vert_sum'] < r2min)]
if not df.empty:
sns.scatterplot(data=df,
x='fit_amp_s_vert_sum',
y='G',
facecolor='grey',
alpha=0.5,
legend=False,
zorder=1,
ax=ax)
ax.set_xlabel(r'gemessene Scherwegamplitude in $\mu m$')
ax.set_ylabel(r'Scherseteifigkeit in MPa/mm')
ax.set_xticks(xticks)
ax.grid()
if not showplot:
ofile = os.path.join(opath, 'shearstiffness.pdf')
plt.savefig(ofile)
plt.show()
def plot_stats(self, opath=None, pkname=None, r2min=0.96):
if not (opath is None) & (pkname is None):
showplot = False
opath = os.path.join(opath, pkname)
if not os.path.exists(opath):
os.makedirs(opath)
else:
showplot = True
dfplot = self.fit.copy()
for col in ['extension', 'fit_amp_s_vert_sum']:
dfplot[col] = dfplot[col].mul(1000)
#r2
df = self.fit
fig, axs = plt.subplots(1, 2, sharey=True, sharex=True)
parscatter = dict(palette='muted', alpha=0.7, edgecolor='k', lw=0.3)
# r2
ax = axs[0]
sns.scatterplot(data=df,
x='fit_amp_s_vert_sum',
y='r2_F',
hue='T',
ax=ax,
**parscatter)
ax.set_ylabel('Bestimmtheitsmaß $R^2$')
ax.set_title('Kraft')
ax = axs[1]
sns.scatterplot(data=df,
x='fit_amp_s_vert_sum',
y='r2_s_vert_sum',
hue='T',
legend=False,
ax=ax,
**parscatter)
ax.set_ylabel('$R^2$ (S_{mittel})')
ax.set_title('mittlerer Scherweg')
for ax in axs.flatten():
ax.grid()
ax.set_xlabel(r'gemessene Scherwegamplitude in $\mu m$')
plt.tight_layout()
if not showplot:
ofile = os.path.join(opath, 'stats_r2.pdf')
plt.savefig(ofile)
plt.show()
nsamples = len(self.fit)
self._logger.info(f'fitting finished, add {nsamples} samples')
class ShearTestExtensionLaborHart(ShearTestExtension):
def _define_units(self):
self.unit_F = 1 / 1000.0 #N
self.unit_t = 1 / 1000. #s
self.unit_s = 1.0 #mm
self.unit_F = 1.0 #N
self.unit_t = 1. / 1000.0 #s
def _set_units(self):
def update_parameter(self):
#for col in ['F']:
# self.data[col] = self.data[col].mul(self.unit_F)
self.meta_names_of_parameter = {
'T': ['Solltemperatur'],
't': ['TIME'],
'speciment_diameter': ['Probendurchmesser'],
'speciment_height': ['Probenhöhe'],
} #list of names
for col in ['time']:
self.data[col] = self.data[col].mul(self.unit_t)
self.data_column_names = {
'time': ['TIME'],
'f': ['Sollwert Frequenz'],
'T': ['SollTemperatur'],
'sigma_normal': ['Sollwert Normalspannung'],
'extension': ['Max Scherweg'],
'F': ['Load'],
's_hor_1': ['HORIZONTAL links'],
's_hor_2': ['HOIZONTAL Rechts'],
's_vert_1': ['VERTIKAL Links'],
's_vert_2': ['VERTIKAL Rechts'],
's_piston': ['Position'],
'N': ['Impulsnummer'],
}
return True
def _process_data(self):
def _read_data(self):
"""
read data from Labor Hart
"""
# parameter
encoding = 'latin-1'
skiprows = 14
hasunits = True
splitsign = ':;'
# metadata from file
meta = {}
with open(self.file, 'r', encoding=encoding) as f:
count = 0
splitsign = ':;'
encoding = 'latin-1'
skiprows = 14
for line in f:
count += 1
self.data.seek(0)
f = self.data.readlines()
#remove whitespace
linesplit = line.strip()
linesplit = linesplit.split(splitsign)
count = 0
if len(linesplit) == 2:
for line in f:
count += 1
meta[linesplit[0]] = linesplit[1]
#remove whitespace
line = line.decode(encoding)
linesplit = line.strip()
linesplit = linesplit.split(splitsign)
if count >= skiprows:
break
if len(linesplit) == 2:
meta[linesplit[0]] = linesplit[1]
if count >= skiprows:
break
# data
data = pd.read_csv(self.file,
self.data.seek(0)
data = pd.read_csv(self.data,
encoding=encoding,
skiprows=skiprows,
decimal=',',
sep=';')
## add header to df
with open(self.file, 'r', encoding=encoding) as f:
count = 0
self.data.seek(0)
f = self.data.readlines()
count = 0
for line in f:
count += 1
for line in f:
count += 1
if count >= skiprows:
break
if count >= skiprows:
break
line = line.decode(encoding)
head = line.split(';')
data.columns = head
#clean data
data = data.dropna(axis=1)
#define in class
self.meta = meta
self.data = data
return True
def _standardize_meta(self):
keys = list(self.meta.keys())
for key in keys:
if any(map(key.__contains__, ['Probenbezeichnung'])):
self.meta['speciment'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Datum/Uhrzeit'])):
self.meta['datetime'] = self.meta.pop(key)
try:
self.meta['datetime'] = pd.to_datetime(
self.meta['datetime'])
except:
pass
elif any(map(key.__contains__, ['Probenhöhe'])):
self.meta['speciment_height'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Probendurchmesser'])):
self.meta['speciment_diameter'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Solltemperatur'])):
self.meta['temperature'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Prüfbedingungen'])):
self.meta['test_version'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Name des VersAblf'])):
self.meta['test'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Prüfer'])):
self.meta['examiner'] = self.meta.pop(key)
return True
def _standardize_data(self):
colnames = list(self.data.columns)
for i, col in enumerate(colnames):
if col == 'TIME':
colnames[i] = 'time'
#set values
elif col == 'Sollwert Frequenz':
colnames[i] = 'f'
elif col == 'SollTemperatur':
colnames[i] = 'T'
elif col == 'Max Scherweg':
colnames[i] = 'extension'
elif col == 'Sollwert Normalspannung':
colnames[i] = 'sigma_normal'
elif col == 'Impulsnummer':
colnames[i] = 'N'
# measurements
elif col == 'Load':
colnames[i] = 'F'
elif col == 'Position':
colnames[i] = 's_piston'
elif col == 'VERTIKAL Links':
colnames[i] = 's_vert_1'
elif col == 'VERTIKAL Rechts':
colnames[i] = 's_vert_2'
elif col == 'HORIZONTAL links':
colnames[i] = 's_hor_1'
elif col == 'HOIZONTAL Rechts':
colnames[i] = 's_hor_2'
self.data.columns = colnames
class ShearTestExtensionTUDresdenGeosys(ShearTestExtension):
def _define_units(self):
self.unit_S = 1 / 1000.0 #N
def _set_units(self):
for col in [
's_vert_sum', 's_vert_1', 's_vert_2', 's_piston', 'extension'
]:
self.data[col] = self.data[col].mul(self.unit_S)
#convert internal units to global
f = np.mean([0.9 / 355, 0.6 / 234.0, 0.3 / 116.0])
self.data['sigma_normal'] = self.data['sigma_normal'].mul(f).apply(
lambda x: np.round(x, 1))
return True
def _read_data(self):
"""
read data from Labor Hart
"""
# parameter
encoding = 'latin-1'
skiprows = 14
hasunits = True
splitsign = ':;'
head, data = read_geosys(self.file, '015')
#remove whitespace
data.columns = [c.strip() for c in data.columns]
#define in class
self.meta = head
self.data = data
return True
self.metadata.update(meta)
def _standardize_meta(self):
keys = list(self.meta.keys())
for key in keys:
if key == 'd':
self.meta['speciment_diameter'] = self.meta.pop(key)
return True
def _standardize_data(self):
colnames = list(self.data.columns)
for i, col in enumerate(colnames):
#set values
if col == 'soll temperature':
colnames[i] = 'T'
elif col == 'soll extension':
colnames[i] = 'extension'
elif col == 'soll sigma':
colnames[i] = 'sigma_normal'
elif col == 'soll frequency':
colnames[i] = 'f'
elif col == 'Number of vertical cycles':
colnames[i] = 'N'
# measurements
elif col == 'vertical load from hydraulic pressure':
colnames[i] = 'F'
elif col == 'vertical position from hydraulic pressure':
colnames[i] = 's_piston'
elif col == 'Vertical position from LVDT 1':
colnames[i] = 's_vert_1'
elif col == 'Vertical position from LVDT 2':
colnames[i] = 's_vert_2'
self.data.columns = colnames
# log infos
self._logger.info(self.metadata)
self._logger.info(self.data.head())