Files
lib-paveit-demo/src/paveit/io/geosys.py
2023-06-05 21:44:45 +02:00

250 lines
6.7 KiB
Python
Executable File

import csv
import os
from io import BytesIO
from sys import getsizeof
from numpy import array
from pandas import DataFrame
def detect_tabnum(filename, tabstr, encoding='utf-8', max_rows=100):
    """Find the table number of a named table in a tab-separated export.

    The file is scanned row by row. A row matches when any of its cells
    contains ``tabstr`` (case-insensitive) and one of its cells is exactly
    ``plain`` (case-insensitive). The second cell of the first matching
    row is returned.

    :param filename: Path of the file to scan.
    :param tabstr: Text to look for inside the cells of a row.
    :param encoding: Text encoding of the file, default: utf-8
    :param max_rows: The search is abandoned once more than this many rows
        have been examined, default: 100
    :return: The lower-cased second cell of the matching row, or ``False``
        when no row matched.
    """
    filename = os.path.normpath(filename)
    needle = tabstr.lower()

    # newline='' leaves line-ending handling to the csv module, as its
    # documentation recommends for file objects passed to csv.reader.
    with open(filename, 'r', encoding=encoding, newline='') as in_file:
        reader = csv.reader(in_file, delimiter='\t')
        for row_number, row in enumerate(reader, start=1):
            cells = [cell.lower() for cell in row]
            if 'plain' in cells and any(needle in cell for cell in cells):
                return cells[1]
            if row_number > max_rows:
                break

    # One consistent "not found" value, whether the row limit was hit or
    # the file simply ended first.
    return False
def str2float(str):
    """Convert a number written with a decimal comma or point to ``float``.

    :param str: Text to convert. The name shadows the builtin; it is kept
        because it is part of the call signature.
    :return: The parsed float, or ``None`` when the argument is not a
        string or does not spell a number.
    """
    try:
        return float(str.replace(',', '.'))
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: the argument is not a text string.
        # ValueError: the text is not a number.
        return None
def _geosys_cell(row, index):
    """Return ``row[index]``, or ``None`` if the column is unknown or absent."""
    if index is None or index >= len(row):
        return None
    return row[index]


def _geosys_read_rows(buffer, table, pkdata, metadata_ids, encoding):
    """Decode the buffer and return the split rows of the requested tables."""
    buffer.seek(0)
    rows = []
    for raw_line in buffer.readlines():
        try:
            cells = raw_line.decode(encoding).split('\t')
        except UnicodeDecodeError:
            # Best effort: a line that cannot be decoded is skipped.
            continue
        if len(cells) <= 2:
            continue
        # The table id is the first three characters of the first cell.
        table_id = cells[0][0:3]
        if len(table_id) != 3:
            continue
        if table_id == table or table_id == pkdata or table_id in metadata_ids:
            rows.append(cells)
    return rows


def _geosys_group_rows(rows, table, pkdata, metadata_ids):
    """Sort the collected rows into specimen head, metadata and data rows."""
    grouped = {
        'head': [],
        'metadata': {table_id: [] for table_id in metadata_ids},
        'data': [],
    }
    for cells in rows:
        table_id = cells[0][0:3]
        # Independent checks: one id may be configured for several roles.
        if table_id == pkdata:
            grouped['head'].append(cells)
        if table_id in metadata_ids:
            grouped['metadata'][table_id].append(cells)
        if table_id == table:
            grouped['data'].append(cells)
    return grouped


def _geosys_specimen_header(head_rows, debug=False):
    """Build the specimen entries of the header (names, units, values rows)."""
    id_diameter = None
    id_height = None
    id_name = None
    diameter = None
    height = None
    name = None
    unit_height = None
    unit_diameter = None

    if head_rows:
        # Row 0: column names; locate the columns of interest.
        for position, label in enumerate(head_rows[0]):
            label = label.lower()
            if 'durchmesser' in label:
                id_diameter = position
            elif 'bezeichnung' in label:
                id_name = position
            elif 'höhe' in label:
                id_height = position
        if debug:
            print(id_diameter, id_height, id_name)

    if len(head_rows) > 1:
        # Row 1: units.
        unit_diameter = _geosys_cell(head_rows[1], id_diameter)
        unit_height = _geosys_cell(head_rows[1], id_height)

    if len(head_rows) > 2:
        # Row 2: values. Every field is looked up on its own so that one
        # missing column does not discard the others.
        raw_diameter = _geosys_cell(head_rows[2], id_diameter)
        raw_height = _geosys_cell(head_rows[2], id_height)
        if raw_diameter is not None:
            diameter = str2float(raw_diameter)
        if raw_height is not None:
            height = str2float(raw_height)
        name = _geosys_cell(head_rows[2], id_name)

    return {
        'speciment_diameter': diameter,
        'speciment_height': height,
        'name': name,
        'unit_h': unit_height,
        'unit_d': unit_diameter
    }


