init package and add first subpackages

This commit is contained in:
Markus Clauß
2023-02-27 17:07:04 +01:00
commit 1b4ce18eca
16 changed files with 1658 additions and 0 deletions

4
src/paveit/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
# main __init__.py
from .analysis import *
from .helper import *
from .labtest import *

View File

@@ -0,0 +1,9 @@
from .regression import *
__all__ = [
# regession models
"fit_cos_simple",
"fit_cos",
#helper functions
"fit_cos_eval",
]

View File

@@ -0,0 +1,159 @@
import lmfit as lm
import numpy as np
import scipy.special as sf
from scipy.optimize import curve_fit
def cosfunc(t, A, w, p, c, e):
"""
definition of the sin function
"""
return A * np.cos(2 * np.pi * w * t + p) + e * t + c
def fit_cos_eval(x, par):
"""
par: dict
fitting results
"""
ys = cosfunc(x, par['amp'], par['freq'], par['phase'], par['offset'],
par['slope'])
return ys
def regression_sine_fft():
"""
fast fourier transformation for sine data
"""
return []
def fit_cos_simple(x, y, freq=10.0):
"""
simple sine regression
x: vector or list
y: vector or list
freq: float
initial frequency of regression analysis
"""
tt = np.array(x)
yy = np.array(y)
guess_offset = np.mean(yy)
offset_b = 0.4 * abs(guess_offset)
guess_amp = abs(np.max(yy) - np.min(yy)) / 2.0
param_bounds = ([
0.3 * guess_amp, 0.99 * freq, -np.inf, guess_offset - offset_b, -np.inf
], [1.3 * guess_amp, 1.01 * freq, np.inf, guess_offset + offset_b, np.inf])
popt, pcov = curve_fit(cosfunc, tt, yy, bounds=param_bounds)
A, w, p, c, e = popt
return {
"amp": A,
"freq": w,
"phase": p,
"offset": c,
"slope": e,
}
def fit_cos(x, y, freq=10.0, constfreq=False):
"""
sine regression
x: vector or list
y: vector or list
freq: float
initial frequency of regression analysis
"""
# step 1
res_step1 = fit_cos_simple(x, y, freq=freq)
# step 2: lmfit
mod = lm.models.Model(cosfunc)
mod.set_param_hint(
'A',
value=res_step1['amp'],
#min=res_step1['amp'] - 0.5 * abs(res_step1['amp']),
#max=res_step1['amp'] + 0.5 * abs(res_step1['amp'])
)
mod.set_param_hint(
'w',
value=freq,
vary=not constfreq,
#min=freq - 0.1 * freq,
#max=freq + 0.1 * freq,
)
mod.set_param_hint('p', value=res_step1['phase'], vary=True)
mod.set_param_hint('c', value=res_step1['offset'],
vary=True) #, min = -0.5, max = 0.5)
mod.set_param_hint('e', value=res_step1['slope'], vary=True)
parms_fit = [
mod.param_hints['A']['value'], mod.param_hints['w']['value'],
mod.param_hints['p']['value'], mod.param_hints['c']['value'],
mod.param_hints['e']['value']
]
abweichung = []
chis = []
chis_red = []
results = []
r2 = []
methods = ['leastsq', 'powell']
dof = len(y) - len(parms_fit)
for method in methods:
result = mod.fit(y, t=x, method=method, verbose=False)
r2temp = 1 - result.residual.var() / np.var(y)
# r2temp = result.redchi / np.var(yfit, ddof=2)
if r2temp < 0.:
r2temp = 0
r2.append(r2temp)
chi = result.chisqr
chis_red.append(result.redchi)
abweichung.append(sf.gammaincc(dof / 2., chi / 2))
chis.append(chi)
results.append(result)
res = {}
best = np.nanargmax(r2)
res[f'amp'] = results[best].best_values['A']
res[f'freq'] = results[best].best_values['w']
res[f'phase'] = results[best].best_values['p']
res[f'offset'] = results[best].best_values['c']
res[f'slope'] = results[best].best_values['e']
res[f'r2'] = r2[best]
return res

View File

