76 lines
2.6 KiB
Python
76 lines
2.6 KiB
Python
from pymodbus.server import StartTcpServer
|
|
from pymodbus.datastore import ModbusSequentialDataBlock
|
|
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
|
|
import logging
|
|
import threading
|
|
import time
|
|
|
|
# 配置日志
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ModbusServer:
|
|
def __init__(self, host="localhost", port=5020):
|
|
self.host = host
|
|
self.port = port
|
|
# 创建数据存储
|
|
self.store = ModbusSlaveContext(
|
|
di=ModbusSequentialDataBlock(0, [0] * 8000), # 离散输入
|
|
co=ModbusSequentialDataBlock(0, [0] * 8000), # 线圈
|
|
hr=ModbusSequentialDataBlock(0, [0] * 8000), # 保持寄存器
|
|
ir=ModbusSequentialDataBlock(0, [0] * 8000), # 输入寄存器
|
|
)
|
|
self.context = ModbusServerContext(slaves=self.store, single=True)
|
|
|
|
# 初始化一些默认值
|
|
self.store.setValues(3, 20, [0]) # D20 初始为手动模式
|
|
self.store.setValues(3, 21, [0]) # D21 初始为未启动
|
|
self.store.setValues(3, 22, [0]) # D22 初始为未锁定
|
|
|
|
def start(self):
|
|
"""启动服务器"""
|
|
logger.info(f"启动 Modbus TCP 服务器于 {self.host}:{self.port}")
|
|
self.server_thread = threading.Thread(
|
|
target=StartTcpServer,
|
|
args=(self.context,),
|
|
kwargs={'address': (self.host, self.port)}
|
|
)
|
|
self.server_thread.daemon = True
|
|
self.server_thread.start()
|
|
|
|
def set_auto_mode(self):
|
|
"""设置为自动模式"""
|
|
self.store.setValues(3, 20, [2])
|
|
logger.info("已设置为自动模式")
|
|
|
|
def set_manual_mode(self):
|
|
"""设置为手动模式"""
|
|
self.store.setValues(3, 20, [0])
|
|
logger.info("已设置为手动模式")
|
|
|
|
def get_register_value(self, address):
|
|
"""获取寄存器值"""
|
|
values = self.store.getValues(3, address, 1)
|
|
return values[0]
|
|
|
|
def print_status(self):
|
|
"""打印当前状态"""
|
|
mode = "自动" if self.get_register_value(20) == 2 else "手动"
|
|
running = "是" if self.get_register_value(22) == 1 else "否"
|
|
logger.info(f"当前模式: {mode}")
|
|
logger.info(f"是否运行中: {running}")
|
|
|
|
if __name__ == "__main__":
|
|
# 创建并启动服务器
|
|
server = ModbusServer()
|
|
server.start()
|
|
|
|
# 设置为自动模式
|
|
server.set_auto_mode()
|
|
|
|
try:
|
|
while True:
|
|
server.print_status()
|
|
time.sleep(5) # 每5秒打印一次状态
|
|
except KeyboardInterrupt:
|
|
logger.info("服务器停止运行") |