You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
484 lines
18 KiB
484 lines
18 KiB
# -*- coding: utf-8 -*-
|
|
|
|
import json
|
|
import os
|
|
import queue
|
|
import sys
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from tkinter import Tk, Label, Entry, Button, StringVar, END, DISABLED, NORMAL, filedialog, messagebox, simpledialog
|
|
from tkinter.scrolledtext import ScrolledText
|
|
|
|
import requests
|
|
|
|
|
|
def get_runtime_dir():
    """Return the directory that holds the running program.

    When ``sys.frozen`` is truthy the parent folder of ``sys.executable`` is
    returned; otherwise the parent folder of this source file is returned.
    """
    is_frozen = getattr(sys, "frozen", False)
    anchor = sys.executable if is_frozen else __file__
    return Path(anchor).resolve().parent
|
|
|
|
|
|
def _ensure_windows_run_registry(entry_name, exe_path):
    """Make sure ``entry_name`` under the current user's ``Run`` key points at ``exe_path``.

    Args:
        entry_name: Name of the registry value to create or update.
        exe_path: Path of the executable; it is stored wrapped in double quotes.

    Returns:
        A ``(changed, message)`` tuple. ``changed`` is True only when the value
        was written; any failure while opening or writing the key is reported
        through the message with ``changed`` set to False.
    """
    import winreg

    quoted_target = "\"%s\"" % exe_path

    try:
        with winreg.OpenKey(
            winreg.HKEY_CURRENT_USER,
            r"Software\Microsoft\Windows\CurrentVersion\Run",
            0,
            winreg.KEY_READ | winreg.KEY_SET_VALUE,
        ) as run_key:
            try:
                existing = winreg.QueryValueEx(run_key, entry_name)[0]
            except FileNotFoundError:
                # The value has never been written.
                existing = None

            if existing:
                # Compare case-insensitively and without the surrounding quotes.
                normalized = str(existing).strip().strip("\"").lower()
                if normalized == exe_path.lower():
                    return False, "系统自启动项已存在,无需重复写入。"

            winreg.SetValueEx(run_key, entry_name, 0, winreg.REG_SZ, quoted_target)
            message = (
                "检测到自启动路径变化,已自动更新。"
                if existing
                else "首次启动已写入系统自启动项,下次开机会自动运行。"
            )
            return True, message
    except Exception as e:
        return False, "写入系统自启动项失败: %s" % e
|
|
|
|
|
|
def _cleanup_legacy_startup_items(legacy_entry_name, app_data):
|
|
if app_data:
|
|
startup_dir = Path(app_data) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup"
|
|
legacy_script = startup_dir / (legacy_entry_name + ".vbs")
|
|
try:
|
|
if legacy_script.exists():
|
|
legacy_script.unlink()
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
import winreg
|
|
run_key_path = r"Software\Microsoft\Windows\CurrentVersion\Run"
|
|
with winreg.OpenKey(
|
|
winreg.HKEY_CURRENT_USER,
|
|
run_key_path,
|
|
0,
|
|
winreg.KEY_SET_VALUE
|
|
) as key:
|
|
try:
|
|
winreg.DeleteValue(key, legacy_entry_name)
|
|
except FileNotFoundError:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def ensure_windows_startup():
    """Register the frozen executable to start when the current user logs in.

    Nothing is done when the OS is not Windows or when the program is not
    frozen (``sys.frozen`` unset). Otherwise legacy entries are cleaned up and
    a small VBScript launcher is written to the user's Startup folder; if that
    fails, or ``APPDATA`` is not set, the ``Run`` registry key is used instead.

    The launcher is stored as UTF-16 LE with a BOM. Windows Script Host reads
    BOM-less files using the ANSI code page, so a UTF-8 script garbles any
    non-ASCII character in the executable path and the autostart silently
    breaks. A launcher written in the old encoding is rewritten once (and is
    reported with the "path changed" message).

    Returns:
        A ``(changed, message)`` tuple; ``changed`` is True only when an
        autostart entry was written or updated.
    """
    if os.name != "nt":
        return False, "当前系统非Windows,跳过自启动注册。"

    if not getattr(sys, "frozen", False):
        return False, "当前为源码运行,跳过自启动注册。"

    exe_path = str(Path(sys.executable).resolve())
    entry_name = "QMSFileCollector"

    app_data = os.getenv("APPDATA", "").strip()
    _cleanup_legacy_startup_items("CKPClientFileCollector", app_data)

    if not app_data:
        try:
            return _ensure_windows_run_registry(entry_name, exe_path)
        except Exception as e:
            return False, "未获取到APPDATA且注册表写入失败: %s" % e

    startup_dir = Path(app_data) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup"
    startup_script = startup_dir / (entry_name + ".vbs")
    # Explicit CRLF: the script is written as bytes, so no newline translation happens.
    script_content = (
        "Set WshShell = CreateObject(\"WScript.Shell\")\r\n"
        "WshShell.Run Chr(34) & \"%s\" & Chr(34), 0\r\n"
        "Set WshShell = Nothing\r\n"
    ) % exe_path.replace("\"", "\"\"")

    try:
        # Encoding happens inside the try so an unencodable path falls back to the registry.
        script_bytes = b"\xff\xfe" + script_content.encode("utf-16-le")

        startup_dir.mkdir(parents=True, exist_ok=True)
        old_bytes = b""
        if startup_script.exists():
            old_bytes = startup_script.read_bytes()

        if old_bytes == script_bytes:
            return False, "启动文件夹自启动项已存在,无需重复写入。"

        startup_script.write_bytes(script_bytes)
        if old_bytes:
            return True, "检测到启动文件夹自启动路径变化,已自动更新。"
        return True, "首次启动已写入启动文件夹自启动项,下次开机会自动运行。"
    except Exception as startup_error:
        try:
            changed, reg_msg = _ensure_windows_run_registry(entry_name, exe_path)
            return changed, "写入启动文件夹失败,已回退注册表方式: %s" % reg_msg
        except Exception as reg_error:
            return False, "写入启动文件夹和注册表均失败: %s; %s" % (startup_error, reg_error)
