199 lines
8.3 KiB
Python
199 lines
8.3 KiB
Python
import os
|
||
import time
|
||
|
||
try:
|
||
import win32com.client
|
||
import pythoncom
|
||
except ImportError:
|
||
win32com = None
|
||
|
||
class MTPHandler:
|
||
|
||
def __init__(self):
|
||
self.cache_dir = os.path.join(os.getcwd(), "mtp_cache")
|
||
if not os.path.exists(self.cache_dir):
|
||
os.makedirs(self.cache_dir)
|
||
|
||
def get_latest_xml_windows(self, device_name, path_str):
|
||
"""
|
||
在 Windows MTP 设备上寻找并复制最新的 XML 文件。
|
||
"""
|
||
print(f"\n[MTP] >>> 开始同步工作: 设备={device_name}")
|
||
if win32com is None:
|
||
return None, "系统未安装 pywin32。"
|
||
|
||
try:
|
||
pythoncom.CoInitialize()
|
||
shell = win32com.client.Dispatch("Shell.Application")
|
||
computer = shell.NameSpace(17) # 此电脑
|
||
|
||
# 1. 寻找设备 (增加容错:忽略首尾空格和大小写)
|
||
device_item = None
|
||
target_name = device_name.strip().lower()
|
||
|
||
print(f"[MTP] 正在检索设备,目标名称: '{target_name}'")
|
||
items = computer.Items()
|
||
available_devices = []
|
||
|
||
for item in items:
|
||
available_devices.append(item.Name)
|
||
if item.Name.strip().lower() == target_name:
|
||
device_item = item
|
||
print(f"[MTP] 匹配成功: '{item.Name}'")
|
||
break
|
||
|
||
if not device_item:
|
||
print(f"[MTP] 未找到设备: '{device_name}'")
|
||
print(f"[MTP] 当前电脑识别到的设备列表: {available_devices}")
|
||
return None, f"未找到 MTP 设备: {device_name} (请检查名称)"
|
||
|
||
current_folder = device_item.GetFolder
|
||
|
||
# 2. 深入路径
|
||
path_chain = [p.strip() for p in path_str.replace('\\', '/').split('/') if p.strip()]
|
||
for folder_name in path_chain:
|
||
found = False
|
||
for item in current_folder.Items():
|
||
if item.Name.lower() == folder_name.lower() and item.IsFolder:
|
||
current_folder = item.GetFolder
|
||
found = True
|
||
break
|
||
if not found:
|
||
print(f"[MTP] 路径中断,找不到文件夹: '{folder_name}'")
|
||
# 打印一下当前目录下有的东西,帮用户排查
|
||
content = [i.Name for i in current_folder.Items()]
|
||
print(f"[MTP] 当前所在位置 [ {current_folder.Title} ] 下的可选内容: {content}")
|
||
return None, f"路径点不存在: {folder_name}"
|
||
|
||
# 3. 扫描文件并识别“最新”
|
||
print(f"[MTP] 已到达目标目录: [ {current_folder.Title} ]")
|
||
print(f"[MTP] 正在列出该目录下所有项目...")
|
||
print("-" * 40)
|
||
|
||
xml_items = []
|
||
all_items = current_folder.Items()
|
||
print(f"[MTP] 目录内总项数: {all_items.Count}")
|
||
|
||
for i in range(all_items.Count):
|
||
try:
|
||
item = all_items.Item(i)
|
||
name = str(item.Name)
|
||
is_folder = item.IsFolder
|
||
|
||
# 打印每一个文件的详细信息用于排查
|
||
# 尝试打印前 6 个详情列,看看哪一列是类型,哪一列是时间
|
||
details = [str(current_folder.GetDetailsOf(item, j)) for j in range(6)]
|
||
if i == 0: print(f"[MTP] 属性列参考(0-5): {details}")
|
||
|
||
item_type = details[2]
|
||
# 识别逻辑:
|
||
# 1. 文件名明确以 .xml 结尾
|
||
# 2. 或者类型描述中包含 "XML" (遍历 2, 3, 4 列)
|
||
# 3. 或者针对您的 PDA 特效:文件名以 "Result_" 开头且名字里没有点 (认为后缀被隐藏了)
|
||
is_xml_name = name.lower().endswith(".xml")
|
||
is_xml_type = any("xml" in details[j].lower() for j in [2, 3, 4, 5])
|
||
is_pda_result = name.startswith("Result_") and "." not in name
|
||
|
||
if not is_folder and (is_xml_name or is_xml_type or is_pda_result):
|
||
# 尝试获取最可信的时间
|
||
mtime = self._get_best_time(current_folder, item)
|
||
# 如果文件名包含 Result_YYYYMMDD_HHMMSS,提取它作为辅助排序依据
|
||
name_ts = self._extract_time_from_name(name)
|
||
|
||
xml_items.append({
|
||
'item': item,
|
||
'name': name if is_xml_name else f"{name}.xml",
|
||
'time': mtime,
|
||
'name_ts': name_ts
|
||
})
|
||
print(f"[MTP] 识别 XML: '{name}' | 创建/修改时间: {mtime} | 文件名时间: {name_ts}")
|
||
else:
|
||
pass
|
||
except Exception as e:
|
||
print(f"[MTP] 读取第 {i} 个项目时出错: {e}")
|
||
|
||
print("-" * 40)
|
||
|
||
if not xml_items:
|
||
print(f"[MTP] 报错: 在目录 [ {current_folder.Title} ] 中没有找到任何以 .xml 结尾的文件")
|
||
return None, "该目录中没有 XML 文件"
|
||
|
||
# 排序逻辑优化:
|
||
# 1. 优先按照文件名里的时间戳排序 (Result_YYYYMMDD_HHMMSS)
|
||
# 2. 其次按照系统返回的时间属性排序
|
||
# 3. 最后按全名排序
|
||
xml_items.sort(key=lambda x: (x['name_ts'], str(x['time']), x['name']), reverse=True)
|
||
|
||
latest = xml_items[0]
|
||
latest_item = latest['item']
|
||
filename = latest['name']
|
||
|
||
print(f"[MTP] 选定最新文件: {filename}")
|
||
print(f"[MTP] - 属性时间: {latest['time']}")
|
||
print(f"[MTP] - 名称日期: {latest['name_ts']}")
|
||
|
||
# 4. 复制到本地
|
||
local_path = os.path.abspath(os.path.join(self.cache_dir, filename))
|
||
|
||
# 清理旧缓存
|
||
for f in os.listdir(self.cache_dir):
|
||
try: os.remove(os.path.join(self.cache_dir, f))
|
||
except: pass
|
||
|
||
print(f"[MTP] 正在拉取文件到本地缓存...")
|
||
dest_shell_folder = shell.NameSpace(os.path.abspath(self.cache_dir))
|
||
dest_shell_folder.CopyHere(latest_item, 4 | 16)
|
||
|
||
# 等待
|
||
success = False
|
||
for i in range(20):
|
||
if os.path.exists(local_path) and os.path.getsize(local_path) > 0:
|
||
success = True
|
||
break
|
||
time.sleep(0.5)
|
||
|
||
if success:
|
||
print(f"[MTP] ✨ 同步成功: {filename}")
|
||
return local_path, filename
|
||
else:
|
||
return None, "同步超时"
|
||
|
||
except Exception as e:
|
||
print(f"[MTP] 异常: {e}")
|
||
return None, str(e)
|
||
finally:
|
||
pythoncom.CoUninitialize()
|
||
|
||
def _extract_time_from_name(self, name):
|
||
"""从文件名 Result_20251222_124145 中提取日期字符串用于排序"""
|
||
import re
|
||
# 匹配 8位数字_6位数字 (YYYYMMDD_HHMMSS)
|
||
match = re.search(r'(\d{8}_\d{6})', name)
|
||
if match:
|
||
return match.group(1)
|
||
# 如果没匹配到,试试只匹配 8位日期
|
||
match = re.search(r'(\d{8})', name)
|
||
if match:
|
||
return match.group(1)
|
||
return ""
|
||
|
||
def _get_best_time(self, folder_obj, item_obj):
|
||
"""尝试从 Shell 属性中获取文件时间"""
|
||
# 索引 3 通常是修改时间,索引 4 通常是创建时间
|
||
for idx in [3, 4, 5]:
|
||
try:
|
||
t = folder_obj.GetDetailsOf(item_obj, idx)
|
||
if t and len(t) > 5: # 简单过滤无效短字符串
|
||
return t
|
||
except: pass
|
||
|
||
# 兜底尝试标准属性
|
||
try:
|
||
return str(item_obj.ModifyDate)
|
||
except: return ""
|
||
|
||
def clear_cache(self):
|
||
if os.path.exists(self.cache_dir):
|
||
import shutil
|
||
shutil.rmtree(self.cache_dir)
|
||
os.makedirs(self.cache_dir) |