import logging import time from threading import Thread, Event from PySide6.QtCore import QTimer from utils.modbus_utils import ModbusUtils from utils.modbus_monitor import RegisterHandler from dao.electricity_dao import ElectricityDAO from utils.config_loader import ConfigLoader class ElectricityHandler(RegisterHandler): """电力消耗寄存器处理器""" def __init__(self): """初始化处理器""" self.dao = ElectricityDAO() # 确保表已创建 self.dao.create_table_if_not_exists() def handle_change(self, value): """处理寄存器值变化 Args: value: 寄存器值 """ try: # 保存电力消耗数据 self.dao.save_electricity_data(value) logging.info(f"已记录电力消耗数据: {value}") except Exception as e: logging.error(f"处理电力消耗数据时发生错误: {str(e)}") class ElectricityMonitor: """电力消耗监控器""" _instance = None @classmethod def get_instance(cls): """获取单例实例""" if cls._instance is None: cls._instance = ElectricityMonitor() return cls._instance def __init__(self): """初始化电力监控器""" if ElectricityMonitor._instance: raise Exception("ElectricityMonitor is a singleton class.") self.modbus = ModbusUtils() self.dao = ElectricityDAO() self.config = ConfigLoader.get_instance() # 确保表已创建 self.dao.create_table_if_not_exists() # 创建定时器 self.timer = None self.client = None self.stop_event = Event() self.monitor_thread = None # 电力寄存器地址 self.electricity_register = 30 # 从配置中读取监听间隔(分钟) self.interval_minutes = self.config.get_value('electricity.interval_minutes', 1) # 从配置中读取是否自动启动 self.auto_start = self.config.get_value('electricity.auto_start', True) # 上次读取时间 self.last_read_time = None # 如果配置为自动启动,则启动监控 if self.auto_start: self.start() def start(self): """启动监控""" if self.timer and self.timer.isActive(): logging.warning("电力监控器已经在运行中") return # 创建并启动定时器,每分钟读取一次 self.timer = QTimer() self.timer.timeout.connect(self._read_electricity_data) self.timer.start(self.interval_minutes * 60000) # 转换为毫秒 # 立即执行一次读取 self._read_electricity_data() # 更新配置 self.config.set_value('electricity.auto_start', True) logging.info(f"电力监控器已启动,每{self.interval_minutes}分钟读取一次寄存器30的数据") def stop(self): """停止监控""" if self.timer: self.timer.stop() if self.client: self.modbus.close_client(self.client) self.client = None # 更新配置 self.config.set_value('electricity.auto_start', False) logging.info("电力监控器已停止") def set_interval(self, minutes): """设置监听间隔 Args: minutes: 间隔分钟数 """ if minutes < 1: minutes = 1 self.interval_minutes = minutes # 更新配置 self.config.set_value('electricity.interval_minutes', minutes) # 如果定时器正在运行,则更新间隔 if self.timer and self.timer.isActive(): self.timer.setInterval(minutes * 60000) # 转换为毫秒 logging.info(f"电力监控间隔已更新为{minutes}分钟") def is_monitoring(self): """检查是否正在监控 Returns: bool: 是否正在监控 """ return self.timer is not None and self.timer.isActive() def get_last_read_time(self): """获取上次读取时间 Returns: datetime: 上次读取时间,如果未读取过则返回None """ return self.last_read_time def get_next_read_time(self): """获取下次读取时间 Returns: datetime: 下次读取时间,如果未在监控则返回None """ if not self.is_monitoring() or not self.last_read_time: return None from datetime import datetime, timedelta return self.last_read_time + timedelta(minutes=self.interval_minutes) def _read_electricity_data(self): """读取电力消耗数据""" try: # 如果客户端未连接,则创建连接 if not self.client: self.client = self.modbus.get_client() if not self.client: logging.error("无法连接到Modbus服务器,电力数据读取失败") return # 读取寄存器30的值 result = self.modbus.read_holding_register(self.client, self.electricity_register) if result is None or len(result) == 0: logging.warning(f"读取寄存器D{self.electricity_register}失败") return # 获取电力消耗值 electricity_value = result[0] # 保存到数据库 success = self.dao.save_electricity_data(electricity_value) if success: logging.info(f"已记录电力消耗数据: {electricity_value}") # 更新上次读取时间 from datetime import datetime self.last_read_time = datetime.now() else: logging.error("保存电力消耗数据失败") except Exception as e: logging.error(f"读取或保存电力消耗数据时发生错误: {str(e)}") # 关闭连接,下次重新尝试 if self.client: self.modbus.close_client(self.client) self.client = None