@@ -0,0 +1,6 @@
from .filehasher import calc_hash_of_bytes
from .minio import get_minio_client_archive, get_minio_client_processing
__all__ = ['get_minio_client_archive', 'get_minio_client_processing',
'calc_hash_of_bytes'
]

View File

@@ -0,0 +1,22 @@
import hashlib
from io import BytesIO
def calc_hash_of_bytes(buf: BytesIO):
""" calculate the hash of the file """
algo = hashlib.sha1()
buffer_size = 65536
buffer_size = buffer_size * 1024 * 1024
while True:
data = buf.read(buffer_size)
if not data:
break
algo.update(data)
hex = algo.hexdigest()
return hex

View File

@@ -0,0 +1,40 @@
import os
from minio import Minio
def get_minio_client_processing(bucket_name = 'processing'):
client = Minio(
os.environ["MINIO_URL"],
access_key=os.environ["MINIO_ACCESS_KEY"],
secret_key=os.environ["MINIO_SECRET_KEY"],
secure=False
)
found = client.bucket_exists(bucket_name)
if not found:
client.make_bucket(bucket_name)
else:
pass
return client
def get_minio_client_archive(bucket_name = 'archive'):
client = Minio(
os.environ["MINIO_ARCHIVE_URL"],
access_key=os.environ["MINIO_ARCHIVE_ACCESS_KEY"],
secret_key=os.environ["MINIO_ARCHIVE_SECRET_KEY"],
secure=False
)
found = client.bucket_exists(bucket_name)
if not found:
client.make_bucket(bucket_name)
else:
pass
return client

View File

@@ -0,0 +1,6 @@
from .base import DataSineLoad
from .citt import CITTBase
__all__ = ['DataSineLoad',
'CITTBase'
]

View File

@@ -0,0 +1,90 @@
# coding: utf-8
import io
import pandas as pd
from paveit.helper import calc_hash_of_bytes, get_minio_client_processing
from worker import app, logger
class DataSineLoad():
"""
Base class for lab tests with sine load
"""
def __init__(self, filename:str , metadata: dict):
self.filename = filename
self.metadata = metadata
self._logger = logger
self._logger.info(f'filename s3: {self.filename}, metadata: {self.metadata}')
def _connect_to_s3(self):
self._logger.info('connect to db')
self.__minioClient = get_minio_client_processing()
def _read_from_s3_to_bytesio(self):
self._logger.info('read bytes')
try:
self._connect_to_s3()
response = self.__minioClient.get_object('processing', self.filename)
self.data = response.data
finally:
response.close()
response.release_conn()
self.data = io.BytesIO(self.data)
def _calc_hash_of_bytesio(self):
self.filehash = calc_hash_of_bytes(self.data)
self.data.seek(0)
self._logger.debug(f'Hash of file: {self.filehash}')
def _bytes_to_df(self):
self._logger.debug('convert bytes to pandas.DataFrame')
encoding='utf-8'
self.df = pd.read_csv(self.data, encoding=encoding)
def _calc(self):
self._logger.debug('calc data')
return self.df.mean().mean()
def _archive_binary_data(self):
self._logger.debug('send file to archive')
app.send_task('ArchiveFile', args=[self.filename,
self.metadata,
self.filehash,
'org',
'citt'
],
queue='archive'
)
def run(self):
self._logger.info('run task')
self._read_from_s3_to_bytesio()
self._calc_hash_of_bytesio()
self._bytes_to_df()
res = self._calc()
self._logger.debug(f'results: {res}')
self._archive_binary_data()
return res

192
src/paveit/labtest/citt.py Normal file
View File

