From e1dd4c7c004b19eef1a98b741df0ebe9777385f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Clau=C3=9F?= Date: Fri, 3 Mar 2023 12:53:03 +0100 Subject: [PATCH] =?UTF-8?q?Auswertung=20Schersteifigkeit=20Labor=20Hart=20?= =?UTF-8?q?erg=C3=A4nzt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/paveit/datamodels/data.py | 8 +- src/paveit/datamodels/sheartest.py | 108 ++- src/paveit/datamodels/usermanagement.py | 1 + src/paveit/labtest/base.py | 19 +- src/paveit/labtest/citt.py | 103 +++ src/paveit/labtest/sheartest.py | 837 ++++++++---------------- 6 files changed, 482 insertions(+), 594 deletions(-) diff --git a/src/paveit/datamodels/data.py b/src/paveit/datamodels/data.py index 65a42b6..fadea36 100644 --- a/src/paveit/datamodels/data.py +++ b/src/paveit/datamodels/data.py @@ -27,9 +27,9 @@ class RawData(Document): class DataSheartest(RawData): #results - result_id = LazyReferenceField(DynamicShearTest, - required=True, - reverse_delete_rule=CASCADE) + result = LazyReferenceField(DynamicShearTest, + required=True, + reverse_delete_rule=CASCADE) # data time = ListField(FloatField()) @@ -39,6 +39,8 @@ class DataSheartest(RawData): s_vert_2 = ListField(FloatField()) s_vert_sum = ListField(FloatField(), required=False) s_piston = ListField(FloatField(), required=False) + s_hor_1 = ListField(FloatField(), required=False) + s_hor_2 = ListField(FloatField(), required=False) class CITTSiffness(RawData): diff --git a/src/paveit/datamodels/sheartest.py b/src/paveit/datamodels/sheartest.py index ea4b486..92b3ee0 100644 --- a/src/paveit/datamodels/sheartest.py +++ b/src/paveit/datamodels/sheartest.py @@ -48,6 +48,8 @@ class DynamicShearTest(Document): False, 'auto_create_index': True, + "db_alias": + 'dblabtests', 'collection': 'sheartest', 'indexes': [ @@ -65,43 +67,83 @@ class DynamicShearTest(Document): class DynamicShearTestExtension(DynamicShearTest): #metadata - f = FloatField(required=True) + f_set = FloatField(required=True) sigma_normal = FloatField(required=True) - T = FloatField(required=True) + T_set = FloatField(required=True) extension = FloatField(required=True) - stiffness = FloatField(required=True) - bruch = BooleanField(required=True) + N_from = IntField() + N_to = IntField() + N_tot = IntField() + n_samples_per_cycle = IntField() + + G = FloatField(required=True) + broken = BooleanField(required=True) #fit parameter + ## required parameters + ## F - fit_amp_F = FloatField(required=True) - fit_freq_F = FloatField(required=True) - fit_phase_F = FloatField(required=True) - fit_offset_F = FloatField(required=True) - fit_slope_F = FloatField(required=True) + F_amp = FloatField(required=True) + F_freq = FloatField(required=True) + F_phase = FloatField(required=True) + F_offset = FloatField(required=True) + F_slope = FloatField(required=True) + F_r2 = FloatField(required=True) + F_max = FloatField(required=True) + F_min = FloatField(required=True) ## S1 - fit_amp_s_vert_1 = FloatField(required=True) - fit_freq_s_vert_1 = FloatField(required=True) - fit_phase_s_vert_1 = FloatField(required=True) - fit_offset_s_vert_1 = FloatField(required=True) - fit_slope_s_vert_1 = FloatField(required=True) - r2_s_vert_1 = FloatField(required=True) + s_vert_1_amp = FloatField(required=True) + s_vert_1_freq = FloatField(required=True) + s_vert_1_phase = FloatField(required=True) + s_vert_1_offset = FloatField(required=True) + s_vert_1_slope = FloatField(required=True) + s_vert_1_r2 = FloatField(required=True) + s_vert_1_max = FloatField(required=True) + s_vert_1_min = FloatField(required=True) ## S2 - fit_amp_s_vert_2 = FloatField(required=True) - fit_freq_s_vert_2 = FloatField(required=True) - fit_phase_s_vert_2 = FloatField(required=True) - fit_offset_s_vert_2 = FloatField(required=True) - fit_slope_s_vert_2 = FloatField(required=True) - r2_s_vert_2 = FloatField(required=True) - ## S-Sum - fit_amp_s_vert_sum = FloatField(required=True) - fit_freq_s_vert_sum = FloatField(required=True) - fit_phase_s_vert_sum = FloatField(required=True) - fit_offset_s_vert_sum = FloatField(required=True) - fit_slope_s_vert_sum = FloatField(required=True) - r2_s_vert_sum = FloatField(required=True) - ## r2 - r2_F = FloatField(required=True) - r2_s_vert_1 = FloatField(required=True) - r2_s_vert_2 = FloatField(required=True) - r2_s_vert_sum = FloatField(required=True) + s_vert_2_amp = FloatField(required=True) + s_vert_2_freq = FloatField(required=True) + s_vert_2_phase = FloatField(required=True) + s_vert_2_offset = FloatField(required=True) + s_vert_2_slope = FloatField(required=True) + s_vert_2_r2 = FloatField(required=True) + s_vert_2_max = FloatField(required=True) + s_vert_2_min = FloatField(required=True) + + ## optional parameters + + s_vert_sum_amp = FloatField(required=False) + s_vert_sum_freq = FloatField(required=False) + s_vert_sum_phase = FloatField(required=False) + s_vert_sum_offset = FloatField(required=False) + s_vert_sum_slope = FloatField(required=False) + s_vert_sum_r2 = FloatField(required=False) + s_vert_sum_max = FloatField(required=False) + s_vert_sum_min = FloatField(required=False) + + s_hor_sum_amp = FloatField(required=False) + s_hor_sum_freq = FloatField(required=False) + s_hor_sum_phase = FloatField(required=False) + s_hor_sum_offset = FloatField(required=False) + s_hor_sum_slope = FloatField(required=False) + s_hor_sum_r2 = FloatField(required=False) + s_hor_sum_max = FloatField(required=False) + s_hor_sum_min = FloatField(required=False) + + s_hor_1_amp = FloatField(required=False) + s_hor_1_freq = FloatField(required=False) + s_hor_1_phase = FloatField(required=False) + s_hor_1_offset = FloatField(required=False) + s_hor_1_slope = FloatField(required=False) + s_hor_1_r2 = FloatField(required=False) + s_hor_1_max = FloatField(required=False) + s_hor_1_min = FloatField(required=False) + ## S2 + s_hor_2_amp = FloatField(required=False) + s_hor_2_freq = FloatField(required=False) + s_hor_2_phase = FloatField(required=False) + s_hor_2_offset = FloatField(required=False) + s_hor_2_slope = FloatField(required=False) + s_hor_2_r2 = FloatField(required=False) + s_hor_2_max = FloatField(required=False) + s_hor_2_min = FloatField(required=False) \ No newline at end of file diff --git a/src/paveit/datamodels/usermanagement.py b/src/paveit/datamodels/usermanagement.py index cbf8773..75e8769 100644 --- a/src/paveit/datamodels/usermanagement.py +++ b/src/paveit/datamodels/usermanagement.py @@ -14,6 +14,7 @@ class Organisation(Document): }}) labtest_citt = StringField(required=False) + labtest_shear_extension = StringField(required=False) meta = { 'allow_inheritance': True, diff --git a/src/paveit/labtest/base.py b/src/paveit/labtest/base.py index 81f4a3e..b2db518 100644 --- a/src/paveit/labtest/base.py +++ b/src/paveit/labtest/base.py @@ -64,6 +64,8 @@ class DataSineLoad(): 'F', 's_hor_sum', 's_hor_1', 's_hor_2', 's_piston' ] + self.round_values = [('T', 3)] + # Header names after standardization; check if exists self.val_header_names = ['speciment_height', 'speciment_diameter'] @@ -197,7 +199,10 @@ class DataSineLoad(): def _post_apply_units(self): - for col in ['s_hor_sum', 's_hor_1', 's_hor_2']: + for col in [ + 's_hor_sum', 's_hor_1', 's_hor_2', 's_vert_sum', 's_vert_1', + 's_vert_2' + ]: if col in self.data.columns: self.data[col] = self.data[col].mul(self.unit_s) @@ -209,6 +214,13 @@ class DataSineLoad(): return True + def _post_round_values(self): + + for par, digits in self.round_values: + + if par in self.data.columns: + self.data[par] = self.data[par].round(digits) + def _post_select_importent_columns(self): # TODO: add more columns, check datamodel @@ -223,6 +235,10 @@ class DataSineLoad(): self.data['s_hor_sum'] = self.data[['s_hor_1', 's_hor_2']].sum(axis=1) + if not 's_vert_sum' in cols: + self.data['s_vert_sum'] = self.data[['s_vert_1', + 's_vert_2']].sum(axis=1) + def _post_opt_data(self): #set dtypes: for col in self.col_as_int: @@ -342,6 +358,7 @@ class DataSineLoad(): self._post_string_to_float() self._post_select_importent_columns() self._post_apply_units() + self._post_round_values() self._post_calc_missiong_values() self._post_opt_data() diff --git a/src/paveit/labtest/citt.py b/src/paveit/labtest/citt.py index 0318935..31fd963 100644 --- a/src/paveit/labtest/citt.py +++ b/src/paveit/labtest/citt.py @@ -133,6 +133,7 @@ class CITTBase(DataSineLoad): for idxcol, col in enumerate(self.columns_analyse): if not col in data.columns: continue + y = data[col].values res = fit_cos(x, y, freq=freq) @@ -744,3 +745,105 @@ class CITT_LaborHart(CITTBase): # log infos self._logger.info(self.metadata) self._logger.info(self.data.head()) + + +class CITT_BAGKoeln(CITTBase): + + def _define_units(self): + + self.unit_s = 1.0 #mm + self.unit_F = 1.0 #N + self.unit_t = 1. / 1000.0 #s + + def update_parameter(self): + + self.meta_names_of_parameter = { + 'sigma': ['Oberspannung'], + 'T': ['Solltemperatur'], + 't': ['TIME'], + 'speciment_diameter': ['Probendurchmesser'], + 'speciment_height': ['Probenhöhe'], + } #list of names + + self.data_column_names = { + 'time': ['TIME'], + 'f': ['FREQUENZ'], + 'F': ['Load'], + 's_hor_1': ['SENSOR 4'], + 's_hor_2': ['SENSOR Extension'], + 's_piston': ['Position'], + 'N': ['Impulsnummer'], + } + + def _process_data(self): + + meta = {} + + splitsign = ':;' + encoding = 'latin-1' + skiprows = 14 + + self.data.seek(0) + f = self.data.readlines() + + count = 0 + + for line in f: + count += 1 + + #remove whitespace + line = line.decode(encoding) + linesplit = line.strip() + linesplit = linesplit.split(splitsign) + + if len(linesplit) == 2: + + meta[linesplit[0]] = linesplit[1] + + if count >= skiprows: + break + + # data + self.data.seek(0) + + data = pd.read_csv(self.data, + encoding=encoding, + skiprows=skiprows, + decimal=',', + sep=';') + + ## add header to df + self.data.seek(0) + f = self.data.readlines() + count = 0 + + for line in f: + count += 1 + + if count >= skiprows: + break + + line = line.decode(encoding) + head = line.split(';') + data.columns = head + + # FIX: Sigma nicht in Metadaten oder Messdaten enthalten + sigma = float( + os.path.split(self.filename)[-1].split('MPa')[0].strip().replace( + ',', '.')) + + meta['sigma'] = sigma + + #clean data + data = data.dropna(axis=1) + + #remove whitespace + data.columns = [c.strip() for c in data.columns] + + #define in class + self.data = data + self.metadata.update(meta) + + # log infos + self._logger.info(self.metadata) + self._logger.info(self.data.head()) diff --git a/src/paveit/labtest/sheartest.py b/src/paveit/labtest/sheartest.py index 2727746..4e2a4e2 100644 --- a/src/paveit/labtest/sheartest.py +++ b/src/paveit/labtest/sheartest.py @@ -1,16 +1,15 @@ +import io +import logging import os import lmfit as lm import numpy as np import pandas as pd -from paveit_worker.libs.labtests.base import DataSineLoad - -#import scipy.fft as sfft - - -#from pytestpavement.labtests.base import DataSineLoad -#from pytestpavement.models.data import DataSheartest -#from pytestpavement.models.sheartest import DynamicShearTestExtension +from bson import ObjectId +from paveit import calc_nu, fit_cos +from paveit.datamodels import DataSheartest, DynamicShearTestExtension +from paveit.io import read_geosys +from paveit.labtest import DataSineLoad class ShearTest(DataSineLoad): @@ -19,256 +18,220 @@ class ShearTest(DataSineLoad): """ def __init__(self, - fname: str, + filename: str, + metadata: dict, + logger=None, debug: bool = False, gap_width: float = 1.0, roundtemperature: bool = True, - archive_file=False, - s3_params: dict = {}): + data: None | io.BytesIO = None): + + self.filename = filename + self.metadata = metadata - #set parameter self.gap_width = gap_width - self.debug = debug - self.file = fname + self.roundtemperature = roundtemperature - self.archive_file = archive_file - self.s3_params = s3_params - # process file - self._run() + if isinstance(data, io.BytesIO): + self.data = data - def plot_fited_data(self, opath=None, pkname=None, r2min=0.99): - - ylabel_dict = { - 'F': 'Kraft in N', - 's_vert_sum': 'norm. mittlerer Scherweg\n $S_{mittel}$ in mm', - 's_piston': 'norm. Kolbenweg\n in mm', - 's_vert_1': 'Scherweg\n $S_1$ in mm', - 's_vert_2': 'Scherweg\n $S_2$ in mm' - } - - columns_analyse = [ - 'F', - 's_vert_sum', - 's_vert_1', - 's_vert_2', - 's_piston', - ] - - if not (opath is None) & (pkname is None): - showplot = False - - opath = os.path.join(opath, pkname, 'raw_data') - if not os.path.exists(opath): - os.makedirs(opath) + self.debug = debug + if logger == None: + self._logger = logging.getLogger(__name__) else: - showplot = True + self._logger = logger - for i, fit in self.fit.iterrows(): + self._logger.info( + f'filename s3: {self.filename}, metadata: {self.metadata}') - if not any([fit['r2_F'] < r2min, fit['r2_s_vert_sum'] < r2min]): - continue + self._pre_run() - data = self.data[int(fit['idx_data'])] + def _sel_df(self, df, num=5, shift=-1): - if data is None: - continue + N = df['N'].unique() + n_N = len(N) + max_N = max(N) + min_N = min(N) - freq = data['f'].unique()[0] - sigma = data['sigma_normal'].unique()[0] - s = data['extension'].unique()[0] - T = data['T'].unique()[0] + # Fall 1: nur num Lastwechsel + if n_N < num - shift: + df_sel = None + elif n_N == num - shift: + df_sel = df - fig, axs = plt.subplots(len(columns_analyse), - 1, - figsize=(8, len(columns_analyse) * 2), - sharex=True) - - for idxcol, col in enumerate(columns_analyse): - x, y = data.index, data[col] - - #add fit - f = self.fit.iloc[i] - parfit = {} - for k in ['amp', 'freq', 'phase', 'offset', 'slope']: - parfit[k] = f[f'fit_{k}_{col}'] - - yreg = fit_cos_eval(x, parfit) - - if col in ['s_piston', 's_vert_sum']: - y = y - np.mean(y) - yreg = yreg - np.mean(yreg) - - plt.sca(axs[idxcol]) - plt.plot(x, y, label='Messdaten') - - r2 = np.round(f[f'r2_{col}'], 3) - plt.plot(x, - yreg, - alpha=0.7, - label=f'Regression ($R^2 = {r2}$)') - - if not ('F' in col): - s = f['extension'] - parline = dict(lw=0.4, - ls='--', - color='lightgrey', - alpha=0.4, - label='Bereich des zul. Scherweges') - plt.axhspan(-s, s, **parline) - - if idxcol == len(columns_analyse) - 1: - plt.xlabel('Zeit in s') - - plt.ylabel(ylabel_dict[col]) - plt.legend() - - plt.tight_layout() - - if showplot: - plt.show() - break - - else: - ofile = f'{T}deg_{sigma}MPa_{freq}Hz_{s}mm'.replace('.', 'x') - ofile = os.path.join(opath, ofile + '.pdf') - - plt.savefig(ofile) - plt.close() - - -class ShearTestExtension(ShearTest): - - def runfit(self): - self._fit_data() - - def file_in_db(self): - - n = DynamicShearTestExtension.objects(filehash=self.filehash).count() - - if n > 0: - return True + # Fall 2: nicht alle LW in Datei else: - return False + df_sel = df[(df['N'] >= N[-num + shift]) + & (df['N'] <= N[-1 + shift])] - def save(self, material1, material2, bounding, meta: dict): - - for i, fit in self.fit.iterrows(): - - data = self.data[int(fit['idx_data'])] - - #check if data in db - n = DynamicShearTestExtension.objects( - f=fit['f'], - sigma_normal=fit['sigma_normal'], - T=fit['T'], - extension=fit['extension'], - material1=material1, - material2=material2, - bounding=bounding, - filehash=self.filehash, - ).count() - if n > 0: continue - - # save fit - - values = {} - for col in ['F', 's_vert_1', 's_vert_2', 's_vert_sum']: - values[f'fit_amp_{col}'] = fit[f'fit_amp_{col}'] - values[f'fit_freq_{col}'] = fit[f'fit_freq_{col}'] - values[f'fit_phase_{col}'] = fit[f'fit_phase_{col}'] - values[f'fit_offset_{col}'] = fit[f'fit_offset_{col}'] - values[f'fit_slope_{col}'] = fit[f'fit_slope_{col}'] - values[f'r2_{col}'] = fit[f'r2_{col}'] - - values.update(meta) - - try: - r = DynamicShearTestExtension( - #metadata - f=fit['f'], - sigma_normal=fit['sigma_normal'], - T=fit['T'], - extension=fit['extension'], - filehash=self.filehash, - material1=material1, - material2=material2, - bounding=bounding, - #results - stiffness=fit['G'], - # - **values).save() - - #save raw data - rdata = DataSheartest( - result_id=r.id, - time=data.index.values, - F=data['F'].values, - N=data['N'].values, - s_vert_1=data['s_vert_1'].values, - s_vert_2=data['s_vert_2'].values, - s_vert_sum=data['s_vert_sum'].values, - s_piston=data['s_piston'].values, - ).save() - - except: - print('error saving data') - raise - rdata.delete() - - if self.archive_file: - mclient = MinioClient(self.s3_params['S3_URL'], - self.s3_params['S3_ACCESS_KEY'], - self.s3_params['S3_SECRET_KEY'], - bucket=str(meta['org_id'])) - - extension = os.path.splitext(self.file)[-1] - ofilename = self.filehash + extension - outpath = 'sheartest' - - metadata_s3 = { - 'project_id': str(meta['project_id']), - 'user_id': str(meta['user_id']), - 'filename': os.path.split(self.file)[-1], - 'speciment': meta['speciment_name'] - } - - mclient.compress_and_upload_file(self.file, - ofilename, - outpath=outpath, - content_type="application/raw", - metadata=metadata_s3) + return df_sel def _set_parameter(self): + self._logger.debug('run _set_parameter') self.split_data_based_on_parameter = [ 'T', 'sigma_normal', 'f', 'extension' ] self.col_as_int = ['N'] - self.col_as_float = ['T', 'F', 'f', 's_vert_sum'] + self.col_as_float = ['T', 'F', 's_piston', 's_hor_1', 's_hor_2'] - self.val_col_names = ['time', 'T', 'f', 'N', 'F', 's_vert_sum'] - # Header names after standardization; check if exists - self.val_header_names = ['speciment_diameter'] + self.val_col_names = [ + 'time', 'T', 'f', 'sigma_normal', 'extension', 'N', 'F', 's_hor_1', + 's_hor_2', 's_vert_1', 's_vert_2' + ] + + self.round_values = [('T', 1), ('sigma_normal', 1), ('f', 1)] self.columns_analyse = [ - 'F', 's_vert_sum', 's_vert_1', 's_vert_2', 's_piston' + 'F', 's_vert_sum', 's_vert_1', 's_vert_2', 's_hor_1', 's_hor_2', + 's_hor_sum', 's_piston' + ] + + # Header names after standardization; check if exists + self.val_header_names = [ + 'speciment_height', 'speciment_diameter', 'broken' ] self.number_of_load_cycles_for_analysis = 5 - def _calc_missiong_values(self): + self.meta_names_of_parameter = {} #list of names - cols = self.data.columns + self.data_column_names = { + 'time': ['Time Series'], + 'F': ['Load Series'], + 's_hor_1': ['LVDT1 Series'], + 's_hor_2': ['LVDT2 Series'], + } - for c in ['vert']: - if not f's_{c}_sum' in cols: - self.data[f's_{c}_sum'] = self.data[[f's_{c}_1', f's_{c}_2' - ]].sum(axis=1).div(2.0) - def _fit_data(self): +class ShearTestExtension(ShearTest): + + def save( + self, + org_id: ObjectId, + project_id: ObjectId, + material_1_id: ObjectId, + material_2_id: ObjectId, + material_boundary_id: ObjectId, + user_id: ObjectId, + meta: dict = {}, + wp_id: ObjectId | None = None, + broken: bool = False, + gap_width: float = 1.0, #mm + ): + """ + save results to mongodb + """ + + if not hasattr(self, 'fit'): + raise + + # precheck data and results + # assert len(self.data) == len(self.fit) + + for idx_fit, fit in self.fit.iterrows(): + data = self.data[idx_fit] + + meta['filehash'] = self.filehash + meta['org_id'] = org_id + meta['project_id'] = project_id + meta['workpackage_id'] = wp_id + meta['user_id'] = user_id + + meta['material1'] = material_1_id + meta['material2'] = material_2_id + meta['bounding'] = material_boundary_id + + #check if result in db + #n = CITTSiffness.objects(**meta).count() + #print(n) + + # write data + data_dict = fit.to_dict() + data_dict.update(meta) + + # remove 'fit_' from keys: + for key in list(data_dict.keys()): + if key.startswith('fit_'): + data_dict[key[4:]] = data_dict[key] + data_dict.pop(key) + + # rename fields + + def rename_field(d, old, new): + d[new] = d[old] + d.pop(old) + + f = DynamicShearTestExtension(**data_dict).save() + + # required data + data_out = dict( + time=data.index, + F=list(data['F']), + N=list(data['N']), + s_vert_1=list(data['s_hor_1']), + s_vert_2=list(data['s_hor_2']), + s_vert_sum=list(data['s_hor_sum']), + ) + + # add optional datas + for col in ['s_piston', 's_hor_1', 's_hor_2']: + if col in data.columns: + data_out[col] = list(data[col]) + + g = DataSheartest(result=f.id, **data_out).save() + + def _fit_select_data(self): + """ + select N load cycles from original data + (a): Based on window of TP-Asphalt + (b) last N cycles + + """ + + self._logger.debug('run _fit_select_data') + + self.max_N_in_data = [] + + if not isinstance(self.data, list): + if self.number_of_load_cycles_for_analysis > 1: + + self.max_N_in_data.append(self.data['N'].max()) + + df_sel = [ + self._sel_df(self.data, + num=self.number_of_load_cycles_for_analysis) + ] + else: + df_sel = [self.data] + + else: + df_sel = [] + for d in self.data: + + self.max_N_in_data.append(d['N'].max()) + + if self.number_of_load_cycles_for_analysis > 1: + d_sel = self._sel_df( + d, num=self.number_of_load_cycles_for_analysis) + else: + d_sel = d + + df_sel.append(d_sel) + + # replace data + self.data = df_sel + + def _calc(self): + """ + Calculate Results + """ + + self._logger.info('run _calc base') + print('run BASE') self.fit = [] @@ -278,406 +241,166 @@ class ShearTestExtension(ShearTest): data.index = data.index - data.index[0] - res = {} - res['idx_data'] = int(idx_data) + res_temp = {} + res_temp['idx'] = idx_data # Fitting - freq = float(np.round(data['f'].mean(), 4)) - if (self.debug): - sigma_normal = np.round(data['sigma_normal'].mean(), 3) - T = np.round(data['T'].mean(), 3) + freq = data['f'].mean() + sigma_normal = data['sigma_normal'].mean() + T = data['T'].mean() + extension = data['extension'].mean() + + x = data.index.values for idxcol, col in enumerate(self.columns_analyse): if not col in data.columns: continue - x = data.index.values y = data[col].values - # Fourier Transformation - """ - dt = np.diff(x).mean() #mean sampling rate - n = len(x) + res = fit_cos(x, y, freq=freq) - res[f'psd_{col}'] = sfft.rfft(y) #compute the FFT - res[f'freq_{col}'] = sfft.rfftfreq(n, dt) - """ + for key, value in res.items(): + res_temp[f'fit_{col}_{key}'] = value - res_fit = fit_cos(x, y, freq=freq, constfreq=True) + res_temp[f'fit_{col}_max'] = max(y) + res_temp[f'fit_{col}_min'] = min(y) - res[f'r2_{col}'] = res_fit['r2'] + # add more metadata + res_temp['f_set'] = freq + res_temp['sigma_normal'] = sigma_normal + res_temp['T_set'] = T + res_temp['extension'] = extension + res_temp['broken'] = self.metadata['broken'] - res[f'fit_amp_{col}'] = res_fit['amp'] - res[f'fit_freq_{col}'] = res_fit['freq'] - res[f'fit_phase_{col}'] = res_fit['phase'] - res[f'fit_offset_{col}'] = res_fit['offset'] - res[f'fit_slope_{col}'] = res_fit['slope'] + res_temp['N_from'] = int(data['N'].min()) + res_temp['N_to'] = int(data['N'].max()) + res_temp['N_tot'] = int(self.max_N_in_data[idx_data]) + + res_temp['n_samples_per_cycle'] = int( + len(data) / (res_temp['N_to'] - res_temp['N_from'] + 1)) ## Schersteifigkeit berechnen - deltaF = res['fit_amp_F'] - deltaS = res['fit_amp_s_vert_sum'] + deltaF = res_temp['fit_F_amp'] + deltaS = res_temp['fit_s_vert_sum_amp'] - A = np.pi * self.meta['speciment_diameter']**2 / 4 + A = np.pi * self.metadata['speciment_diameter']**2 / 4 tau = deltaF / A gamma = deltaS / self.gap_width - res['G'] = tau / gamma + res_temp['G'] = tau / gamma #metadaten - for c in ['T', 'extension', 'sigma_normal', 'f']: - res[c] = data[c][0] + #for c in ['T', 'extension', 'sigma_normal', 'f']: + # res_temp[c] = res_temp[c][0] - self.fit.append(res) + self.fit.append(res_temp) if (self.debug) & (len(self.fit) > 5): break self.fit = pd.DataFrame.from_records(self.fit) - def plot_results(self, opath=None, pkname=None, r2min=0.96): - if not (opath is None) & (pkname is None): - showplot = False + self.fit = self.fit.reset_index(drop=True).set_index('idx') - opath = os.path.join(opath, pkname) - if not os.path.exists(opath): - os.makedirs(opath) - else: - showplot = True - - dfplot = self.fit.copy() - for col in ['extension', 'fit_amp_s_vert_sum']: - dfplot[col] = dfplot[col].mul(1000) - - fig, ax = plt.subplots() - - xticks = list(dfplot['extension'].unique()) - - df = dfplot - df = df[(df['r2_F'] >= r2min) & (df['r2_s_vert_sum'] >= r2min)] - - sns.scatterplot( - data=df, - x='fit_amp_s_vert_sum', - y='G', - hue='T', - ax=ax, - alpha=0.7, - #size=150, - size="G", - sizes=(50, 160), - edgecolor='k', - palette='muted', - zorder=10) - - df = dfplot - df = df[(df['r2_F'] < r2min) & (df['r2_s_vert_sum'] < r2min)] - - if not df.empty: - sns.scatterplot(data=df, - x='fit_amp_s_vert_sum', - y='G', - facecolor='grey', - alpha=0.5, - legend=False, - zorder=1, - ax=ax) - - ax.set_xlabel(r'gemessene Scherwegamplitude in $\mu m$') - ax.set_ylabel(r'Scherseteifigkeit in MPa/mm') - - ax.set_xticks(xticks) - ax.grid() - - if not showplot: - ofile = os.path.join(opath, 'shearstiffness.pdf') - - plt.savefig(ofile) - plt.show() - - def plot_stats(self, opath=None, pkname=None, r2min=0.96): - if not (opath is None) & (pkname is None): - showplot = False - - opath = os.path.join(opath, pkname) - if not os.path.exists(opath): - os.makedirs(opath) - else: - showplot = True - - dfplot = self.fit.copy() - for col in ['extension', 'fit_amp_s_vert_sum']: - dfplot[col] = dfplot[col].mul(1000) - - #r2 - - df = self.fit - - fig, axs = plt.subplots(1, 2, sharey=True, sharex=True) - - parscatter = dict(palette='muted', alpha=0.7, edgecolor='k', lw=0.3) - - # r2 - ax = axs[0] - sns.scatterplot(data=df, - x='fit_amp_s_vert_sum', - y='r2_F', - hue='T', - ax=ax, - **parscatter) - ax.set_ylabel('Bestimmtheitsmaß $R^2$') - ax.set_title('Kraft') - - ax = axs[1] - sns.scatterplot(data=df, - x='fit_amp_s_vert_sum', - y='r2_s_vert_sum', - hue='T', - legend=False, - ax=ax, - **parscatter) - ax.set_ylabel('$R^2$ (S_{mittel})') - ax.set_title('mittlerer Scherweg') - - for ax in axs.flatten(): - ax.grid() - ax.set_xlabel(r'gemessene Scherwegamplitude in $\mu m$') - - plt.tight_layout() - - if not showplot: - ofile = os.path.join(opath, 'stats_r2.pdf') - plt.savefig(ofile) - plt.show() + nsamples = len(self.fit) + self._logger.info(f'fitting finished, add {nsamples} samples') class ShearTestExtensionLaborHart(ShearTestExtension): def _define_units(self): - self.unit_F = 1 / 1000.0 #N - self.unit_t = 1 / 1000. #s + self.unit_s = 1.0 #mm + self.unit_F = 1.0 #N + self.unit_t = 1. / 1000.0 #s - def _set_units(self): + def update_parameter(self): - #for col in ['F']: - # self.data[col] = self.data[col].mul(self.unit_F) + self.meta_names_of_parameter = { + 'T': ['Solltemperatur'], + 't': ['TIME'], + 'speciment_diameter': ['Probendurchmesser'], + 'speciment_height': ['Probenhöhe'], + } #list of names - for col in ['time']: - self.data[col] = self.data[col].mul(self.unit_t) + self.data_column_names = { + 'time': ['TIME'], + 'f': ['Sollwert Frequenz'], + 'T': ['SollTemperatur'], + 'sigma_normal': ['Sollwert Normalspannung'], + 'extension': ['Max Scherweg'], + 'F': ['Load'], + 's_hor_1': ['HORIZONTAL links'], + 's_hor_2': ['HOIZONTAL Rechts'], + 's_vert_1': ['VERTIKAL Links'], + 's_vert_2': ['VERTIKAL Rechts'], + 's_piston': ['Position'], + 'N': ['Impulsnummer'], + } - return True + def _process_data(self): - def _read_data(self): - """ - read data from Labor Hart - """ - - # parameter - encoding = 'latin-1' - skiprows = 14 - hasunits = True - splitsign = ':;' - - # metadata from file meta = {} - with open(self.file, 'r', encoding=encoding) as f: - count = 0 + splitsign = ':;' + encoding = 'latin-1' + skiprows = 14 - for line in f: - count += 1 + self.data.seek(0) + f = self.data.readlines() - #remove whitespace - linesplit = line.strip() - linesplit = linesplit.split(splitsign) + count = 0 - if len(linesplit) == 2: + for line in f: + count += 1 - meta[linesplit[0]] = linesplit[1] + #remove whitespace + line = line.decode(encoding) + linesplit = line.strip() + linesplit = linesplit.split(splitsign) - if count >= skiprows: - break + if len(linesplit) == 2: + + meta[linesplit[0]] = linesplit[1] + + if count >= skiprows: + break # data - data = pd.read_csv(self.file, + self.data.seek(0) + + data = pd.read_csv(self.data, encoding=encoding, skiprows=skiprows, decimal=',', sep=';') ## add header to df - with open(self.file, 'r', encoding=encoding) as f: - count = 0 + self.data.seek(0) + f = self.data.readlines() + count = 0 - for line in f: - count += 1 + for line in f: + count += 1 - if count >= skiprows: - break + if count >= skiprows: + break + line = line.decode(encoding) head = line.split(';') data.columns = head #clean data data = data.dropna(axis=1) - #define in class - self.meta = meta - self.data = data - return True - - def _standardize_meta(self): - - keys = list(self.meta.keys()) - for key in keys: - - if any(map(key.__contains__, ['Probenbezeichnung'])): - self.meta['speciment'] = self.meta.pop(key) - - elif any(map(key.__contains__, ['Datum/Uhrzeit'])): - self.meta['datetime'] = self.meta.pop(key) - try: - self.meta['datetime'] = pd.to_datetime( - self.meta['datetime']) - except: - pass - - elif any(map(key.__contains__, ['Probenhöhe'])): - self.meta['speciment_height'] = float( - self.meta.pop(key).replace(',', '.')) - elif any(map(key.__contains__, ['Probendurchmesser'])): - self.meta['speciment_diameter'] = float( - self.meta.pop(key).replace(',', '.')) - elif any(map(key.__contains__, ['Solltemperatur'])): - self.meta['temperature'] = float( - self.meta.pop(key).replace(',', '.')) - elif any(map(key.__contains__, ['Prüfbedingungen'])): - self.meta['test_version'] = self.meta.pop(key) - elif any(map(key.__contains__, ['Name des VersAblf'])): - self.meta['test'] = self.meta.pop(key) - elif any(map(key.__contains__, ['Prüfer'])): - self.meta['examiner'] = self.meta.pop(key) - - return True - - def _standardize_data(self): - - colnames = list(self.data.columns) - - for i, col in enumerate(colnames): - if col == 'TIME': - colnames[i] = 'time' - - #set values - elif col == 'Sollwert Frequenz': - colnames[i] = 'f' - elif col == 'SollTemperatur': - colnames[i] = 'T' - elif col == 'Max Scherweg': - colnames[i] = 'extension' - elif col == 'Sollwert Normalspannung': - colnames[i] = 'sigma_normal' - elif col == 'Impulsnummer': - colnames[i] = 'N' - - # measurements - - elif col == 'Load': - colnames[i] = 'F' - elif col == 'Position': - colnames[i] = 's_piston' - - elif col == 'VERTIKAL Links': - colnames[i] = 's_vert_1' - elif col == 'VERTIKAL Rechts': - colnames[i] = 's_vert_2' - - elif col == 'HORIZONTAL links': - colnames[i] = 's_hor_1' - - elif col == 'HOIZONTAL Rechts': - colnames[i] = 's_hor_2' - - self.data.columns = colnames - - -class ShearTestExtensionTUDresdenGeosys(ShearTestExtension): - - def _define_units(self): - - self.unit_S = 1 / 1000.0 #N - - def _set_units(self): - - for col in [ - 's_vert_sum', 's_vert_1', 's_vert_2', 's_piston', 'extension' - ]: - self.data[col] = self.data[col].mul(self.unit_S) - - #convert internal units to global - f = np.mean([0.9 / 355, 0.6 / 234.0, 0.3 / 116.0]) - - self.data['sigma_normal'] = self.data['sigma_normal'].mul(f).apply( - lambda x: np.round(x, 1)) - - return True - - def _read_data(self): - """ - read data from Labor Hart - """ - - # parameter - encoding = 'latin-1' - skiprows = 14 - hasunits = True - splitsign = ':;' - - head, data = read_geosys(self.file, '015') + #remove whitespace + data.columns = [c.strip() for c in data.columns] #define in class - self.meta = head self.data = data - return True + self.metadata.update(meta) - def _standardize_meta(self): - - keys = list(self.meta.keys()) - for key in keys: - - if key == 'd': - self.meta['speciment_diameter'] = self.meta.pop(key) - - return True - - def _standardize_data(self): - - colnames = list(self.data.columns) - - for i, col in enumerate(colnames): - - #set values - if col == 'soll temperature': - colnames[i] = 'T' - elif col == 'soll extension': - colnames[i] = 'extension' - elif col == 'soll sigma': - colnames[i] = 'sigma_normal' - elif col == 'soll frequency': - colnames[i] = 'f' - - elif col == 'Number of vertical cycles': - colnames[i] = 'N' - - # measurements - elif col == 'vertical load from hydraulic pressure': - colnames[i] = 'F' - elif col == 'vertical position from hydraulic pressure': - colnames[i] = 's_piston' - - elif col == 'Vertical position from LVDT 1': - colnames[i] = 's_vert_1' - elif col == 'Vertical position from LVDT 2': - colnames[i] = 's_vert_2' - - self.data.columns = colnames + # log infos + self._logger.info(self.metadata) + self._logger.info(self.data.head()) \ No newline at end of file