jiateng_ws/modbus_register_tester.py

171 lines
5.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Modbus Register Tester
A simple utility to test reading and writing to Modbus registers.
This can be used to verify that the virtual Modbus server is working correctly.
Usage:
    python modbus_register_tester.py [--host HOST] [--port PORT] [--action {read|write}]
                                     [--address ADDRESS] [--value VALUE] [--count COUNT]

Options:
    --host HOST            Modbus server host [default: localhost]
    --port PORT            Modbus server port [default: 5020]
    --action {read|write}  Action to perform [default: read]
    --address ADDRESS      Register address to read/write [default: 1]
    --value VALUE          Value to write (required for the write action)
    --count COUNT          Number of registers to read (only for read action) [default: 1]
"""
import argparse
import logging
import sys
from pymodbus.client import ModbusTcpClient
# Route INFO-and-above records through the root logger with a timestamped
# format, then create the module-level logger used by the rest of this file.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
def read_registers(client, address, count=1):
    """Read a run of consecutive holding registers.

    Args:
        client: Modbus client exposing ``read_holding_registers``.
        address: Address of the first register to read.
        count: Number of consecutive registers to read.

    Returns:
        The list of register values on success, or ``None`` when the
        response reports an error or any exception is raised.
    """
    try:
        response = client.read_holding_registers(address=address, count=count)
        if not response.isError():
            logger.info(f"读取寄存器D{address}-D{address+count-1}成功: {response.registers}")
            return response.registers
        logger.error(f"读取寄存器D{address}失败: {response}")
    except Exception as exc:
        # Failures are logged and reported as None instead of propagating.
        logger.error(f"读取寄存器D{address}时发生错误: {str(exc)}")
    return None
def write_register(client, address, value):
    """Write one holding register, then verify it by reading it back.

    Args:
        client: Modbus client exposing ``write_registers``; it is also
            passed to ``read_registers`` for the read-back check.
        address: Register address to write.
        value: Value to store in the register.

    Returns:
        bool: True only when the write succeeded and the read-back returned
        the same value. False when the write response reports an error, the
        read-back fails or differs, or any exception is raised.
    """
    try:
        result = client.write_registers(address=address, values=[value])
        if result.isError():
            # Address and value are separated explicitly; printing them
            # back-to-back made "D1" + "5" indistinguishable from "D15".
            logger.error(f"写入寄存器D{address} = {value} 失败: {result}")
            return False
        logger.info(f"写入寄存器D{address} = {value} 成功")

        # Verify the write by reading back the value.
        read_result = read_registers(client, address)
        if read_result and read_result[0] == value:
            logger.info(f"验证写入成功: D{address} = {value}")
            return True

        # read_result is None/empty when the read-back itself failed.
        actual = read_result[0] if read_result else 'unknown'
        logger.warning(f"验证写入失败: D{address} 预期 {value},实际 {actual}")
        return False
    except Exception as e:
        logger.error(f"写入寄存器D{address} = {value} 时发生错误: {str(e)}")
        return False
def connect_client(host, port, timeout=10):
    """Create a Modbus TCP client and try to connect it to the server.

    Args:
        host: Server host.
        port: Server port.
        timeout: Timeout handed to ``ModbusTcpClient`` (seconds, per the
            pymodbus documentation). Defaults to 10, the value that was
            previously hard-coded, so existing callers are unaffected.

    Returns:
        The connected ``ModbusTcpClient``, or ``None`` when ``connect()``
        reports failure or any exception is raised.
    """
    try:
        client = ModbusTcpClient(host=host, port=port, timeout=timeout)
        logger.info(f"尝试连接到 Modbus 服务器 {host}:{port}")
        if client.connect():
            logger.info(f"成功连接到 Modbus 服务器 {host}:{port}")
            return client
        logger.error(f"无法连接到 Modbus 服务器 {host}:{port}")
        return None
    except Exception as e:
        # Construction or connection errors are logged, not propagated.
        logger.error(f"连接 Modbus 服务器 {host}:{port} 时发生错误: {str(e)}")
        return None
def close_client(client):
    """Close the Modbus client connection, if there is one.

    Args:
        client: Modbus client to close; a falsy value such as ``None`` is
            ignored and nothing is logged.
    """
    if not client:
        return
    client.close()
    logger.info("Modbus 客户端连接已关闭")
def parse_arguments(argv=None):
    """Parse command line arguments.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, which is the
            original behaviour; passing a list lets callers and tests drive
            the parser without touching ``sys.argv``.

    Returns:
        argparse.Namespace: With attributes ``host``, ``port``, ``action``,
        ``address``, ``value`` (``None`` unless given) and ``count``.
    """
    parser = argparse.ArgumentParser(description='Modbus Register Tester')
    parser.add_argument('--host', type=str, default='localhost',
                        help='Modbus server host')
    parser.add_argument('--port', type=int, default=5020,
                        help='Modbus server port')
    parser.add_argument('--action', type=str, choices=['read', 'write'],
                        default='read', help='Action to perform')
    parser.add_argument('--address', type=int, default=1,
                        help='Register address to read/write')
    parser.add_argument('--value', type=int,
                        help='Value to write (only for write action)')
    parser.add_argument('--count', type=int, default=1,
                        help='Number of registers to read (only for read action)')
    return parser.parse_args(argv)
if __name__ == '__main__':
    cli = parse_arguments()

    # A write has nothing to send without --value, so reject it up front.
    if cli.action == 'write' and cli.value is None:
        logger.error("写入操作需要指定 --value 参数")
        sys.exit(1)

    client = None
    try:
        # Connect to the server; a failed connection ends the run with 1.
        client = connect_client(cli.host, cli.port)
        if not client:
            sys.exit(1)

        # Run the requested action and fold both outcomes into one flag.
        if cli.action == 'read':
            succeeded = read_registers(client, cli.address, cli.count) is not None
        else:  # write
            succeeded = write_register(client, cli.address, cli.value)
        if not succeeded:
            sys.exit(1)
    except KeyboardInterrupt:
        logger.info("操作被用户中断")
        sys.exit(0)
    except Exception as exc:
        logger.error(f"发生错误: {str(exc)}")
        sys.exit(1)
    finally:
        # Runs on every path, including the sys.exit() calls above.
        close_client(client)