@@ -0,0 +1,192 @@
import io
import os
from csv import reader
import numpy as np
import pandas as pd
from paveit.labtest import DataSineLoad
class CITTBase(DataSineLoad):
def _calc(self):
return (self.df.mean().mean(), self.df.max().max())
class CITT_KIT(DataSineLoad):
def _calc(self):
return (self.df.mean().mean(), self.df.max().max())
def _bytes_to_df(self):
logger.debug('convert bytes to pandas.DataFrame')
self.data.seek(0)
with io.TextIOWrapper(self.data, encoding='latin-1') as read_obj:
csv_reader = reader(read_obj, delimiter=';')
read = False
data = []
temp = []
for idx_row, row in enumerate(csv_reader):
if row == ['*****']:
if read == False:
read = True
else:
read = False
data.append(temp)
temp = []
continue
if read:
row = [r.replace(',', '.') for r in row]
temp.append(row)
#convert to pandas
res = []
freqs = [10.0, 5.0, 1.0, 0.1, 10.0]
for idx_data, d in enumerate(data):
t = pd.DataFrame(d[3:])
t.columns = d[1]
freq = freqs[idx_data]
t['f'] = freq
for col in t.columns:
t[col] = pd.to_numeric(t[col])
# add cycle number
dt = 1. / freq
Nmax = int(np.ceil(t['ZEIT'].max() / dt))
N = np.zeros_like(t['ZEIT'])
for i in range(Nmax):
if i == 0:
tmin = 0
tmax = dt
else:
tmax = (i + 1) * dt
tmin = (i) * dt
idx = t[(t['ZEIT'] >= tmin) & (t['ZEIT'] < tmax)].index
N[idx] = i
t['N'] = N
res.append(t)
#remove second 10 Hz
res = pd.concat(res[:-1])
res['T'] = self.temperature
#res = res.sort_values(['f', 'ZEIT'])
#define in class
self.df = res.reset_index()
class CITT_PTMDortmund(DataSineLoad):
def _calc(self):
return (self.df.mean().mean(), self.df.max().max())
def _bytes_to_df(self):
res = []
xl = pd.ExcelFile(self.data)
num_sheets = len(xl.sheet_names)
print(num_sheets)
diameter = []
height = []
for sheetid in range(num_sheets):
temp = pd.read_excel(self.data, sheetid, skiprows=97)
temp = temp.drop(index=0)
#convert data to numerical data
for col in temp.columns:
temp[col] = pd.to_numeric(temp[col])
#read metadata from file
meta = pd.read_excel(self.data, sheetid,
skiprows=1,
nrows=90)
meta = meta[meta.columns[[0, 2]]]
meta = meta.set_index(
meta.columns[0]).to_dict()[meta.columns[1]]
temp['sigma'] = float(meta['Max. Spannung'])
temp['T'] = float(meta['Versuchstemperatur'])
freq = float(meta['Frequenz'])
dt = 1 / freq
temp['f'] = freq
Nfrom = int(meta['Erster Aufzeichnungslastwechsel'])
Nto = int(meta['Letzer Aufzeichnungslastwechsel'])
#add cycle number to dataframe
time_idx = temp['Zeitfolgen'].values
N = np.zeros_like(time_idx)
self._logger.debug(len(N))
self._logger.info(f'cycles from {Nfrom} to {Nto}')
#BUG: Ist in Messdatei falsch definiert und wird von PTM angepasst. '''
#for cycle in range(Nfrom, Nto+1):
for cycle in range(10):
# time window
tmin = (cycle) * dt
tmax = (cycle + 1) * dt
#filter data
idx = temp[(time_idx >= tmin)
& (time_idx < tmax)].index
#FIX: siehe bug oben
if any(idx)>=500:
idx = idx[idx<500]
#set cycle number
N[idx] = cycle
temp['N'] = N
# add diameter and height to list
diameter.append(float(meta['Durchmesser (mm)']))
height.append(float(meta['Länge (mm)']))
#append data to final dataframe
res.append(temp)
#concat all parts to single dataframe
res = pd.concat(res)
# add data from speciment to metadata
if not 'diameter' in self.metadata:
self.metadata['diameter'] = np.mean(diameter)
if not 'height' in self.metadata:
self.metadata['height'] = np.mean(height)
#define in class
self.df = res.reset_index()
# log infos
logger.debug(self.metadata)
logger.debug(self.df.head())

View File

