from pymodbus.server import StartTcpServer from pymodbus.datastore import ModbusSequentialDataBlock from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext import logging import threading import time # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ModbusServer: def __init__(self, host="localhost", port=5020): self.host = host self.port = port # 创建数据存储 self.store = ModbusSlaveContext( di=ModbusSequentialDataBlock(0, [0] * 8000), # 离散输入 co=ModbusSequentialDataBlock(0, [0] * 8000), # 线圈 hr=ModbusSequentialDataBlock(0, [0] * 8000), # 保持寄存器 ir=ModbusSequentialDataBlock(0, [0] * 8000), # 输入寄存器 ) self.context = ModbusServerContext(slaves=self.store, single=True) # 初始化一些默认值 self.store.setValues(3, 20, [0]) # D20 初始为手动模式 self.store.setValues(3, 21, [0]) # D21 初始为未启动 self.store.setValues(3, 22, [0]) # D22 初始为未锁定 def start(self): """启动服务器""" logger.info(f"启动 Modbus TCP 服务器于 {self.host}:{self.port}") self.server_thread = threading.Thread( target=StartTcpServer, args=(self.context,), kwargs={'address': (self.host, self.port)} ) self.server_thread.daemon = True self.server_thread.start() def set_auto_mode(self): """设置为自动模式""" self.store.setValues(3, 20, [2]) logger.info("已设置为自动模式") def set_manual_mode(self): """设置为手动模式""" self.store.setValues(3, 20, [0]) logger.info("已设置为手动模式") def get_register_value(self, address): """获取寄存器值""" values = self.store.getValues(3, address, 1) return values[0] def print_status(self): """打印当前状态""" mode = "自动" if self.get_register_value(20) == 2 else "手动" running = "是" if self.get_register_value(22) == 1 else "否" logger.info(f"当前模式: {mode}") logger.info(f"是否运行中: {running}") if __name__ == "__main__": # 创建并启动服务器 server = ModbusServer() server.start() # 设置为自动模式 server.set_auto_mode() try: while True: server.print_status() time.sleep(5) # 每5秒打印一次状态 except KeyboardInterrupt: logger.info("服务器停止运行")