remove: 删除不必要的代码
This commit is contained in:
parent
c36189f255
commit
4db59048a6
@ -1,171 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Modbus Register Tester
|
||||
|
||||
A simple utility to test reading and writing to Modbus registers.
|
||||
This can be used to verify that the virtual Modbus server is working correctly.
|
||||
|
||||
Usage:
|
||||
python modbus_register_tester.py [--host HOST] [--port PORT] [--action {read|write}]
|
||||
[--address ADDRESS] [--value VALUE] [--count COUNT]
|
||||
|
||||
Options:
|
||||
--host HOST Modbus server host [default: localhost]
|
||||
--port PORT Modbus server port [default: 5020]
|
||||
--action {read|write} Action to perform [default: read]
|
||||
--address ADDRESS Register address to read/write [default: 1]
|
||||
--value VALUE Value to write (only for write action)
|
||||
--count COUNT Number of registers to read (only for read action) [default: 1]
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
from pymodbus.client import ModbusTcpClient
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
level=logging.INFO
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def read_registers(client, address, count=1):
|
||||
"""Read holding registers
|
||||
|
||||
Args:
|
||||
client: Modbus client
|
||||
address: Register address
|
||||
count: Number of registers to read
|
||||
|
||||
Returns:
|
||||
list: Register values
|
||||
"""
|
||||
try:
|
||||
result = client.read_holding_registers(address=address, count=count)
|
||||
if result.isError():
|
||||
logger.error(f"读取寄存器D{address}失败: {result}")
|
||||
return None
|
||||
|
||||
logger.info(f"读取寄存器D{address}-D{address+count-1}成功: {result.registers}")
|
||||
return result.registers
|
||||
except Exception as e:
|
||||
logger.error(f"读取寄存器D{address}时发生错误: {str(e)}")
|
||||
return None
|
||||
|
||||
def write_register(client, address, value):
|
||||
"""Write to a holding register
|
||||
|
||||
Args:
|
||||
client: Modbus client
|
||||
address: Register address
|
||||
value: Value to write
|
||||
|
||||
Returns:
|
||||
bool: True if successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
result = client.write_registers(address=address, values=[value])
|
||||
if result.isError():
|
||||
logger.error(f"写入寄存器D{address}值{value}失败: {result}")
|
||||
return False
|
||||
|
||||
logger.info(f"写入寄存器D{address}值{value}成功")
|
||||
|
||||
# Verify the write by reading back the value
|
||||
read_result = read_registers(client, address)
|
||||
if read_result and read_result[0] == value:
|
||||
logger.info(f"验证写入成功: D{address} = {value}")
|
||||
return True
|
||||
else:
|
||||
logger.warning(f"验证写入失败: D{address} 预期 {value},实际 {read_result[0] if read_result else 'unknown'}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"写入寄存器D{address}值{value}时发生错误: {str(e)}")
|
||||
return False
|
||||
|
||||
def connect_client(host, port):
|
||||
"""Connect to Modbus server
|
||||
|
||||
Args:
|
||||
host: Server host
|
||||
port: Server port
|
||||
|
||||
Returns:
|
||||
ModbusTcpClient: Connected client or None if failed
|
||||
"""
|
||||
try:
|
||||
client = ModbusTcpClient(host=host, port=port, timeout=10)
|
||||
logger.info(f"尝试连接到 Modbus 服务器 {host}:{port}")
|
||||
|
||||
is_connected = client.connect()
|
||||
if is_connected:
|
||||
logger.info(f"成功连接到 Modbus 服务器 {host}:{port}")
|
||||
return client
|
||||
else:
|
||||
logger.error(f"无法连接到 Modbus 服务器 {host}:{port}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"连接 Modbus 服务器 {host}:{port} 时发生错误: {str(e)}")
|
||||
return None
|
||||
|
||||
def close_client(client):
|
||||
"""Close Modbus client connection
|
||||
|
||||
Args:
|
||||
client: Modbus client
|
||||
"""
|
||||
if client:
|
||||
client.close()
|
||||
logger.info("Modbus 客户端连接已关闭")
|
||||
|
||||
def parse_arguments():
|
||||
"""Parse command line arguments"""
|
||||
parser = argparse.ArgumentParser(description='Modbus Register Tester')
|
||||
parser.add_argument('--host', type=str, default='localhost',
|
||||
help='Modbus server host')
|
||||
parser.add_argument('--port', type=int, default=5020,
|
||||
help='Modbus server port')
|
||||
parser.add_argument('--action', type=str, choices=['read', 'write'],
|
||||
default='read', help='Action to perform')
|
||||
parser.add_argument('--address', type=int, default=1,
|
||||
help='Register address to read/write')
|
||||
parser.add_argument('--value', type=int,
|
||||
help='Value to write (only for write action)')
|
||||
parser.add_argument('--count', type=int, default=1,
|
||||
help='Number of registers to read (only for read action)')
|
||||
return parser.parse_args()
|
||||
|
||||
if __name__ == '__main__':
|
||||
args = parse_arguments()
|
||||
|
||||
# Validate arguments
|
||||
if args.action == 'write' and args.value is None:
|
||||
logger.error("写入操作需要指定 --value 参数")
|
||||
sys.exit(1)
|
||||
|
||||
client = None
|
||||
try:
|
||||
# Connect to the server
|
||||
client = connect_client(args.host, args.port)
|
||||
if not client:
|
||||
sys.exit(1)
|
||||
|
||||
# Perform the requested action
|
||||
if args.action == 'read':
|
||||
result = read_registers(client, args.address, args.count)
|
||||
if result is None:
|
||||
sys.exit(1)
|
||||
else: # write
|
||||
success = write_register(client, args.address, args.value)
|
||||
if not success:
|
||||
sys.exit(1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info("操作被用户中断")
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
logger.error(f"发生错误: {str(e)}")
|
||||
sys.exit(1)
|
||||
finally:
|
||||
close_client(client)
|
||||
Loading…
Reference in New Issue
Block a user