jiateng_ws/dao/electricity_dao.py

124 lines
3.9 KiB
Python
Raw Normal View History

2025-06-26 18:26:22 +08:00
import logging
from datetime import datetime
from utils.sql_utils import SQLUtils
class ElectricityDAO:
"""电力消耗数据访问对象"""
def __init__(self):
"""初始化数据访问对象"""
self.db = SQLUtils('sqlite', database='db/jtDB.db')
def __del__(self):
"""析构函数,确保数据库连接关闭"""
if hasattr(self, 'db'):
self.db.close()
def create_table_if_not_exists(self):
"""创建电力消耗表(如果不存在)"""
try:
sql = """
CREATE TABLE IF NOT EXISTS wsbz_electricity_consumption (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sync_time TIMESTAMP,
electricity_number REAL
)
"""
self.db.execute_query(sql)
logging.info("电力消耗表检查/创建成功")
return True
except Exception as e:
logging.error(f"创建电力消耗表失败: {str(e)}")
return False
def save_electricity_data(self, electricity_number):
"""保存电力消耗数据
Args:
electricity_number: 电力消耗数值
Returns:
bool: 保存是否成功
"""
try:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
sql = """
INSERT INTO wsbz_electricity_consumption (
sync_time, electricity_number
) VALUES (?, ?)
"""
params = (current_time, electricity_number)
self.db.execute_update(sql, params)
logging.info(f"保存电力消耗数据成功: {electricity_number}")
return True
except Exception as e:
logging.error(f"保存电力消耗数据失败: {str(e)}")
return False
def get_latest_electricity_data(self):
"""获取最新的电力消耗数据
Returns:
dict: 最新的电力消耗数据未找到则返回None
"""
try:
sql = """
SELECT id, sync_time, electricity_number
FROM wsbz_electricity_consumption
ORDER BY id DESC
LIMIT 1
"""
self.db.cursor.execute(sql)
row = self.db.cursor.fetchone()
if row:
data = {
'id': row[0],
'sync_time': row[1],
'electricity_number': row[2]
}
return data
else:
return None
except Exception as e:
logging.error(f"获取最新电力消耗数据失败: {str(e)}")
return None
def get_electricity_data_by_date_range(self, start_date, end_date):
"""根据日期范围获取电力消耗数据
Args:
start_date: 开始日期YYYY-MM-DD格式
end_date: 结束日期YYYY-MM-DD格式
Returns:
list: 电力消耗数据列表
"""
try:
sql = """
SELECT id, sync_time, electricity_number
FROM wsbz_electricity_consumption
WHERE sync_time BETWEEN ? AND ?
ORDER BY sync_time
"""
params = (f"{start_date} 00:00:00", f"{end_date} 23:59:59")
self.db.cursor.execute(sql, params)
results = self.db.cursor.fetchall()
data_list = []
for row in results:
data = {
'id': row[0],
'sync_time': row[1],
'electricity_number': row[2]
}
data_list.append(data)
return data_list
except Exception as e:
logging.error(f"获取电力消耗数据失败: {str(e)}")
return []