|
|
|
|
|
|
class CollectorApp:
    """Tkinter client that polls local folders and uploads their files over HTTP.

    Up to three rows (site, buNo, equipmentNo, local directory) can be
    configured. A daemon worker thread walks the active rows, uploads every
    file whose ``(mtime_ns, size)`` signature has not been uploaded yet and
    then sleeps for the poll interval.

    Threading notes:
        * The worker never touches Tk variables or widgets. The base URL and
          poll interval it uses are snapshots (``active_base_url`` /
          ``active_poll_seconds``) taken on the UI thread when collection
          starts and when the configuration is saved while running.
        * Log lines go through ``log_queue`` and are written to the text
          widget by ``_flush_logs`` on the UI thread.
    """

    # Per-row fields, in the order they are validated and serialised.
    ROW_FIELDS = ("site", "bu_no", "equipment_no", "source_path")

    def __init__(self, root):
        """Build the window, register autostart, load the config and start log flushing.

        Args:
            root: The ``Tk`` root window hosting the UI.
        """
        self.root = root
        self.root.title("文件采集客户端")
        self.root.geometry("1180x720")
        self.root.resizable(True, True)

        self.config_path = get_runtime_dir() / "collector_config.json"
        self.log_queue = queue.Queue()
        self.running = False
        self.worker_thread = None
        self.active_rows = []
        # Snapshots read by the worker thread instead of the Tk variables.
        self.active_base_url = ""
        self.active_poll_seconds = 10
        # "<row>|<lower-cased path>" -> signature of the last successful upload.
        self.upload_state = {}
        # Throttle key -> time.time() of the last emitted notice.
        self.notice_ts = {}

        self.base_url_var = StringVar()
        self.poll_seconds_var = StringVar(value="10")

        self.row_vars = [
            {name: StringVar() for name in self.ROW_FIELDS}
            for _ in range(3)
        ]

        self._build_ui()
        self._init_windows_startup()
        self._load_config()
        self.root.after(200, self._flush_logs)

    def _init_windows_startup(self):
        """Run autostart registration and log the outcome when it is noteworthy.

        A message is logged when something was written, or when the program is
        frozen and the message reports a failure.
        """
        changed, message = ensure_windows_startup()
        if changed or (getattr(sys, "frozen", False) and "失败" in message):
            self.log(message)

    def _build_ui(self):
        """Create and place all widgets using absolute coordinates."""
        Label(self.root, text="系统地址").place(x=20, y=20)
        Entry(self.root, textvariable=self.base_url_var, width=92).place(x=85, y=20)
        Button(self.root, text="弹框设置地址", command=self.popup_set_base_url).place(x=860, y=16)
        Button(self.root, text="保存配置", command=self.save_config).place(x=970, y=16)

        Label(self.root, text="轮询秒").place(x=20, y=58)
        Entry(self.root, textvariable=self.poll_seconds_var, width=10).place(x=85, y=58)
        Button(self.root, text="开始同步", command=self.start_collect).place(x=180, y=54)
        Button(self.root, text="停止同步", command=self.stop_collect).place(x=265, y=54)

        Label(self.root, text="配置说明:最多3行,每行需填写 site + buNo + equipmentNo + 本地目录").place(x=360, y=58)

        Label(self.root, text="行").place(x=20, y=100)
        Label(self.root, text="Site").place(x=80, y=100)
        Label(self.root, text="BuNo").place(x=230, y=100)
        Label(self.root, text="EquipmentNo").place(x=380, y=100)
        Label(self.root, text="本地目录").place(x=560, y=100)

        for idx, row in enumerate(self.row_vars):
            y = 130 + idx * 40
            Label(self.root, text=str(idx + 1)).place(x=26, y=y)
            Entry(self.root, textvariable=row["site"], width=16).place(x=80, y=y)
            Entry(self.root, textvariable=row["bu_no"], width=16).place(x=230, y=y)
            Entry(self.root, textvariable=row["equipment_no"], width=18).place(x=380, y=y)
            Entry(self.root, textvariable=row["source_path"], width=56).place(x=560, y=y)
            # Bind idx as a default so every button keeps its own row index.
            Button(self.root, text="选择目录", command=lambda x=idx: self.choose_source_dir(x)).place(x=1010, y=y - 4)

        self.log_text = ScrolledText(self.root, width=165, height=23)
        self.log_text.place(x=20, y=280)
        # Read-only for the user; _flush_logs re-enables it briefly to append.
        self.log_text.configure(state=DISABLED)

    def popup_set_base_url(self):
        """Ask for the server address in a dialog and store it when confirmed."""
        new_url = simpledialog.askstring(
            "设置目标地址",
            "请输入xujie-sys地址,例如:http://172.26.58.88:8080",
            initialvalue=self.base_url_var.get().strip()
        )
        if new_url is not None:
            self.base_url_var.set(new_url.strip())
            self.log("已设置目标地址: %s" % new_url.strip())

    def choose_source_dir(self, row_index):
        """Let the user pick the local directory for the row at ``row_index`` (0-based)."""
        path = filedialog.askdirectory(title="选择第%d行本地目录" % (row_index + 1))
        if path:
            self.row_vars[row_index]["source_path"].set(path)

    def _load_config(self):
        """Load ``collector_config.json`` into the form and auto-start when it is complete.

        Configs without a ``rows`` list are treated as the older single-row
        layout. Any error while reading or parsing is logged, not raised.
        """
        if not self.config_path.exists():
            self.log("未找到本地配置,请先录入并保存。")
            return
        try:
            config = json.loads(self.config_path.read_text(encoding="utf-8"))
            self.base_url_var.set(config.get("base_url", ""))
            self.poll_seconds_var.set(str(config.get("poll_seconds", "10")))

            rows = config.get("rows")
            # Backwards compatibility with the old single-row config layout.
            if not isinstance(rows, list):
                rows = [{name: config.get(name, "") for name in self.ROW_FIELDS}]

            for idx, item in enumerate(rows[:len(self.row_vars)]):
                if not isinstance(item, dict):
                    item = {}
                for name in self.ROW_FIELDS:
                    self.row_vars[idx][name].set(str(item.get(name, "")).strip())

            self.log("配置加载完成。")
            if self._can_auto_start():
                self.start_collect(auto_mode=True)
        except Exception as e:
            self.log("读取配置失败: %s" % e)

    def save_config(self):
        """Validate the form, write it to disk and apply it to a running collection.

        While collecting, the saved rows and the current base URL / poll
        interval (when valid) become the worker's active settings. When not
        collecting, collection is auto-started if the configuration allows it.
        """
        try:
            rows = self._get_valid_rows(require_complete=True, raise_on_empty=False)
        except ValueError as e:
            messagebox.showwarning("提示", str(e))
            return

        data = {
            "base_url": self.base_url_var.get().strip(),
            "poll_seconds": self.poll_seconds_var.get().strip(),
            "rows": rows,
        }
        try:
            self.config_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
            self.log("配置已保存: %s" % self.config_path)
        except Exception as e:
            messagebox.showerror("错误", "保存配置失败: %s" % e)
            return

        if self.running:
            self.active_rows = rows
            # Refresh the worker's snapshots here, on the UI thread; invalid
            # values keep the previously active setting.
            base_url = self._base_url()
            if base_url:
                self.active_base_url = base_url
            poll_seconds = self._parse_poll_seconds()
            if poll_seconds is not None:
                self.active_poll_seconds = poll_seconds
            self.log("采集配置已更新,下一轮轮询自动生效。")
        elif self._can_auto_start():
            self.start_collect(auto_mode=True)

    def _can_auto_start(self):
        """Return True when base URL, poll interval and at least one row are all valid."""
        if not self._base_url():
            return False
        if not self._is_poll_seconds_valid():
            return False
        try:
            rows = self._get_valid_rows(require_complete=True, raise_on_empty=True)
            return len(rows) > 0
        except Exception:
            return False

    def _is_poll_seconds_valid(self):
        """Return True when the poll field holds a positive integer (blank is invalid)."""
        try:
            return int(self.poll_seconds_var.get().strip()) > 0
        except Exception:
            return False

    def _parse_poll_seconds(self):
        """Return the poll interval as a positive int, or None when it is invalid.

        A blank field means the default of 10 seconds, matching manual start.
        """
        try:
            poll_seconds = int(self.poll_seconds_var.get().strip() or "10")
        except Exception:
            return None
        return poll_seconds if poll_seconds > 0 else None

    def _base_url(self):
        """Return the base URL from the form, trimmed and without trailing slashes."""
        return self.base_url_var.get().strip().rstrip("/")

    def _get_valid_rows(self, require_complete, raise_on_empty):
        """Collect the fully filled rows from the form.

        Args:
            require_complete: When True, a partially filled row raises
                ``ValueError``; when False, it is skipped.
            raise_on_empty: When True, raise ``ValueError`` if no complete row exists.

        Returns:
            A list of dicts with ``row_index`` (1-based) followed by the row fields.

        Raises:
            ValueError: See ``require_complete`` and ``raise_on_empty``.
        """
        rows = []
        for idx, row in enumerate(self.row_vars):
            values = {name: row[name].get().strip() for name in self.ROW_FIELDS}
            filled_count = sum(1 for value in values.values() if value)

            if filled_count == 0:
                # Completely blank rows are simply unused.
                continue

            if filled_count < len(self.ROW_FIELDS):
                if require_complete:
                    raise ValueError("第%d行配置未填写完整,请补全site、buNo、equipmentNo、本地目录。" % (idx + 1))
                continue

            entry = {"row_index": idx + 1}
            entry.update(values)
            rows.append(entry)

        if raise_on_empty and len(rows) == 0:
            raise ValueError("请至少填写一行完整采集配置。")
        return rows

    def _report_start_problem(self, auto_mode, log_message, dialog_message):
        """Report why collection could not start: log in auto mode, dialog otherwise."""
        if auto_mode:
            self.log(log_message)
        else:
            messagebox.showwarning("提示", dialog_message)

    def start_collect(self, auto_mode=False):
        """Validate the form and start the worker thread.

        Args:
            auto_mode: True when triggered automatically (config load/save);
                problems are then logged instead of shown in a dialog.
        """
        if self.running:
            if not auto_mode:
                self.log("采集任务已在运行中。")
            return

        base_url = self._base_url()
        if not base_url:
            self._report_start_problem(auto_mode, "自动启动失败:系统地址为空。", "请先设置系统地址")
            return

        poll_seconds = self._parse_poll_seconds()
        if poll_seconds is None:
            self._report_start_problem(auto_mode, "自动启动失败:轮询秒无效。", "轮询秒必须是大于0的整数")
            return

        try:
            rows = self._get_valid_rows(require_complete=True, raise_on_empty=True)
        except ValueError as e:
            self._report_start_problem(auto_mode, "自动启动失败:%s" % e, str(e))
            return

        # Snapshot everything the worker needs before it starts, so it never
        # has to read a Tk variable from outside the UI thread.
        self.active_rows = rows
        self.active_base_url = base_url
        self.active_poll_seconds = poll_seconds
        self.running = True
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        if auto_mode:
            self.log("配置有效,已自动启动轮询同步。")
        else:
            self.log("采集任务已启动。")

    def stop_collect(self):
        """Ask the worker to stop; it exits at its next ``running`` check."""
        if not self.running:
            self.log("采集任务未运行。")
            return
        self.running = False
        self.log("正在停止采集任务...")

    def _worker_loop(self):
        """Worker thread body: sync every active row, then wait for the poll interval."""
        while self.running:
            for row in list(self.active_rows):
                if not self.running:
                    break
                try:
                    self._sync_one_row(row)
                except Exception as e:
                    # An unexpected error in one row must not kill the thread:
                    # ``running`` would stay True and the UI could not restart it.
                    self.log("第%d行同步异常: %s" % (row.get("row_index", 0), e))

            # Sleep in one-second slices so a stop request is noticed quickly.
            for _ in range(max(1, self.active_poll_seconds)):
                if not self.running:
                    break
                time.sleep(1)
        self.log("采集任务已停止。")

    def _sync_one_row(self, row):
        """Upload every new or changed regular file in the row's directory, oldest first.

        Args:
            row: A row dict as produced by ``_get_valid_rows``.
        """
        row_index = row["row_index"]
        source_path = row["source_path"]

        if not os.path.isdir(source_path):
            self._log_with_interval(
                "row%d_missing" % row_index,
                "第%d行目录不存在: %s" % (row_index, source_path),
                60,
            )
            return

        try:
            names = os.listdir(source_path)
        except OSError as e:
            # The directory can vanish or be unreadable between the check and the listing.
            self._log_with_interval(
                "row%d_list_error" % row_index,
                "第%d行读取目录失败: %s, 原因: %s" % (row_index, source_path, e),
                60,
            )
            return

        files = [
            candidate
            for candidate in (Path(source_path) / name for name in names)
            if candidate.is_file()
        ]
        if not files:
            self._log_with_interval(
                "row%d_empty" % row_index,
                "第%d行目录为空,等待新文件..." % row_index,
                60,
            )
            return

        # Stat each file once via _file_signature (which logs and returns None
        # on failure) so a file deleted after listing cannot raise while sorting.
        stamped = []
        for file_path in files:
            signature = self._file_signature(file_path)
            if signature is not None:
                stamped.append((signature, file_path))
        stamped.sort(key=lambda item: item[0][0])  # by modification time

        for signature, file_path in stamped:
            if not self.running:
                return

            state_key = "%d|%s" % (row_index, str(file_path).lower())
            if self.upload_state.get(state_key) == signature:
                continue

            if self._upload_file(row, file_path):
                self.upload_state[state_key] = signature

    def _file_signature(self, file_path):
        """Return ``(st_mtime_ns, st_size)`` for ``file_path``, or None (logged) on error."""
        try:
            stat = file_path.stat()
            return stat.st_mtime_ns, stat.st_size
        except Exception as e:
            self.log("读取文件状态失败: %s, 原因: %s" % (file_path, e))
            return None

    def _upload_file(self, row, file_path):
        """POST one file to ``<active_base_url>/collector/client/upload``.

        Args:
            row: Row dict supplying ``site``, ``bu_no`` and ``equipment_no``.
            file_path: ``Path`` of the file to send.

        Returns:
            True when the JSON response has ``code == 0``; False otherwise,
            including on any exception (which is logged).
        """
        # Use the snapshot taken on the UI thread, not the Tk variable.
        url = self.active_base_url + "/collector/client/upload"
        data = {
            "site": row["site"],
            "buNo": row["bu_no"],
            "equipmentNo": row["equipment_no"],
        }

        try:
            with open(file_path, "rb") as fp:
                files = {"file": (file_path.name, fp, "application/octet-stream")}
                resp = requests.post(url, data=data, files=files, timeout=180)

            resp.raise_for_status()
            result = resp.json()
            if result.get("code") == 0:
                response_data = result.get("data") or {}
                server_path = response_data.get("savedFullPath", "")
                self.log("第%d行上传成功: %s -> %s" % (row["row_index"], file_path.name, server_path))
                return True

            self.log("第%d行上传失败: %s, 原因: %s" % (
                row["row_index"], file_path.name, result.get("msg")))
            return False
        except Exception as e:
            self.log("第%d行上传异常: %s, 原因: %s" % (row["row_index"], file_path.name, e))
            return False

    def _log_with_interval(self, key, message, seconds):
        """Log ``message`` unless the same ``key`` was logged less than ``seconds`` ago."""
        now = time.time()
        last = self.notice_ts.get(key, 0)
        if now - last >= seconds:
            self.notice_ts[key] = now
            self.log(message)

    def log(self, message):
        """Queue a time-stamped log line; callable from any thread."""
        self.log_queue.put("[%s] %s" % (datetime.now().strftime("%H:%M:%S"), message))

    def _flush_logs(self):
        """Append all queued log lines to the text widget and reschedule in 200 ms."""
        pending = []
        while True:
            try:
                pending.append(self.log_queue.get_nowait())
            except queue.Empty:
                break

        if pending:
            # Toggle the widget state once per batch instead of once per line.
            self.log_text.configure(state=NORMAL)
            self.log_text.insert(END, "\n".join(pending) + "\n")
            self.log_text.see(END)
            self.log_text.configure(state=DISABLED)
        self.root.after(200, self._flush_logs)
|
|
|
|
|
|
def main():
    """Create the Tk root window, attach the collector UI and run the event loop."""
    app_root = Tk()
    # Keep a reference to the application object while the loop is running.
    app = CollectorApp(app_root)
    app_root.mainloop()


if __name__ == "__main__":
    main()
|