jiateng_ws/utils/electricity_monitor.py

200 lines
6.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import time
from threading import Thread, Event
from PySide6.QtCore import QTimer, QObject, Signal
from utils.modbus_utils import ModbusUtils
from utils.modbus_monitor import RegisterHandler
from dao.electricity_dao import ElectricityDAO
from utils.config_loader import ConfigLoader
class ElectricityHandler(RegisterHandler, QObject):
"""电力消耗寄存器处理器"""
# 定义电力数据变化信号
electricity_data_changed = Signal(float)
def __init__(self):
"""初始化处理器"""
QObject.__init__(self)
self.dao = ElectricityDAO()
# 确保表已创建
self.dao.create_table_if_not_exists()
def handle_change(self, value):
"""处理寄存器值变化
Args:
value: 寄存器值
"""
try:
# 保存电力消耗数据
self.dao.save_electricity_data(value)
logging.info(f"已记录电力消耗数据: {value}")
# 发射电力数据变化信号
self.electricity_data_changed.emit(value)
except Exception as e:
logging.error(f"处理电力消耗数据时发生错误: {str(e)}")
class ElectricityMonitor:
"""电力消耗监控器"""
_instance = None
@classmethod
def get_instance(cls):
"""获取单例实例"""
if cls._instance is None:
cls._instance = ElectricityMonitor()
return cls._instance
def __init__(self):
"""初始化电力监控器"""
if ElectricityMonitor._instance:
raise Exception("ElectricityMonitor is a singleton class.")
self.modbus = ModbusUtils()
self.dao = ElectricityDAO()
self.config = ConfigLoader.get_instance()
# 确保表已创建
self.dao.create_table_if_not_exists()
# 创建定时器
self.timer = None
self.client = None
self.stop_event = Event()
self.monitor_thread = None
# 电力寄存器地址
self.electricity_register = 30
# 从配置中读取监听间隔(分钟)
self.interval_minutes = self.config.get_value('electricity.interval_minutes', 1)
# 从配置中读取是否自动启动
self.auto_start = self.config.get_value('electricity.auto_start', True)
# 上次读取时间
self.last_read_time = None
# 如果配置为自动启动,则启动监控
if self.auto_start:
self.start()
def start(self):
"""启动监控"""
if self.timer and self.timer.isActive():
logging.warning("电力监控器已经在运行中")
return
# 创建并启动定时器,每分钟读取一次
self.timer = QTimer()
self.timer.timeout.connect(self._read_electricity_data)
self.timer.start(self.interval_minutes * 60000) # 转换为毫秒
# 立即执行一次读取
self._read_electricity_data()
# 更新配置
self.config.set_value('electricity.auto_start', True)
logging.info(f"电力监控器已启动,每{self.interval_minutes}分钟读取一次寄存器30的数据")
def stop(self):
"""停止监控"""
if self.timer:
self.timer.stop()
if self.client:
self.modbus.close_client(self.client)
self.client = None
# 更新配置
self.config.set_value('electricity.auto_start', False)
logging.info("电力监控器已停止")
def set_interval(self, minutes):
"""设置监听间隔
Args:
minutes: 间隔分钟数
"""
if minutes < 1:
minutes = 1
self.interval_minutes = minutes
# 更新配置
self.config.set_value('electricity.interval_minutes', minutes)
# 如果定时器正在运行,则更新间隔
if self.timer and self.timer.isActive():
self.timer.setInterval(minutes * 60000) # 转换为毫秒
logging.info(f"电力监控间隔已更新为{minutes}分钟")
def is_monitoring(self):
"""检查是否正在监控
Returns:
bool: 是否正在监控
"""
return self.timer is not None and self.timer.isActive()
def get_last_read_time(self):
"""获取上次读取时间
Returns:
datetime: 上次读取时间如果未读取过则返回None
"""
return self.last_read_time
def get_next_read_time(self):
"""获取下次读取时间
Returns:
datetime: 下次读取时间如果未在监控则返回None
"""
if not self.is_monitoring() or not self.last_read_time:
return None
from datetime import datetime, timedelta
return self.last_read_time + timedelta(minutes=self.interval_minutes)
def _read_electricity_data(self):
"""读取电力消耗数据"""
try:
# 如果客户端未连接,则创建连接
if not self.client:
self.client = self.modbus.get_client()
if not self.client:
logging.error("无法连接到Modbus服务器电力数据读取失败")
return
# 读取寄存器30的值
result = self.modbus.read_holding_register(self.client, self.electricity_register)
if result is None or len(result) == 0:
logging.warning(f"读取寄存器D{self.electricity_register}失败")
return
# 获取电力消耗值
electricity_value = result[0]
# 保存到数据库
success = self.dao.save_electricity_data(electricity_value)
if success:
logging.info(f"已记录电力消耗数据: {electricity_value}")
# 更新上次读取时间
from datetime import datetime
self.last_read_time = datetime.now()
else:
logging.error("保存电力消耗数据失败")
except Exception as e:
logging.error(f"读取或保存电力消耗数据时发生错误: {str(e)}")
# 关闭连接,下次重新尝试
if self.client:
self.modbus.close_client(self.client)
self.client = None