def _geosys_metadata(metadata):
    """Flatten the metadata tables into one ``{parameter name: value}`` dict."""
    merged = {}
    for rows in metadata.values():
        if len(rows) < 3:
            # No names/units/values rows for this table: nothing to merge.
            continue
        names = rows[0]
        if len(names) != len(rows[2]):
            # AssertionError is kept for callers of the former ``assert``;
            # raising explicitly keeps the check active under ``python -O``.
            raise AssertionError(
                'metadata names and values differ in length')
        # With more than three rows the values are taken from the fourth row.
        values = rows[2] if len(rows) <= 3 else rows[3]
        # Original note: Geosys contains the parameter "Oberspannung" twice
        # and the first one is to be dropped. Building a dict has that effect,
        # because a repeated name overwrites the earlier entry.
        merged.update(
            {name: values[position].strip()
             for position, name in enumerate(names)})
    return merged


def _geosys_measurements(data_rows, debug=False):
    """Convert the data table rows into a DataFrame of floats."""
    column_names = []
    column_units = []
    values = []
    for position, row in enumerate(data_rows):
        # The first two cells of every row are skipped.
        payload = row[2:]
        if position == 0:
            if debug:
                print('convert head')
            column_names = payload
        elif position == 1:
            column_units = payload
        else:
            values.append([str2float(cell) for cell in payload])

    matrix = array(values)
    if debug:
        print(column_names, column_units)
    return DataFrame(data=matrix, columns=column_names)


def read_geosys(buffer: BytesIO,
                table,
                pkdata='001',
                metadata_ids=('003', '015'),
                encoding='utf-8',
                to_si=False,
                debug=False):
    """Read one table plus specimen and metadata information from a buffer.

    Every line of the buffer is decoded and split at tab characters. The
    first three characters of the first cell are taken as the table id, and
    only rows of ``table``, ``pkdata`` and ``metadata_ids`` are kept. In
    each of these tables the first row is read as column names, the second
    as units and the following rows as values.

    :param buffer: Bytes IO Object
    :param table: Table-Number (three-character string) of the data table
    :param pkdata: Table-Number of the specimen definitions, default: '001'
    :param metadata_ids: Table-Numbers whose name/value pairs are merged
        into the header, default: ('003', '015'). Tables with fewer than
        three rows are skipped.
    :param encoding: Encoding, default: utf-8
    :param to_si: Accepted for compatibility; not used by this function.
    :param debug: debug-mode, prints intermediate results
    :return: Tuple ``(header, data)``: ``header`` is a dict with the keys
        'speciment_diameter', 'speciment_height', 'name', 'unit_h' and
        'unit_d' (``None`` where not found) plus the merged metadata;
        ``data`` is a pandas DataFrame with one column per data column.
    :raises AssertionError: If the buffer holds no rows for ``table`` or a
        metadata table has names and values of different length.
    """
    try:
        # Read and filter the raw rows
        rows = _geosys_read_rows(buffer, table, pkdata, metadata_ids, encoding)
        if debug:
            print('Anz. Datensätze: ', str(len(rows)), getsizeof(rows))

        tables = _geosys_group_rows(rows, table, pkdata, metadata_ids)
        if not tables['data']:
            # AssertionError is kept for callers of the former ``assert``;
            # raising explicitly keeps the check active under ``python -O``.
            raise AssertionError('no rows found for the requested table')
        if debug:
            print('data_clean fin')

        # Specimen header, extended by the metadata tables
        header = _geosys_specimen_header(tables['head'], debug)
        header.update(_geosys_metadata(tables['metadata']))

        # Frequency fix (original note: to be adjusted in the TUD input
        # data): take the number between 'Steifigkeit' and 'Hz' from the
        # 'Versuchsart' entry. Best effort; the header is left unchanged
        # when the entry is missing or has another layout.
        try:
            test_type = header['Versuchsart\r\n']
            header['Frequenz'] = float(
                test_type.split('Hz')[0].split('Steifigkeit')[1]
                .strip().replace(',', '.'))
        except (KeyError, IndexError, ValueError, AttributeError):
            pass

        if debug:
            print('header\n', header)

        # Convert the data table into a pandas DataFrame
        if debug:
            print('daten umwandel')
        data = _geosys_measurements(tables['data'], debug)
        if debug:
            print(data.head())

        return header, data

    except Exception:
        print('Fehler beim lesen')
        raise