|
|
@ -134,6 +134,7 @@ class CollectorApp: |
|
|
self.log_queue = queue.Queue() |
|
|
self.log_queue = queue.Queue() |
|
|
self.running = False |
|
|
self.running = False |
|
|
self.worker_thread = None |
|
|
self.worker_thread = None |
|
|
|
|
|
self.stop_event = threading.Event() |
|
|
self.active_rows = [] |
|
|
self.active_rows = [] |
|
|
self.upload_state = {} |
|
|
self.upload_state = {} |
|
|
self.notice_ts = {} |
|
|
self.notice_ts = {} |
|
|
@ -392,8 +393,12 @@ class CollectorApp: |
|
|
return |
|
|
return |
|
|
|
|
|
|
|
|
self.active_rows = rows |
|
|
self.active_rows = rows |
|
|
|
|
|
# 每次手动/自动启动都重置一次状态,避免停启后误判“已同步”。 |
|
|
|
|
|
self.upload_state = {} |
|
|
|
|
|
self.notice_ts = {} |
|
|
|
|
|
self.stop_event = threading.Event() |
|
|
self.running = True |
|
|
self.running = True |
|
|
self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True) |
|
|
|
|
|
|
|
|
self.worker_thread = threading.Thread(target=self._worker_loop, args=(self.stop_event,), daemon=True) |
|
|
self.worker_thread.start() |
|
|
self.worker_thread.start() |
|
|
if auto_mode: |
|
|
if auto_mode: |
|
|
self.log("配置有效,已自动启动轮询同步。") |
|
|
self.log("配置有效,已自动启动轮询同步。") |
|
|
@ -405,15 +410,16 @@ class CollectorApp: |
|
|
self.log("采集任务未运行。") |
|
|
self.log("采集任务未运行。") |
|
|
return |
|
|
return |
|
|
self.running = False |
|
|
self.running = False |
|
|
|
|
|
self.stop_event.set() |
|
|
self.log("正在停止采集任务...") |
|
|
self.log("正在停止采集任务...") |
|
|
|
|
|
|
|
|
def _worker_loop(self): |
|
|
|
|
|
while self.running: |
|
|
|
|
|
|
|
|
def _worker_loop(self, stop_event): |
|
|
|
|
|
while not stop_event.is_set(): |
|
|
rows = list(self.active_rows) |
|
|
rows = list(self.active_rows) |
|
|
for row in rows: |
|
|
for row in rows: |
|
|
if not self.running: |
|
|
|
|
|
|
|
|
if stop_event.is_set(): |
|
|
break |
|
|
break |
|
|
self._sync_one_row(row) |
|
|
|
|
|
|
|
|
self._sync_one_row(row, stop_event) |
|
|
|
|
|
|
|
|
try: |
|
|
try: |
|
|
poll_seconds = int(self.poll_seconds_var.get().strip() or "10") |
|
|
poll_seconds = int(self.poll_seconds_var.get().strip() or "10") |
|
|
@ -421,12 +427,15 @@ class CollectorApp: |
|
|
poll_seconds = 10 |
|
|
poll_seconds = 10 |
|
|
|
|
|
|
|
|
for _ in range(max(1, poll_seconds)): |
|
|
for _ in range(max(1, poll_seconds)): |
|
|
if not self.running: |
|
|
|
|
|
|
|
|
if stop_event.is_set(): |
|
|
break |
|
|
break |
|
|
time.sleep(1) |
|
|
time.sleep(1) |
|
|
self.log("采集任务已停止。") |
|
|
|
|
|
|
|
|
if self.stop_event is stop_event: |
|
|
|
|
|
self.running = False |
|
|
|
|
|
self.worker_thread = None |
|
|
|
|
|
self.log("采集任务已停止。") |
|
|
|
|
|
|
|
|
def _sync_one_row(self, row): |
|
|
|
|
|
|
|
|
def _sync_one_row(self, row, stop_event): |
|
|
row_index = row["row_index"] |
|
|
row_index = row["row_index"] |
|
|
source_path = row["source_path"] |
|
|
source_path = row["source_path"] |
|
|
path_key = "row%d_missing" % row_index |
|
|
path_key = "row%d_missing" % row_index |
|
|
@ -442,13 +451,14 @@ class CollectorApp: |
|
|
if file_path.is_file(): |
|
|
if file_path.is_file(): |
|
|
files.append(file_path) |
|
|
files.append(file_path) |
|
|
|
|
|
|
|
|
|
|
|
self._cleanup_row_upload_state(row_index, files) |
|
|
if not files: |
|
|
if not files: |
|
|
self._log_with_interval(empty_key, "第%d行目录为空,等待新文件..." % row_index, 60) |
|
|
self._log_with_interval(empty_key, "第%d行目录为空,等待新文件..." % row_index, 60) |
|
|
return |
|
|
return |
|
|
|
|
|
|
|
|
files.sort(key=lambda p: p.stat().st_mtime) |
|
|
files.sort(key=lambda p: p.stat().st_mtime) |
|
|
for file_path in files: |
|
|
for file_path in files: |
|
|
if not self.running: |
|
|
|
|
|
|
|
|
if stop_event.is_set(): |
|
|
return |
|
|
return |
|
|
signature = self._file_signature(file_path) |
|
|
signature = self._file_signature(file_path) |
|
|
if signature is None: |
|
|
if signature is None: |
|
|
@ -460,10 +470,25 @@ class CollectorApp: |
|
|
|
|
|
|
|
|
if self._upload_file(row, file_path): |
|
|
if self._upload_file(row, file_path): |
|
|
if self._backup_and_remove_file(row, file_path): |
|
|
if self._backup_and_remove_file(row, file_path): |
|
|
self.upload_state[state_key] = signature |
|
|
|
|
|
|
|
|
# 文件已从源目录移走,立即清理状态,允许后续同名新文件再次同步。 |
|
|
|
|
|
self.upload_state.pop(state_key, None) |
|
|
else: |
|
|
else: |
|
|
self.upload_state.pop(state_key, None) |
|
|
self.upload_state.pop(state_key, None) |
|
|
|
|
|
|
|
|
|
|
|
def _cleanup_row_upload_state(self, row_index, files): |
|
|
|
|
|
prefix = "%d|" % row_index |
|
|
|
|
|
current_keys = set() |
|
|
|
|
|
for file_path in files: |
|
|
|
|
|
current_keys.add("%d|%s" % (row_index, str(file_path).lower())) |
|
|
|
|
|
|
|
|
|
|
|
stale_keys = [] |
|
|
|
|
|
for key in self.upload_state.keys(): |
|
|
|
|
|
if key.startswith(prefix) and key not in current_keys: |
|
|
|
|
|
stale_keys.append(key) |
|
|
|
|
|
|
|
|
|
|
|
for key in stale_keys: |
|
|
|
|
|
self.upload_state.pop(key, None) |
|
|
|
|
|
|
|
|
def _file_signature(self, file_path): |
|
|
def _file_signature(self, file_path): |
|
|
try: |
|
|
try: |
|
|
stat = file_path.stat() |
|
|
stat = file_path.stat() |
|
|
|