Browse Source

设备采集exe

master
han\hanst 1 week ago
parent
commit
3b69c69699
  1. 53
      client-file-collector/README.md
  2. 100
      client-file-collector/build_exe.bat
  3. 484
      client-file-collector/client_file_collector.py
  4. 2
      client-file-collector/requirements.txt

53
client-file-collector/README.md

@ -0,0 +1,53 @@
# 文件采集客户端(支持最多3行配置)
客户端按轮询秒定时将本地目录文件上传到 `xujie-sys`,服务端统一落盘到:
- `D:\ckp-file\<equipmentNo>\`
## 当前需求对应关系
- 客户端只需录入:
- 系统地址
- 轮询秒
- 最多3行配置(`site + buNo + equipmentNo + 本地目录`)
- 双击打开 EXE 后,录入并保存配置即可按轮询秒自动同步。
- 服务端收到每行配置上传的文件后,会自动写入 `D:\ckp-file\equipmentNo`
## 服务端接口
- `POST /collector/client/upload`
- form-data:
- `file`
- `site`
- `buNo`
- `equipmentNo`
## 客户端行为说明
- 每轮轮询会扫描每行配置的本地目录。
- 仅同步新增或修改过的文件(避免重复上传同一文件)。
- 不会删除本地文件。
- Windows 下首次启动 `QMSFileCollector.exe` 时,会自动写入当前用户自启动项(优先写入 `Startup` 启动文件夹,失败时回退注册表 `HKCU\Software\Microsoft\Windows\CurrentVersion\Run`)。
## 打包步骤
1. 安装 Python 3.10+。
2. 在当前目录执行:
```bash
build_exe.bat
```
3. 生成文件:
- `dist/QMSFileCollector.exe`
## 常见问题
- 找不到 `dist/QMSFileCollector.exe`
- 请先看脚本输出是否有 `Python3 interpreter not found`
- 若机器上 `python --version``2.7`,不会执行成功。
- `pip` 下载超时
- 脚本已内置镜像重试(清华 -> 阿里 -> 官方)。
- 网络不稳定时可重试执行 `build_exe.bat`

100
client-file-collector/build_exe.bat

@ -0,0 +1,100 @@
@echo off
chcp 65001 >nul
setlocal EnableDelayedExpansion
set "PY_EXE="
set "PY_ARGS="
set "PIP_INSTALL_ARGS=--disable-pip-version-check --default-timeout 180 --retries 10 --trusted-host pypi.org --trusted-host files.pythonhosted.org --trusted-host pypi.tuna.tsinghua.edu.cn --trusted-host mirrors.aliyun.com"
where py >nul 2>nul
if not errorlevel 1 (
py -3 -c "import sys;sys.exit(0 if sys.version_info[0]>=3 else 1)" >nul 2>nul
if not errorlevel 1 (
set "PY_EXE=py"
set "PY_ARGS=-3"
)
)
if not defined PY_EXE (
where python3 >nul 2>nul
if not errorlevel 1 (
python3 -c "import sys;sys.exit(0 if sys.version_info[0]>=3 else 1)" >nul 2>nul
if not errorlevel 1 (
set "PY_EXE=python3"
set "PY_ARGS="
)
)
)
if not defined PY_EXE (
python -c "import sys;sys.exit(0 if sys.version_info[0]>=3 else 1)" >nul 2>nul
if not errorlevel 1 (
set "PY_EXE=python"
set "PY_ARGS="
)
)
if not defined PY_EXE (
for %%D in ("%LocalAppData%\Programs\Python\Python312\python.exe" "%LocalAppData%\Programs\Python\Python311\python.exe" "%LocalAppData%\Programs\Python\Python310\python.exe" "C:\Program Files\Python312\python.exe" "C:\Program Files\Python311\python.exe" "C:\Program Files\Python310\python.exe") do (
if exist %%~D (
%%~D -c "import sys;sys.exit(0 if sys.version_info[0]>=3 else 1)" >nul 2>nul
if not errorlevel 1 (
set "PY_EXE=%%~D"
set "PY_ARGS="
goto :py_found
)
)
)
)
:py_found
if not defined PY_EXE (
echo Python3 interpreter not found.
echo Current python version:
python --version
echo Please install Python 3.10+ and retry.
echo Download URL https://www.python.org/downloads/windows/
exit /b 1
)
echo Using interpreter:
call "%PY_EXE%" %PY_ARGS% --version
echo [1/3] Install dependencies...
call :install_requirements
if errorlevel 1 (
echo Dependency install failed after trying all indexes
exit /b 1
)
echo [2/3] Build EXE...
if exist "dist\QMSFileCollector.exe" (
del /f /q "dist\QMSFileCollector.exe" >nul 2>nul
if exist "dist\QMSFileCollector.exe" (
echo Existing dist\QMSFileCollector.exe is locked.
echo Please close running QMSFileCollector process and retry.
exit /b 1
)
)
call "%PY_EXE%" %PY_ARGS% -m PyInstaller --noconfirm --clean --onefile --windowed --name QMSFileCollector client_file_collector.py
if errorlevel 1 (
echo EXE build failed
exit /b 1
)
echo [3/3] Build completed
echo Output: dist\QMSFileCollector.exe
endlocal
exit /b 0
:install_requirements
for %%I in (https://pypi.tuna.tsinghua.edu.cn/simple https://mirrors.aliyun.com/pypi/simple/ https://pypi.org/simple) do (
echo Trying pip index: %%I
call "%PY_EXE%" %PY_ARGS% -m pip install %PIP_INSTALL_ARGS% -i %%I -r requirements.txt
if not errorlevel 1 (
echo Dependencies installed from %%I
exit /b 0
)
echo Install failed from %%I, try next index...
)
exit /b 1

484
client-file-collector/client_file_collector.py

@ -0,0 +1,484 @@
# -*- coding: utf-8 -*-
import json
import os
import queue
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
from tkinter import Tk, Label, Entry, Button, StringVar, END, DISABLED, NORMAL, filedialog, messagebox, simpledialog
from tkinter.scrolledtext import ScrolledText
import requests
def get_runtime_dir():
if getattr(sys, "frozen", False):
return Path(sys.executable).resolve().parent
return Path(__file__).resolve().parent
def _ensure_windows_run_registry(entry_name, exe_path):
import winreg
run_key_path = r"Software\Microsoft\Windows\CurrentVersion\Run"
target_value = "\"%s\"" % exe_path
try:
with winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
run_key_path,
0,
winreg.KEY_READ | winreg.KEY_SET_VALUE
) as key:
try:
old_value, _ = winreg.QueryValueEx(key, entry_name)
except FileNotFoundError:
old_value = None
if old_value and str(old_value).strip().strip("\"").lower() == exe_path.lower():
return False, "系统自启动项已存在,无需重复写入。"
winreg.SetValueEx(key, entry_name, 0, winreg.REG_SZ, target_value)
if old_value:
return True, "检测到自启动路径变化,已自动更新。"
return True, "首次启动已写入系统自启动项,下次开机会自动运行。"
except Exception as e:
return False, "写入系统自启动项失败: %s" % e
def _cleanup_legacy_startup_items(legacy_entry_name, app_data):
if app_data:
startup_dir = Path(app_data) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup"
legacy_script = startup_dir / (legacy_entry_name + ".vbs")
try:
if legacy_script.exists():
legacy_script.unlink()
except Exception:
pass
try:
import winreg
run_key_path = r"Software\Microsoft\Windows\CurrentVersion\Run"
with winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
run_key_path,
0,
winreg.KEY_SET_VALUE
) as key:
try:
winreg.DeleteValue(key, legacy_entry_name)
except FileNotFoundError:
pass
except Exception:
pass
def ensure_windows_startup():
if os.name != "nt":
return False, "当前系统非Windows,跳过自启动注册。"
if not getattr(sys, "frozen", False):
return False, "当前为源码运行,跳过自启动注册。"
exe_path = str(Path(sys.executable).resolve())
entry_name = "QMSFileCollector"
app_data = os.getenv("APPDATA", "").strip()
_cleanup_legacy_startup_items("CKPClientFileCollector", app_data)
if app_data:
startup_dir = Path(app_data) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup"
startup_script = startup_dir / (entry_name + ".vbs")
script_content = (
"Set WshShell = CreateObject(\"WScript.Shell\")\n"
"WshShell.Run Chr(34) & \"%s\" & Chr(34), 0\n"
"Set WshShell = Nothing\n"
) % exe_path.replace("\"", "\"\"")
try:
startup_dir.mkdir(parents=True, exist_ok=True)
old_content = ""
if startup_script.exists():
old_content = startup_script.read_text(encoding="utf-8")
if old_content == script_content:
return False, "启动文件夹自启动项已存在,无需重复写入。"
startup_script.write_text(script_content, encoding="utf-8")
if old_content:
return True, "检测到启动文件夹自启动路径变化,已自动更新。"
return True, "首次启动已写入启动文件夹自启动项,下次开机会自动运行。"
except Exception as startup_error:
try:
changed, reg_msg = _ensure_windows_run_registry(entry_name, exe_path)
return changed, "写入启动文件夹失败,已回退注册表方式: %s" % reg_msg
except Exception as reg_error:
return False, "写入启动文件夹和注册表均失败: %s; %s" % (startup_error, reg_error)
try:
return _ensure_windows_run_registry(entry_name, exe_path)
except Exception as e:
return False, "未获取到APPDATA且注册表写入失败: %s" % e
class CollectorApp:
def __init__(self, root):
self.root = root
self.root.title("文件采集客户端")
self.root.geometry("1180x720")
self.root.resizable(True, True)
self.config_path = get_runtime_dir() / "collector_config.json"
self.log_queue = queue.Queue()
self.running = False
self.worker_thread = None
self.active_rows = []
self.upload_state = {}
self.notice_ts = {}
self.base_url_var = StringVar()
self.poll_seconds_var = StringVar(value="10")
self.row_vars = []
for _ in range(3):
self.row_vars.append({
"site": StringVar(),
"bu_no": StringVar(),
"equipment_no": StringVar(),
"source_path": StringVar(),
})
self._build_ui()
self._init_windows_startup()
self._load_config()
self.root.after(200, self._flush_logs)
def _init_windows_startup(self):
changed, message = ensure_windows_startup()
if changed:
self.log(message)
return
if getattr(sys, "frozen", False) and ("失败" in message):
self.log(message)
def _build_ui(self):
Label(self.root, text="系统地址").place(x=20, y=20)
Entry(self.root, textvariable=self.base_url_var, width=92).place(x=85, y=20)
Button(self.root, text="弹框设置地址", command=self.popup_set_base_url).place(x=860, y=16)
Button(self.root, text="保存配置", command=self.save_config).place(x=970, y=16)
Label(self.root, text="轮询秒").place(x=20, y=58)
Entry(self.root, textvariable=self.poll_seconds_var, width=10).place(x=85, y=58)
Button(self.root, text="开始同步", command=self.start_collect).place(x=180, y=54)
Button(self.root, text="停止同步", command=self.stop_collect).place(x=265, y=54)
Label(self.root, text="配置说明:最多3行,每行需填写 site + buNo + equipmentNo + 本地目录").place(x=360, y=58)
Label(self.root, text="").place(x=20, y=100)
Label(self.root, text="Site").place(x=80, y=100)
Label(self.root, text="BuNo").place(x=230, y=100)
Label(self.root, text="EquipmentNo").place(x=380, y=100)
Label(self.root, text="本地目录").place(x=560, y=100)
for idx, row in enumerate(self.row_vars):
y = 130 + idx * 40
Label(self.root, text=str(idx + 1)).place(x=26, y=y)
Entry(self.root, textvariable=row["site"], width=16).place(x=80, y=y)
Entry(self.root, textvariable=row["bu_no"], width=16).place(x=230, y=y)
Entry(self.root, textvariable=row["equipment_no"], width=18).place(x=380, y=y)
Entry(self.root, textvariable=row["source_path"], width=56).place(x=560, y=y)
Button(self.root, text="选择目录", command=lambda x=idx: self.choose_source_dir(x)).place(x=1010, y=y - 4)
self.log_text = ScrolledText(self.root, width=165, height=23)
self.log_text.place(x=20, y=280)
self.log_text.configure(state=DISABLED)
def popup_set_base_url(self):
new_url = simpledialog.askstring(
"设置目标地址",
"请输入xujie-sys地址,例如:http://172.26.58.88:8080",
initialvalue=self.base_url_var.get().strip()
)
if new_url is not None:
self.base_url_var.set(new_url.strip())
self.log("已设置目标地址: %s" % new_url.strip())
def choose_source_dir(self, row_index):
path = filedialog.askdirectory(title="选择第%d行本地目录" % (row_index + 1))
if path:
self.row_vars[row_index]["source_path"].set(path)
def _load_config(self):
if not self.config_path.exists():
self.log("未找到本地配置,请先录入并保存。")
return
try:
config = json.loads(self.config_path.read_text(encoding="utf-8"))
self.base_url_var.set(config.get("base_url", ""))
self.poll_seconds_var.set(str(config.get("poll_seconds", "10")))
rows = config.get("rows")
# 兼容旧版本单行配置
if not isinstance(rows, list):
rows = [{
"site": config.get("site", ""),
"bu_no": config.get("bu_no", ""),
"equipment_no": config.get("equipment_no", ""),
"source_path": config.get("source_path", ""),
}]
for idx in range(min(3, len(rows))):
item = rows[idx] if isinstance(rows[idx], dict) else {}
self.row_vars[idx]["site"].set(str(item.get("site", "")).strip())
self.row_vars[idx]["bu_no"].set(str(item.get("bu_no", "")).strip())
self.row_vars[idx]["equipment_no"].set(str(item.get("equipment_no", "")).strip())
self.row_vars[idx]["source_path"].set(str(item.get("source_path", "")).strip())
self.log("配置加载完成。")
if self._can_auto_start():
self.start_collect(auto_mode=True)
except Exception as e:
self.log("读取配置失败: %s" % e)
def save_config(self):
try:
rows = self._get_valid_rows(require_complete=True, raise_on_empty=False)
except ValueError as e:
messagebox.showwarning("提示", str(e))
return
data = {
"base_url": self.base_url_var.get().strip(),
"poll_seconds": self.poll_seconds_var.get().strip(),
"rows": rows,
}
try:
self.config_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
self.log("配置已保存: %s" % self.config_path)
except Exception as e:
messagebox.showerror("错误", "保存配置失败: %s" % e)
return
if self.running:
self.active_rows = rows
self.log("采集配置已更新,下一轮轮询自动生效。")
elif self._can_auto_start():
self.start_collect(auto_mode=True)
def _can_auto_start(self):
if not self._base_url():
return False
if not self._is_poll_seconds_valid():
return False
try:
rows = self._get_valid_rows(require_complete=True, raise_on_empty=True)
return len(rows) > 0
except Exception:
return False
def _is_poll_seconds_valid(self):
try:
return int(self.poll_seconds_var.get().strip()) > 0
except Exception:
return False
def _base_url(self):
return self.base_url_var.get().strip().rstrip("/")
def _get_valid_rows(self, require_complete, raise_on_empty):
rows = []
for idx, row in enumerate(self.row_vars):
site = row["site"].get().strip()
bu_no = row["bu_no"].get().strip()
equipment_no = row["equipment_no"].get().strip()
source_path = row["source_path"].get().strip()
filled_count = 0
for value in (site, bu_no, equipment_no, source_path):
if value:
filled_count += 1
if filled_count == 0:
continue
if require_complete and filled_count < 4:
raise ValueError("%d行配置未填写完整,请补全site、buNo、equipmentNo、本地目录。" % (idx + 1))
if filled_count == 4:
rows.append({
"row_index": idx + 1,
"site": site,
"bu_no": bu_no,
"equipment_no": equipment_no,
"source_path": source_path,
})
if raise_on_empty and len(rows) == 0:
raise ValueError("请至少填写一行完整采集配置。")
return rows
def start_collect(self, auto_mode=False):
if self.running:
if not auto_mode:
self.log("采集任务已在运行中。")
return
if not self._base_url():
if auto_mode:
self.log("自动启动失败:系统地址为空。")
else:
messagebox.showwarning("提示", "请先设置系统地址")
return
try:
poll_seconds = int(self.poll_seconds_var.get().strip() or "10")
if poll_seconds <= 0:
raise ValueError("轮询秒必须大于0")
except Exception:
if auto_mode:
self.log("自动启动失败:轮询秒无效。")
else:
messagebox.showwarning("提示", "轮询秒必须是大于0的整数")
return
try:
rows = self._get_valid_rows(require_complete=True, raise_on_empty=True)
except ValueError as e:
if auto_mode:
self.log("自动启动失败:%s" % e)
else:
messagebox.showwarning("提示", str(e))
return
self.active_rows = rows
self.running = True
self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
self.worker_thread.start()
if auto_mode:
self.log("配置有效,已自动启动轮询同步。")
else:
self.log("采集任务已启动。")
def stop_collect(self):
if not self.running:
self.log("采集任务未运行。")
return
self.running = False
self.log("正在停止采集任务...")
def _worker_loop(self):
while self.running:
rows = list(self.active_rows)
for row in rows:
if not self.running:
break
self._sync_one_row(row)
try:
poll_seconds = int(self.poll_seconds_var.get().strip() or "10")
except Exception:
poll_seconds = 10
for _ in range(max(1, poll_seconds)):
if not self.running:
break
time.sleep(1)
self.log("采集任务已停止。")
def _sync_one_row(self, row):
row_index = row["row_index"]
source_path = row["source_path"]
path_key = "row%d_missing" % row_index
empty_key = "row%d_empty" % row_index
if not os.path.isdir(source_path):
self._log_with_interval(path_key, "%d行目录不存在: %s" % (row_index, source_path), 60)
return
files = []
for name in os.listdir(source_path):
file_path = Path(source_path) / name
if file_path.is_file():
files.append(file_path)
if not files:
self._log_with_interval(empty_key, "%d行目录为空,等待新文件..." % row_index, 60)
return
files.sort(key=lambda p: p.stat().st_mtime)
for file_path in files:
if not self.running:
return
signature = self._file_signature(file_path)
if signature is None:
continue
state_key = "%d|%s" % (row_index, str(file_path).lower())
if self.upload_state.get(state_key) == signature:
continue
if self._upload_file(row, file_path):
self.upload_state[state_key] = signature
def _file_signature(self, file_path):
try:
stat = file_path.stat()
return stat.st_mtime_ns, stat.st_size
except Exception as e:
self.log("读取文件状态失败: %s, 原因: %s" % (file_path, e))
return None
def _upload_file(self, row, file_path):
url = self._base_url() + "/collector/client/upload"
data = {
"site": row["site"],
"buNo": row["bu_no"],
"equipmentNo": row["equipment_no"],
}
try:
with open(file_path, "rb") as fp:
files = {"file": (file_path.name, fp, "application/octet-stream")}
resp = requests.post(url, data=data, files=files, timeout=180)
resp.raise_for_status()
result = resp.json()
if result.get("code") == 0:
response_data = result.get("data") or {}
server_path = response_data.get("savedFullPath", "")
self.log("%d行上传成功: %s -> %s" % (row["row_index"], file_path.name, server_path))
return True
self.log("%d行上传失败: %s, 原因: %s" % (
row["row_index"], file_path.name, result.get("msg")))
return False
except Exception as e:
self.log("%d行上传异常: %s, 原因: %s" % (row["row_index"], file_path.name, e))
return False
def _log_with_interval(self, key, message, seconds):
now = time.time()
last = self.notice_ts.get(key, 0)
if now - last >= seconds:
self.notice_ts[key] = now
self.log(message)
def log(self, message):
self.log_queue.put("[%s] %s" % (datetime.now().strftime("%H:%M:%S"), message))
def _flush_logs(self):
while not self.log_queue.empty():
line = self.log_queue.get()
self.log_text.configure(state=NORMAL)
self.log_text.insert(END, line + "\n")
self.log_text.see(END)
self.log_text.configure(state=DISABLED)
self.root.after(200, self._flush_logs)
if __name__ == "__main__":
app_root = Tk()
app = CollectorApp(app_root)
app_root.mainloop()

2
client-file-collector/requirements.txt

@ -0,0 +1,2 @@
requests
pyinstaller
Loading…
Cancel
Save