# Source: readFileSystem/src/xml_parser.py (129 lines, 7.0 KiB, Python)

import xml.etree.ElementTree as ET
import os
class XmlParser:
    """Flatten a ``SampleResult`` XML file into one dict per ``Element``.

    The parser reads the first ``SampleResult`` under the document root,
    walks its ``MeasurementReplicates`` and produces one row for every
    ``Element`` found below each replicate's ``Measurement``.  All values are
    returned as the raw strings found in the XML (or ``None`` when absent).
    """

    # Keys that every output row must contain.  Any key that parsing did not
    # set is filled with None so all rows share the same set of keys.
    _REQUIRED_KEYS = (
        'sample_grade_id', 'sample_grade_alias', 'base_element',
        'measure_duration', 'check_type', 'check_status', 'grade_name', 'rsd_check',
        'concentration_value', 'std_dev_value', 'uncertainty_abs',
        'uncertainty_rel', 'lower_acceptance_limit', 'upper_acceptance_limit',
        'result_status', 'calibration_status', 'acceptance_status',
    )

    @staticmethod
    def _text(node, path):
        """Return ``.text`` of the first match of *path* under *node*, else None."""
        child = node.find(path)
        return None if child is None else child.text

    def parse_file(self, file_path):
        """Parse *file_path* and return ``(rows, message)``.

        Args:
            file_path: Path of the XML file to read.

        Returns:
            ``(list_of_dicts, "Success")`` when the file was parsed, or
            ``(None, error_message)`` when the file does not exist, has no
            ``SampleResult`` element, or any exception occurs while parsing
            (in which case the traceback is also printed).
        """
        if not os.path.exists(file_path):
            return None, f"文件未找到: {file_path}"
        try:
            root = ET.parse(file_path).getroot()

            # Only the first SampleResult is read.
            sample_result = root.find('SampleResult')
            if sample_result is None:
                return None, "没有找到 SampleResult 标签"

            # Fields shared by every row produced from this file.
            sample_info = {
                "sample_name": sample_result.get('Name'),
                "operator_name": sample_result.get('OperatorName'),
                "instrument": sample_result.get('Instrument'),
                "method_name": sample_result.get('MethodName'),
                "sample_type": sample_result.get('Type'),
                "xml_version": root.get('XMLVersion'),
                "xml_creation_datetime": root.get('XMLCreationDateTime'),
                "data_source": os.path.basename(file_path),
            }
            self._read_sample_ids(sample_result, sample_info)

            extracted_data = []
            replicates_node = sample_result.find('MeasurementReplicates')
            # NOTE: compare with None explicitly; an Element without children
            # is falsy, and truth-testing Elements is deprecated.
            if replicates_node is not None:
                for rep in replicates_node.findall('MeasurementReplicate'):
                    extracted_data.extend(self._parse_replicate(rep, sample_info))
            return extracted_data, "Success"
        except Exception as e:
            # Boundary handler: callers expect (None, message), never a raise.
            import traceback
            traceback.print_exc()
            return None, f"Parse error: {str(e)}"

    def _read_sample_ids(self, sample_result, sample_info):
        """Copy the 'Grade ID' / 'Grade Alias' sample IDs into *sample_info*."""
        sample_ids_node = sample_result.find('SampleIDs')
        if sample_ids_node is None:
            return
        for sid in sample_ids_node.findall('SampleID'):
            # A SampleID lacking IDName or IDValue yields None here instead of
            # raising and aborting the whole file.
            id_name = self._text(sid, 'IDName')
            id_value = self._text(sid, 'IDValue')
            if id_name == 'Grade ID':
                sample_info['sample_grade_id'] = id_value
            elif id_name == 'Grade Alias':
                sample_info['sample_grade_alias'] = id_value

    def _parse_replicate(self, rep, sample_info):
        """Return the rows for one ``MeasurementReplicate`` (possibly empty)."""
        measurement = rep.find('Measurement')
        if measurement is None:
            return []
        elements_node = measurement.find('Elements')
        if elements_node is None:
            return []

        rep_info = sample_info.copy()
        rep_info['replicate_no'] = rep.get('No')
        rep_info['measure_datetime'] = rep.get('MeasureDateTime')
        rep_info['is_deleted'] = rep.get('IsDeleted')
        rep_info['check_type'] = measurement.get('CheckType')
        rep_info['check_status'] = measurement.get('CheckStatus')
        rep_info['grade_name'] = measurement.get('GradeName')
        rep_info['rsd_check'] = measurement.get('RsdCheck')

        ext_data_meas = measurement.find('ExtData')
        if ext_data_meas is not None:
            base_el = ext_data_meas.find('Base')
            if base_el is not None:
                rep_info['base_element'] = base_el.text
            duration_el = ext_data_meas.find('MeasDuration')
            if duration_el is not None:
                rep_info['measure_duration'] = duration_el.text

        return [
            self._parse_element(el, rep_info)
            for el in elements_node.findall('Element')
        ]

    def _parse_element(self, el, rep_info):
        """Build the output row for one ``Element`` on top of *rep_info*."""
        el_data = rep_info.copy()
        el_data['element_name'] = el.get('ElementName')
        el_data['element_type'] = el.get('Type')
        el_data['concentration_unit'] = el.get('DefaultConcentrationUnit')

        # Concentration result (the non-statistical one).
        conc_res = el.find("./ElementResult[@Kind='Concentration'][@StatType='None']")
        if conc_res is not None:
            el_data['concentration_value'] = self._text(conc_res, 'ResultValue')
            el_data['result_status'] = conc_res.get('Status')
            el_data['calibration_status'] = conc_res.get('CalibrationStatus')
            el_data['acceptance_status'] = conc_res.get('AcceptanceStatus')

            # Acceptance limits.
            limits = conc_res.find('ResultValueLimits')
            if limits is not None:
                el_data['lower_acceptance_limit'] = self._text(
                    limits, "./ResultValueLimit[@Type='LowerAcceptanceLimit']")
                el_data['upper_acceptance_limit'] = self._text(
                    limits, "./ResultValueLimit[@Type='UpperAcceptanceLimit']")

            # Uncertainty values.
            ext_res = conc_res.find('ExtData')
            if ext_res is not None:
                el_data['uncertainty_abs'] = self._text(
                    ext_res, "./ResultValue[@SubType='UncertaintyAbs']")
                el_data['uncertainty_rel'] = self._text(
                    ext_res, "./ResultValue[@SubType='UncertaintyRel']")

        std_res = el.find("./ElementResult[@StatType='StdDev']")
        if std_res is not None:
            el_data['std_dev_value'] = self._text(std_res, 'ResultValue')

        # Fill any key that was not set above with None.
        for key in self._REQUIRED_KEYS:
            el_data.setdefault(key, None)
        return el_data