@@ -0,0 +1,683 @@
import os
import lmfit as lm
import numpy as np
import pandas as pd
from paveit_worker.libs.labtests.base import DataSineLoad
#import scipy.fft as sfft
#from pytestpavement.labtests.base import DataSineLoad
#from pytestpavement.models.data import DataSheartest
#from pytestpavement.models.sheartest import DynamicShearTestExtension
class ShearTest(DataSineLoad):
"""
Dynamic Shear Bounding Test
"""
def __init__(self,
fname: str,
debug: bool = False,
gap_width: float = 1.0,
roundtemperature: bool = True,
archive_file=False,
s3_params: dict = {}):
#set parameter
self.gap_width = gap_width
self.debug = debug
self.file = fname
self.roundtemperature = roundtemperature
self.archive_file = archive_file
self.s3_params = s3_params
# process file
self._run()
def plot_fited_data(self, opath=None, pkname=None, r2min=0.99):
ylabel_dict = {
'F': 'Kraft in N',
's_vert_sum': 'norm. mittlerer Scherweg\n $S_{mittel}$ in mm',
's_piston': 'norm. Kolbenweg\n in mm',
's_vert_1': 'Scherweg\n $S_1$ in mm',
's_vert_2': 'Scherweg\n $S_2$ in mm'
}
columns_analyse = [
'F',
's_vert_sum',
's_vert_1',
's_vert_2',
's_piston',
]
if not (opath is None) & (pkname is None):
showplot = False
opath = os.path.join(opath, pkname, 'raw_data')
if not os.path.exists(opath):
os.makedirs(opath)
else:
showplot = True
for i, fit in self.fit.iterrows():
if not any([fit['r2_F'] < r2min, fit['r2_s_vert_sum'] < r2min]):
continue
data = self.data[int(fit['idx_data'])]
if data is None:
continue
freq = data['f'].unique()[0]
sigma = data['sigma_normal'].unique()[0]
s = data['extension'].unique()[0]
T = data['T'].unique()[0]
fig, axs = plt.subplots(len(columns_analyse),
1,
figsize=(8, len(columns_analyse) * 2),
sharex=True)
for idxcol, col in enumerate(columns_analyse):
x, y = data.index, data[col]
#add fit
f = self.fit.iloc[i]
parfit = {}
for k in ['amp', 'freq', 'phase', 'offset', 'slope']:
parfit[k] = f[f'fit_{k}_{col}']
yreg = fit_cos_eval(x, parfit)
if col in ['s_piston', 's_vert_sum']:
y = y - np.mean(y)
yreg = yreg - np.mean(yreg)
plt.sca(axs[idxcol])
plt.plot(x, y, label='Messdaten')
r2 = np.round(f[f'r2_{col}'], 3)
plt.plot(x,
yreg,
alpha=0.7,
label=f'Regression ($R^2 = {r2}$)')
if not ('F' in col):
s = f['extension']
parline = dict(lw=0.4,
ls='--',
color='lightgrey',
alpha=0.4,
label='Bereich des zul. Scherweges')
plt.axhspan(-s, s, **parline)
if idxcol == len(columns_analyse) - 1:
plt.xlabel('Zeit in s')
plt.ylabel(ylabel_dict[col])
plt.legend()
plt.tight_layout()
if showplot:
plt.show()
break
else:
ofile = f'{T}deg_{sigma}MPa_{freq}Hz_{s}mm'.replace('.', 'x')
ofile = os.path.join(opath, ofile + '.pdf')
plt.savefig(ofile)
plt.close()
class ShearTestExtension(ShearTest):
def runfit(self):
self._fit_data()
def file_in_db(self):
n = DynamicShearTestExtension.objects(filehash=self.filehash).count()
if n > 0:
return True
else:
return False
def save(self, material1, material2, bounding, meta: dict):
for i, fit in self.fit.iterrows():
data = self.data[int(fit['idx_data'])]
#check if data in db
n = DynamicShearTestExtension.objects(
f=fit['f'],
sigma_normal=fit['sigma_normal'],
T=fit['T'],
extension=fit['extension'],
material1=material1,
material2=material2,
bounding=bounding,
filehash=self.filehash,
).count()
if n > 0: continue
# save fit
values = {}
for col in ['F', 's_vert_1', 's_vert_2', 's_vert_sum']:
values[f'fit_amp_{col}'] = fit[f'fit_amp_{col}']
values[f'fit_freq_{col}'] = fit[f'fit_freq_{col}']
values[f'fit_phase_{col}'] = fit[f'fit_phase_{col}']
values[f'fit_offset_{col}'] = fit[f'fit_offset_{col}']
values[f'fit_slope_{col}'] = fit[f'fit_slope_{col}']
values[f'r2_{col}'] = fit[f'r2_{col}']
values.update(meta)
try:
r = DynamicShearTestExtension(
#metadata
f=fit['f'],
sigma_normal=fit['sigma_normal'],
T=fit['T'],
extension=fit['extension'],
filehash=self.filehash,
material1=material1,
material2=material2,
bounding=bounding,
#results
stiffness=fit['G'],
#
**values).save()
#save raw data
rdata = DataSheartest(
result_id=r.id,
time=data.index.values,
F=data['F'].values,
N=data['N'].values,
s_vert_1=data['s_vert_1'].values,
s_vert_2=data['s_vert_2'].values,
s_vert_sum=data['s_vert_sum'].values,
s_piston=data['s_piston'].values,
).save()
except:
print('error saving data')
raise
rdata.delete()
if self.archive_file:
mclient = MinioClient(self.s3_params['S3_URL'],
self.s3_params['S3_ACCESS_KEY'],
self.s3_params['S3_SECRET_KEY'],
bucket=str(meta['org_id']))
extension = os.path.splitext(self.file)[-1]
ofilename = self.filehash + extension
outpath = 'sheartest'
metadata_s3 = {
'project_id': str(meta['project_id']),
'user_id': str(meta['user_id']),
'filename': os.path.split(self.file)[-1],
'speciment': meta['speciment_name']
}
mclient.compress_and_upload_file(self.file,
ofilename,
outpath=outpath,
content_type="application/raw",
metadata=metadata_s3)
def _set_parameter(self):
self.split_data_based_on_parameter = [
'T', 'sigma_normal', 'f', 'extension'
]
self.col_as_int = ['N']
self.col_as_float = ['T', 'F', 'f', 's_vert_sum']
self.val_col_names = ['time', 'T', 'f', 'N', 'F', 's_vert_sum']
# Header names after standardization; check if exists
self.val_header_names = ['speciment_diameter']
self.columns_analyse = [
'F', 's_vert_sum', 's_vert_1', 's_vert_2', 's_piston'
]
self.number_of_load_cycles_for_analysis = 5
def _calc_missiong_values(self):
cols = self.data.columns
for c in ['vert']:
if not f's_{c}_sum' in cols:
self.data[f's_{c}_sum'] = self.data[[f's_{c}_1', f's_{c}_2'
]].sum(axis=1).div(2.0)
def _fit_data(self):
self.fit = []
for idx_data, data in enumerate(self.data):
if data is None: continue
data.index = data.index - data.index[0]
res = {}
res['idx_data'] = int(idx_data)
# Fitting
freq = float(np.round(data['f'].mean(), 4))
if (self.debug):
sigma_normal = np.round(data['sigma_normal'].mean(), 3)
T = np.round(data['T'].mean(), 3)
for idxcol, col in enumerate(self.columns_analyse):
if not col in data.columns: continue
x = data.index.values
y = data[col].values
# Fourier Transformation
"""
dt = np.diff(x).mean() #mean sampling rate
n = len(x)
res[f'psd_{col}'] = sfft.rfft(y) #compute the FFT
res[f'freq_{col}'] = sfft.rfftfreq(n, dt)
"""
res_fit = fit_cos(x, y, freq=freq, constfreq=True)
res[f'r2_{col}'] = res_fit['r2']
res[f'fit_amp_{col}'] = res_fit['amp']
res[f'fit_freq_{col}'] = res_fit['freq']
res[f'fit_phase_{col}'] = res_fit['phase']
res[f'fit_offset_{col}'] = res_fit['offset']
res[f'fit_slope_{col}'] = res_fit['slope']
## Schersteifigkeit berechnen
deltaF = res['fit_amp_F']
deltaS = res['fit_amp_s_vert_sum']
A = np.pi * self.meta['speciment_diameter']**2 / 4
tau = deltaF / A
gamma = deltaS / self.gap_width
res['G'] = tau / gamma
#metadaten
for c in ['T', 'extension', 'sigma_normal', 'f']:
res[c] = data[c][0]
self.fit.append(res)
if (self.debug) & (len(self.fit) > 5):
break
self.fit = pd.DataFrame.from_records(self.fit)
def plot_results(self, opath=None, pkname=None, r2min=0.96):
if not (opath is None) & (pkname is None):
showplot = False
opath = os.path.join(opath, pkname)
if not os.path.exists(opath):
os.makedirs(opath)
else:
showplot = True
dfplot = self.fit.copy()
for col in ['extension', 'fit_amp_s_vert_sum']:
dfplot[col] = dfplot[col].mul(1000)
fig, ax = plt.subplots()
xticks = list(dfplot['extension'].unique())
df = dfplot
df = df[(df['r2_F'] >= r2min) & (df['r2_s_vert_sum'] >= r2min)]
sns.scatterplot(
data=df,
x='fit_amp_s_vert_sum',
y='G',
hue='T',
ax=ax,
alpha=0.7,
#size=150,
size="G",
sizes=(50, 160),
edgecolor='k',
palette='muted',
zorder=10)
df = dfplot
df = df[(df['r2_F'] < r2min) & (df['r2_s_vert_sum'] < r2min)]
if not df.empty:
sns.scatterplot(data=df,
x='fit_amp_s_vert_sum',
y='G',
facecolor='grey',
alpha=0.5,
legend=False,
zorder=1,
ax=ax)
ax.set_xlabel(r'gemessene Scherwegamplitude in $\mu m$')
ax.set_ylabel(r'Scherseteifigkeit in MPa/mm')
ax.set_xticks(xticks)
ax.grid()
if not showplot:
ofile = os.path.join(opath, 'shearstiffness.pdf')
plt.savefig(ofile)
plt.show()
def plot_stats(self, opath=None, pkname=None, r2min=0.96):
if not (opath is None) & (pkname is None):
showplot = False
opath = os.path.join(opath, pkname)
if not os.path.exists(opath):
os.makedirs(opath)
else:
showplot = True
dfplot = self.fit.copy()
for col in ['extension', 'fit_amp_s_vert_sum']:
dfplot[col] = dfplot[col].mul(1000)
#r2
df = self.fit
fig, axs = plt.subplots(1, 2, sharey=True, sharex=True)
parscatter = dict(palette='muted', alpha=0.7, edgecolor='k', lw=0.3)
# r2
ax = axs[0]
sns.scatterplot(data=df,
x='fit_amp_s_vert_sum',
y='r2_F',
hue='T',
ax=ax,
**parscatter)
ax.set_ylabel('Bestimmtheitsmaß $R^2$')
ax.set_title('Kraft')
ax = axs[1]
sns.scatterplot(data=df,
x='fit_amp_s_vert_sum',
y='r2_s_vert_sum',
hue='T',
legend=False,
ax=ax,
**parscatter)
ax.set_ylabel('$R^2$ (S_{mittel})')
ax.set_title('mittlerer Scherweg')
for ax in axs.flatten():
ax.grid()
ax.set_xlabel(r'gemessene Scherwegamplitude in $\mu m$')
plt.tight_layout()
if not showplot:
ofile = os.path.join(opath, 'stats_r2.pdf')
plt.savefig(ofile)
plt.show()
class ShearTestExtensionLaborHart(ShearTestExtension):
def _define_units(self):
self.unit_F = 1 / 1000.0 #N
self.unit_t = 1 / 1000. #s
def _set_units(self):
#for col in ['F']:
# self.data[col] = self.data[col].mul(self.unit_F)
for col in ['time']:
self.data[col] = self.data[col].mul(self.unit_t)
return True
def _read_data(self):
"""
read data from Labor Hart
"""
# parameter
encoding = 'latin-1'
skiprows = 14
hasunits = True
splitsign = ':;'
# metadata from file
meta = {}
with open(self.file, 'r', encoding=encoding) as f:
count = 0
for line in f:
count += 1
#remove whitespace
linesplit = line.strip()
linesplit = linesplit.split(splitsign)
if len(linesplit) == 2:
meta[linesplit[0]] = linesplit[1]
if count >= skiprows:
break
# data
data = pd.read_csv(self.file,
encoding=encoding,
skiprows=skiprows,
decimal=',',
sep=';')
## add header to df
with open(self.file, 'r', encoding=encoding) as f:
count = 0
for line in f:
count += 1
if count >= skiprows:
break
head = line.split(';')
data.columns = head
#clean data
data = data.dropna(axis=1)
#define in class
self.meta = meta
self.data = data
return True
def _standardize_meta(self):
keys = list(self.meta.keys())
for key in keys:
if any(map(key.__contains__, ['Probenbezeichnung'])):
self.meta['speciment'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Datum/Uhrzeit'])):
self.meta['datetime'] = self.meta.pop(key)
try:
self.meta['datetime'] = pd.to_datetime(
self.meta['datetime'])
except:
pass
elif any(map(key.__contains__, ['Probenhöhe'])):
self.meta['speciment_height'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Probendurchmesser'])):
self.meta['speciment_diameter'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Solltemperatur'])):
self.meta['temperature'] = float(
self.meta.pop(key).replace(',', '.'))
elif any(map(key.__contains__, ['Prüfbedingungen'])):
self.meta['test_version'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Name des VersAblf'])):
self.meta['test'] = self.meta.pop(key)
elif any(map(key.__contains__, ['Prüfer'])):
self.meta['examiner'] = self.meta.pop(key)
return True
def _standardize_data(self):
colnames = list(self.data.columns)
for i, col in enumerate(colnames):
if col == 'TIME':
colnames[i] = 'time'
#set values
elif col == 'Sollwert Frequenz':
colnames[i] = 'f'
elif col == 'SollTemperatur':
colnames[i] = 'T'
elif col == 'Max Scherweg':
colnames[i] = 'extension'
elif col == 'Sollwert Normalspannung':
colnames[i] = 'sigma_normal'
elif col == 'Impulsnummer':
colnames[i] = 'N'
# measurements
elif col == 'Load':
colnames[i] = 'F'
elif col == 'Position':
colnames[i] = 's_piston'
elif col == 'VERTIKAL Links':
colnames[i] = 's_vert_1'
elif col == 'VERTIKAL Rechts':
colnames[i] = 's_vert_2'
elif col == 'HORIZONTAL links':
colnames[i] = 's_hor_1'
elif col == 'HOIZONTAL Rechts':
colnames[i] = 's_hor_2'
self.data.columns = colnames
class ShearTestExtensionTUDresdenGeosys(ShearTestExtension):
def _define_units(self):
self.unit_S = 1 / 1000.0 #N
def _set_units(self):
for col in [
's_vert_sum', 's_vert_1', 's_vert_2', 's_piston', 'extension'
]:
self.data[col] = self.data[col].mul(self.unit_S)
#convert internal units to global
f = np.mean([0.9 / 355, 0.6 / 234.0, 0.3 / 116.0])
self.data['sigma_normal'] = self.data['sigma_normal'].mul(f).apply(
lambda x: np.round(x, 1))
return True
def _read_data(self):
"""
read data from Labor Hart
"""
# parameter
encoding = 'latin-1'
skiprows = 14
hasunits = True
splitsign = ':;'
head, data = read_geosys(self.file, '015')
#define in class
self.meta = head
self.data = data
return True
def _standardize_meta(self):
keys = list(self.meta.keys())
for key in keys:
if key == 'd':
self.meta['speciment_diameter'] = self.meta.pop(key)
return True
def _standardize_data(self):
colnames = list(self.data.columns)
for i, col in enumerate(colnames):
#set values
if col == 'soll temperature':
colnames[i] = 'T'
elif col == 'soll extension':
colnames[i] = 'extension'
elif col == 'soll sigma':
colnames[i] = 'sigma_normal'
elif col == 'soll frequency':
colnames[i] = 'f'
elif col == 'Number of vertical cycles':
colnames[i] = 'N'
# measurements
elif col == 'vertical load from hydraulic pressure':
colnames[i] = 'F'
elif col == 'vertical position from hydraulic pressure':
colnames[i] = 's_piston'
elif col == 'Vertical position from LVDT 1':
colnames[i] = 's_vert_1'
elif col == 'Vertical position from LVDT 2':
colnames[i] = 's_vert_2'
self.data.columns = colnames