jiateng_ws/dao/electricity_dao.py

176 lines
6.1 KiB
Python
Raw Normal View History

2025-06-26 18:26:22 +08:00
import logging
from datetime import datetime
from utils.sql_utils import SQLUtils
class ElectricityDAO:
"""电力消耗数据访问对象"""
def __init__(self):
"""初始化数据访问对象"""
self.db = SQLUtils('sqlite', database='db/jtDB.db')
def __del__(self):
"""析构函数,确保数据库连接关闭"""
if hasattr(self, 'db'):
self.db.close()
def create_table_if_not_exists(self):
"""创建电力消耗表(如果不存在)"""
try:
sql = """
CREATE TABLE IF NOT EXISTS wsbz_electricity_consumption (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sync_time TIMESTAMP,
electricity_number REAL
)
"""
self.db.execute_query(sql)
logging.info("电力消耗表检查/创建成功")
return True
except Exception as e:
logging.error(f"创建电力消耗表失败: {str(e)}")
return False
def save_electricity_data(self, electricity_number):
"""保存电力消耗数据
Args:
electricity_number: 电力消耗数值
Returns:
bool: 保存是否成功
"""
try:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
sql = """
INSERT INTO wsbz_electricity_consumption (
sync_time, electricity_number
) VALUES (?, ?)
"""
params = (current_time, electricity_number)
self.db.execute_update(sql, params)
logging.info(f"保存电力消耗数据成功: {electricity_number}")
return True
except Exception as e:
logging.error(f"保存电力消耗数据失败: {str(e)}")
return False
def get_latest_electricity_data(self):
"""获取最新的电力消耗数据
Returns:
dict: 最新的电力消耗数据未找到则返回None
"""
try:
sql = """
SELECT id, sync_time, electricity_number
FROM wsbz_electricity_consumption
ORDER BY id DESC
LIMIT 1
"""
self.db.cursor.execute(sql)
row = self.db.cursor.fetchone()
if row:
data = {
'id': row[0],
'sync_time': row[1],
'electricity_number': row[2]
}
return data
else:
return None
except Exception as e:
logging.error(f"获取最新电力消耗数据失败: {str(e)}")
return None
def get_electricity_data_by_date_range(self, start_date, end_date):
"""根据日期范围获取电力消耗数据
Args:
start_date: 开始日期YYYY-MM-DD格式
end_date: 结束日期YYYY-MM-DD格式
Returns:
list: 电力消耗数据列表
"""
try:
sql = """
SELECT id, sync_time, electricity_number
FROM wsbz_electricity_consumption
WHERE sync_time BETWEEN ? AND ?
ORDER BY sync_time
"""
params = (f"{start_date} 00:00:00", f"{end_date} 23:59:59")
self.db.cursor.execute(sql, params)
results = self.db.cursor.fetchall()
data_list = []
for row in results:
data = {
'id': row[0],
'sync_time': row[1],
'electricity_number': row[2]
}
data_list.append(data)
return data_list
except Exception as e:
logging.error(f"获取电力消耗数据失败: {str(e)}")
return []
def get_electricity_statistics(self):
"""获取电力消耗统计数据(日/月/年/累计)
Returns:
dict: 包含日累计用电量的字典
"""
try:
# 使用提供的SQL查询
sql = """
SELECT CASE
WHEN sync_time >= DATE('now') AND sync_time < DATE('now', '+1 day')
THEN MAX(electricity_number) - MIN(electricity_number)
ELSE 0 END AS electricity_number_day,
CASE
WHEN sync_time >= DATE('now', 'start of month') AND sync_time < DATE('now', 'start of month', '+1 month')
THEN MAX(electricity_number) - MIN(electricity_number)
ELSE 0 END AS electricity_number_month,
CASE
WHEN sync_time >= DATE('now', 'start of year') AND sync_time < DATE('now', 'start of year', '+1 year')
THEN MAX(electricity_number) - MIN(electricity_number)
ELSE 0 END AS electricity_number_year,
MAX(electricity_number) - MIN(electricity_number) AS electricity_number_all
FROM wsbz_electricity_consumption
"""
self.db.cursor.execute(sql)
row = self.db.cursor.fetchone()
if row:
data = {
'day': row[0] if row[0] is not None else 0,
'month': row[1] if row[1] is not None else 0,
'year': row[2] if row[2] is not None else 0,
'all': row[3] if row[3] is not None else 0
}
return data
else:
return {
'day': 0,
'month': 0,
'year': 0,
'all': 0
}
except Exception as e:
logging.error(f"获取电力消耗统计数据失败: {str(e)}")
return {
'day': 0,
'month': 0,
'year': 0,
'all': 0
}