import logging
from datetime import datetime

from utils.sql_utils import SQLUtils


class ElectricityDAO:
    """Data access object for the ``wsbz_electricity_consumption`` table.

    Each row holds an auto-increment ``id``, a ``sync_time`` timestamp string
    and an ``electricity_number`` reading. All public methods catch their own
    exceptions, log them, and return a neutral value instead of raising.
    """

    def __init__(self):
        """Open the SQLite database used by this DAO."""
        self.db = SQLUtils('sqlite', database='db/jtDB.db')

    def __del__(self):
        """Close the database connection when the object is destroyed."""
        # ``db`` is missing if __init__ failed before the assignment.
        if hasattr(self, 'db'):
            self.db.close()

    @staticmethod
    def _row_to_dict(row):
        """Map an ``(id, sync_time, electricity_number)`` row to a dict."""
        return {
            'id': row[0],
            'sync_time': row[1],
            'electricity_number': row[2],
        }

    @staticmethod
    def _empty_statistics():
        """Return a fresh all-zero statistics dict."""
        return {'day': 0, 'month': 0, 'year': 0, 'all': 0}

    def create_table_if_not_exists(self):
        """Create the electricity consumption table if it does not exist.

        Returns:
            bool: True on success, False if the statement failed.
        """
        try:
            sql = """
                CREATE TABLE IF NOT EXISTS wsbz_electricity_consumption (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    sync_time TIMESTAMP,
                    electricity_number REAL
                )
            """
            self.db.execute_query(sql)
            logging.info("电力消耗表检查/创建成功")
            return True
        except Exception as e:
            logging.error("创建电力消耗表失败: %s", e)
            return False

    def save_electricity_data(self, electricity_number):
        """Insert one reading stamped with the current local time.

        Args:
            electricity_number: The electricity consumption value to store.

        Returns:
            bool: True if the row was saved, False otherwise.
        """
        try:
            # Stored as a naive local-time string; the statistics query
            # relies on this format for its string comparisons.
            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            sql = """
                INSERT INTO wsbz_electricity_consumption (
                    sync_time, electricity_number
                ) VALUES (?, ?)
            """
            self.db.execute_update(sql, (current_time, electricity_number))
            logging.info("保存电力消耗数据成功: %s", electricity_number)
            return True
        except Exception as e:
            logging.error("保存电力消耗数据失败: %s", e)
            return False

    def get_latest_electricity_data(self):
        """Fetch the most recently inserted reading (highest ``id``).

        Returns:
            dict: Keys ``id``, ``sync_time`` and ``electricity_number``;
            None if the table is empty or the query failed.
        """
        try:
            sql = """
                SELECT id, sync_time, electricity_number
                FROM wsbz_electricity_consumption
                ORDER BY id DESC
                LIMIT 1
            """
            self.db.cursor.execute(sql)
            row = self.db.cursor.fetchone()
            return self._row_to_dict(row) if row else None
        except Exception as e:
            logging.error("获取最新电力消耗数据失败: %s", e)
            return None

    def get_electricity_data_by_date_range(self, start_date, end_date):
        """Fetch readings whose ``sync_time`` falls within a date range.

        The range is inclusive: it runs from ``start_date 00:00:00`` through
        ``end_date 23:59:59``.

        Args:
            start_date: Start date in ``YYYY-MM-DD`` format.
            end_date: End date in ``YYYY-MM-DD`` format.

        Returns:
            list: Reading dicts ordered by ``sync_time``; an empty list if
            nothing matched or the query failed.
        """
        try:
            sql = """
                SELECT id, sync_time, electricity_number
                FROM wsbz_electricity_consumption
                WHERE sync_time BETWEEN ? AND ?
                ORDER BY sync_time
            """
            params = (f"{start_date} 00:00:00", f"{end_date} 23:59:59")
            self.db.cursor.execute(sql, params)
            return [self._row_to_dict(row) for row in self.db.cursor.fetchall()]
        except Exception as e:
            logging.error("获取电力消耗数据失败: %s", e)
            return []

    def get_electricity_statistics(self):
        """Compute consumption for today, this month, this year and all time.

        Each figure is the highest reading minus the lowest reading inside
        the corresponding time window.

        Returns:
            dict: Keys ``day``, ``month``, ``year`` and ``all``. A window with
            no readings yields 0, as does every key when the query fails.
        """
        try:
            # The window test has to sit INSIDE each aggregate. A CASE placed
            # around MAX()/MIN() would test the sync_time of one arbitrary row
            # while still aggregating over the whole table.
            # 'localtime' is needed because sync_time is written with
            # datetime.now(), whereas DATE('now') alone is UTC.
            sql = """
                SELECT
                    MAX(CASE WHEN sync_time >= DATE('now', 'localtime')
                              AND sync_time < DATE('now', 'localtime', '+1 day')
                             THEN electricity_number END)
                  - MIN(CASE WHEN sync_time >= DATE('now', 'localtime')
                              AND sync_time < DATE('now', 'localtime', '+1 day')
                             THEN electricity_number END)
                        AS electricity_number_day,
                    MAX(CASE WHEN sync_time >= DATE('now', 'localtime', 'start of month')
                              AND sync_time < DATE('now', 'localtime', 'start of month', '+1 month')
                             THEN electricity_number END)
                  - MIN(CASE WHEN sync_time >= DATE('now', 'localtime', 'start of month')
                              AND sync_time < DATE('now', 'localtime', 'start of month', '+1 month')
                             THEN electricity_number END)
                        AS electricity_number_month,
                    MAX(CASE WHEN sync_time >= DATE('now', 'localtime', 'start of year')
                              AND sync_time < DATE('now', 'localtime', 'start of year', '+1 year')
                             THEN electricity_number END)
                  - MIN(CASE WHEN sync_time >= DATE('now', 'localtime', 'start of year')
                              AND sync_time < DATE('now', 'localtime', 'start of year', '+1 year')
                             THEN electricity_number END)
                        AS electricity_number_year,
                    MAX(electricity_number) - MIN(electricity_number)
                        AS electricity_number_all
                FROM wsbz_electricity_consumption
            """
            self.db.cursor.execute(sql)
            row = self.db.cursor.fetchone()
            if not row:
                return self._empty_statistics()
            # A window without readings aggregates to NULL; report it as 0.
            return {
                key: (value if value is not None else 0)
                for key, value in zip(('day', 'month', 'year', 'all'), row)
            }
        except Exception as e:
            logging.error("获取电力消耗统计数据失败: %s", e)
            return self._empty_statistics()