import logging from datetime import datetime from utils.sql_utils import SQLUtils class ElectricityDAO: """电力消耗数据访问对象""" def __init__(self): """初始化数据访问对象""" self.db = SQLUtils('sqlite', database='db/jtDB.db') def __del__(self): """析构函数,确保数据库连接关闭""" if hasattr(self, 'db'): self.db.close() def create_table_if_not_exists(self): """创建电力消耗表(如果不存在)""" try: sql = """ CREATE TABLE IF NOT EXISTS wsbz_electricity_consumption ( id INTEGER PRIMARY KEY AUTOINCREMENT, sync_time TIMESTAMP, electricity_number REAL ) """ self.db.execute_query(sql) logging.info("电力消耗表检查/创建成功") return True except Exception as e: logging.error(f"创建电力消耗表失败: {str(e)}") return False def save_electricity_data(self, electricity_number): """保存电力消耗数据 Args: electricity_number: 电力消耗数值 Returns: bool: 保存是否成功 """ try: current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') sql = """ INSERT INTO wsbz_electricity_consumption ( sync_time, electricity_number ) VALUES (?, ?) """ params = (current_time, electricity_number) self.db.execute_update(sql, params) logging.info(f"保存电力消耗数据成功: {electricity_number}") return True except Exception as e: logging.error(f"保存电力消耗数据失败: {str(e)}") return False def get_latest_electricity_data(self): """获取最新的电力消耗数据 Returns: dict: 最新的电力消耗数据,未找到则返回None """ try: sql = """ SELECT id, sync_time, electricity_number FROM wsbz_electricity_consumption ORDER BY id DESC LIMIT 1 """ self.db.cursor.execute(sql) row = self.db.cursor.fetchone() if row: data = { 'id': row[0], 'sync_time': row[1], 'electricity_number': row[2] } return data else: return None except Exception as e: logging.error(f"获取最新电力消耗数据失败: {str(e)}") return None def get_electricity_data_by_date_range(self, start_date, end_date): """根据日期范围获取电力消耗数据 Args: start_date: 开始日期(YYYY-MM-DD格式) end_date: 结束日期(YYYY-MM-DD格式) Returns: list: 电力消耗数据列表 """ try: sql = """ SELECT id, sync_time, electricity_number FROM wsbz_electricity_consumption WHERE sync_time BETWEEN ? AND ? ORDER BY sync_time """ params = (f"{start_date} 00:00:00", f"{end_date} 23:59:59") self.db.cursor.execute(sql, params) results = self.db.cursor.fetchall() data_list = [] for row in results: data = { 'id': row[0], 'sync_time': row[1], 'electricity_number': row[2] } data_list.append(data) return data_list except Exception as e: logging.error(f"获取电力消耗数据失败: {str(e)}") return []