jiateng_ws/modbus_server.py

76 lines
2.6 KiB
Python

from pymodbus.server import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
import logging
import threading
import time
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ModbusServer:
def __init__(self, host="localhost", port=5020):
self.host = host
self.port = port
# 创建数据存储
self.store = ModbusSlaveContext(
di=ModbusSequentialDataBlock(0, [0] * 8000), # 离散输入
co=ModbusSequentialDataBlock(0, [0] * 8000), # 线圈
hr=ModbusSequentialDataBlock(0, [0] * 8000), # 保持寄存器
ir=ModbusSequentialDataBlock(0, [0] * 8000), # 输入寄存器
)
self.context = ModbusServerContext(slaves=self.store, single=True)
# 初始化一些默认值
self.store.setValues(3, 20, [0]) # D20 初始为手动模式
self.store.setValues(3, 21, [0]) # D21 初始为未启动
self.store.setValues(3, 22, [0]) # D22 初始为未锁定
def start(self):
"""启动服务器"""
logger.info(f"启动 Modbus TCP 服务器于 {self.host}:{self.port}")
self.server_thread = threading.Thread(
target=StartTcpServer,
args=(self.context,),
kwargs={'address': (self.host, self.port)}
)
self.server_thread.daemon = True
self.server_thread.start()
def set_auto_mode(self):
"""设置为自动模式"""
self.store.setValues(3, 20, [2])
logger.info("已设置为自动模式")
def set_manual_mode(self):
"""设置为手动模式"""
self.store.setValues(3, 20, [0])
logger.info("已设置为手动模式")
def get_register_value(self, address):
"""获取寄存器值"""
values = self.store.getValues(3, address, 1)
return values[0]
def print_status(self):
"""打印当前状态"""
mode = "自动" if self.get_register_value(20) == 2 else "手动"
running = "" if self.get_register_value(22) == 1 else ""
logger.info(f"当前模式: {mode}")
logger.info(f"是否运行中: {running}")
if __name__ == "__main__":
# 创建并启动服务器
server = ModbusServer()
server.start()
# 设置为自动模式
server.set_auto_mode()
try:
while True:
server.print_status()
time.sleep(5) # 每5秒打印一次状态
except KeyboardInterrupt:
logger.info("服务器停止运行")