readFileSystem/utils/readMTP.py

199 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
try:
import win32com.client
import pythoncom
except ImportError:
win32com = None
class MTPHandler:
def __init__(self):
self.cache_dir = os.path.join(os.getcwd(), "mtp_cache")
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
def get_latest_xml_windows(self, device_name, path_str):
"""
在 Windows MTP 设备上寻找并复制最新的 XML 文件。
"""
print(f"\n[MTP] >>> 开始同步工作: 设备={device_name}")
if win32com is None:
return None, "系统未安装 pywin32。"
try:
pythoncom.CoInitialize()
shell = win32com.client.Dispatch("Shell.Application")
computer = shell.NameSpace(17) # 此电脑
# 1. 寻找设备 (增加容错:忽略首尾空格和大小写)
device_item = None
target_name = device_name.strip().lower()
print(f"[MTP] 正在检索设备,目标名称: '{target_name}'")
items = computer.Items()
available_devices = []
for item in items:
available_devices.append(item.Name)
if item.Name.strip().lower() == target_name:
device_item = item
print(f"[MTP] 匹配成功: '{item.Name}'")
break
if not device_item:
print(f"[MTP] 未找到设备: '{device_name}'")
print(f"[MTP] 当前电脑识别到的设备列表: {available_devices}")
return None, f"未找到 MTP 设备: {device_name} (请检查名称)"
current_folder = device_item.GetFolder
# 2. 深入路径
path_chain = [p.strip() for p in path_str.replace('\\', '/').split('/') if p.strip()]
for folder_name in path_chain:
found = False
for item in current_folder.Items():
if item.Name.lower() == folder_name.lower() and item.IsFolder:
current_folder = item.GetFolder
found = True
break
if not found:
print(f"[MTP] 路径中断,找不到文件夹: '{folder_name}'")
# 打印一下当前目录下有的东西,帮用户排查
content = [i.Name for i in current_folder.Items()]
print(f"[MTP] 当前所在位置 [ {current_folder.Title} ] 下的可选内容: {content}")
return None, f"路径点不存在: {folder_name}"
# 3. 扫描文件并识别“最新”
print(f"[MTP] 已到达目标目录: [ {current_folder.Title} ]")
print(f"[MTP] 正在列出该目录下所有项目...")
print("-" * 40)
xml_items = []
all_items = current_folder.Items()
print(f"[MTP] 目录内总项数: {all_items.Count}")
for i in range(all_items.Count):
try:
item = all_items.Item(i)
name = str(item.Name)
is_folder = item.IsFolder
# 打印每一个文件的详细信息用于排查
# 尝试打印前 6 个详情列,看看哪一列是类型,哪一列是时间
details = [str(current_folder.GetDetailsOf(item, j)) for j in range(6)]
if i == 0: print(f"[MTP] 属性列参考(0-5): {details}")
item_type = details[2]
# 识别逻辑:
# 1. 文件名明确以 .xml 结尾
# 2. 或者类型描述中包含 "XML" (遍历 2, 3, 4 列)
# 3. 或者针对您的 PDA 特效:文件名以 "Result_" 开头且名字里没有点 (认为后缀被隐藏了)
is_xml_name = name.lower().endswith(".xml")
is_xml_type = any("xml" in details[j].lower() for j in [2, 3, 4, 5])
is_pda_result = name.startswith("Result_") and "." not in name
if not is_folder and (is_xml_name or is_xml_type or is_pda_result):
# 尝试获取最可信的时间
mtime = self._get_best_time(current_folder, item)
# 如果文件名包含 Result_YYYYMMDD_HHMMSS提取它作为辅助排序依据
name_ts = self._extract_time_from_name(name)
xml_items.append({
'item': item,
'name': name if is_xml_name else f"{name}.xml",
'time': mtime,
'name_ts': name_ts
})
print(f"[MTP] 识别 XML: '{name}' | 创建/修改时间: {mtime} | 文件名时间: {name_ts}")
else:
pass
except Exception as e:
print(f"[MTP] 读取第 {i} 个项目时出错: {e}")
print("-" * 40)
if not xml_items:
print(f"[MTP] 报错: 在目录 [ {current_folder.Title} ] 中没有找到任何以 .xml 结尾的文件")
return None, "该目录中没有 XML 文件"
# 排序逻辑优化:
# 1. 优先按照文件名里的时间戳排序 (Result_YYYYMMDD_HHMMSS)
# 2. 其次按照系统返回的时间属性排序
# 3. 最后按全名排序
xml_items.sort(key=lambda x: (x['name_ts'], str(x['time']), x['name']), reverse=True)
latest = xml_items[0]
latest_item = latest['item']
filename = latest['name']
print(f"[MTP] 选定最新文件: {filename}")
print(f"[MTP] - 属性时间: {latest['time']}")
print(f"[MTP] - 名称日期: {latest['name_ts']}")
# 4. 复制到本地
local_path = os.path.abspath(os.path.join(self.cache_dir, filename))
# 清理旧缓存
for f in os.listdir(self.cache_dir):
try: os.remove(os.path.join(self.cache_dir, f))
except: pass
print(f"[MTP] 正在拉取文件到本地缓存...")
dest_shell_folder = shell.NameSpace(os.path.abspath(self.cache_dir))
dest_shell_folder.CopyHere(latest_item, 4 | 16)
# 等待
success = False
for i in range(20):
if os.path.exists(local_path) and os.path.getsize(local_path) > 0:
success = True
break
time.sleep(0.5)
if success:
print(f"[MTP] ✨ 同步成功: {filename}")
return local_path, filename
else:
return None, "同步超时"
except Exception as e:
print(f"[MTP] 异常: {e}")
return None, str(e)
finally:
pythoncom.CoUninitialize()
def _extract_time_from_name(self, name):
"""从文件名 Result_20251222_124145 中提取日期字符串用于排序"""
import re
# 匹配 8位数字_6位数字 (YYYYMMDD_HHMMSS)
match = re.search(r'(\d{8}_\d{6})', name)
if match:
return match.group(1)
# 如果没匹配到,试试只匹配 8位日期
match = re.search(r'(\d{8})', name)
if match:
return match.group(1)
return ""
def _get_best_time(self, folder_obj, item_obj):
"""尝试从 Shell 属性中获取文件时间"""
# 索引 3 通常是修改时间,索引 4 通常是创建时间
for idx in [3, 4, 5]:
try:
t = folder_obj.GetDetailsOf(item_obj, idx)
if t and len(t) > 5: # 简单过滤无效短字符串
return t
except: pass
# 兜底尝试标准属性
try:
return str(item_obj.ModifyDate)
except: return ""
def clear_cache(self):
if os.path.exists(self.cache_dir):
import shutil
shutil.rmtree(self.cache_dir)
os.makedirs(self.cache_dir)