From eef5d0b0bb663755d70b1feafbef8a9936549cbb Mon Sep 17 00:00:00 2001 From: "dr.holy" Date: Sat, 6 Sep 2025 00:34:35 +0700 Subject: [PATCH] init --- LICENSE | 21 ++ README.md | 49 +++ main.py | 972 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 1042 insertions(+) create mode 100644 LICENSE create mode 100644 README.md create mode 100644 main.py diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d29ae8e --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Valentine Rymskiy + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..605681c --- /dev/null +++ b/README.md @@ -0,0 +1,49 @@ +# Выгрузка видеорегистратора прикладное ПО + +Выгрузка видеорегистратора (Windows, Tkinter) + +Апдейт интерфейса и логики: +- Только съёмные носители (USB/SD) во всех операциях (сканирование/генерация/форматирование). +- Жёлтый бар при обнаружении карты с обратным отсчётом 20с. +- Текст поверх прогресс-бара (оверлей), перенос по ширине, не уходит за края. +- Нижний статус также внутри полосы прогресса. +- Повторное втыкание карты: если есть .uploaded_ok, копируем снова ТОЛЬКО если появились новые файлы после метки. +- После автоформата надпись «ВЫПОЛНЕНО!» в баре. +- Колонка «Статус» в таблице (Копирование успешно…, Ошибка…, Пустая SD карта). +- Окна одиночные (Настройки/Форматирование/Генерация). +- Защита от дублей и проверка целостности. + +Требуется Windows, Python 3.10+, для форматирования — запуск от администратора. + +## Сборка исполняемого файла + +### Переход в директорию проекта +```bash +cd C:\Users\syava\OneDrive\Документы\gundone\mht_exporter_full_updated.py +``` + +### Создание виртуального окружения +```bash +python -m venv venv +``` + +### Активация виртуального окружения +```bash +.\venv\Scripts\activate +``` + +### Установка PyInstaller +```bash +pip install pyinstaller +``` + +### Создание исполняемого файла +```bash +pyinstaller -F -w --uac-admin mht_exporter_full_updated.py +``` + +### Параметры сборки: +- `-F` — создать один исполняемый файл +- `-w` — без консольного окна (GUI приложение) +- `--uac-admin` — запрашивать права администратора при запуске +``` \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..69c38fe --- /dev/null +++ b/main.py @@ -0,0 +1,972 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import sys +import json +import shutil +import subprocess +import datetime +import string +import csv +import threading +import queue +import time +from dataclasses import dataclass, asdict, field +from typing import List, Optional, Dict, Tuple + +import tkinter as tk +from tkinter import ttk, messagebox, filedialog, simpledialog + +APP_TITLE = "МХТ - Выгрузка видеорегистратора" +CONFIG_NAME = "mht_sd_manager.config.json" +LOG_NAME = "mht_fleet_log.csv" +FLEET_ID_FILENAME = ".fleet_id" +UPLOADED_MARK = ".uploaded_ok" + +# ========================== Модель ========================== + +@dataclass +class Car: + plate: str + custom_path: Optional[str] = None + +@dataclass +class Settings: + cars: List[Car] = field(default_factory=list) + continuous_scan: bool = False + auto_format_after_copy: bool = False + auto_format_fs: str = "FAT32" # FAT32 / exFAT / NTFS + use_date_folder: bool = True # «Актуальная дата загрузки» (MM-DD-YYYY) + + def to_json(self) -> dict: + return { + "cars": [asdict(c) for c in self.cars], + "continuous_scan": self.continuous_scan, + "auto_format_after_copy": self.auto_format_after_copy, + "auto_format_fs": self.auto_format_fs, + "use_date_folder": self.use_date_folder, + } + + @staticmethod + def from_json(d: dict) -> "Settings": + s = Settings() + s.cars = [Car(**c) for c in d.get("cars", [])] + s.continuous_scan = bool(d.get("continuous_scan", False)) + s.auto_format_after_copy = bool(d.get("auto_format_after_copy", False)) + s.auto_format_fs = d.get("auto_format_fs", "FAT32") + s.use_date_folder = bool(d.get("use_date_folder", True)) + return s + +# ========================== Утилиты ========================== + +def app_dir() -> str: + return os.path.dirname(os.path.abspath(sys.argv[0])) + +def config_path() -> str: + return os.path.join(app_dir(), CONFIG_NAME) + +def log_path() -> str: + return os.path.join(app_dir(), LOG_NAME) + +def load_settings() -> Settings: + p = config_path() + if os.path.exists(p): + try: + with open(p, "r", encoding="utf-8") as f: + return Settings.from_json(json.load(f)) + except Exception: + pass + return Settings() + +def save_settings(s: Settings): + try: + with open(config_path(), "w", encoding="utf-8") as f: + json.dump(s.to_json(), f, ensure_ascii=False, indent=2) + except Exception as e: + messagebox.showerror("Ошибка", f"Не удалось сохранить настройки: {e}") + +def ensure_dir(path: str): + os.makedirs(path, exist_ok=True) + +def human_size(bytesize: int) -> str: + val = float(bytesize) + for unit in ["B", "KB", "MB", "GB", "TB"]: + if val < 1024 or unit == "TB": + return f"{val:.2f} {unit}" + val /= 1024.0 + +def only_digits(s: str) -> str: + return "".join(ch for ch in s if ch.isdigit()) + +# ---------- Лог ---------- + +def append_log(row: dict): + path = log_path() + need_header = not os.path.exists(path) + try: + with open(path, "a", newline="", encoding="utf-8") as f: + w = csv.DictWriter(f, fieldnames=list(row.keys())) + if need_header: + w.writeheader() + w.writerow(row) + except Exception as e: + print(f"Log write failed: {e}") + +# ========================== Windows: диски/метки/формат ========================== + +def list_removable_drives() -> List[str]: + """ + Возвращает только съёмные диски (USB/SD) по буквам. Без фолбэков на все буквы. + """ + letters: List[str] = [] + if os.name != "nt": + return letters + try: + out = subprocess.check_output( + ["powershell", "-NoProfile", "-Command", + "(Get-Volume | Where-Object { $_.DriveType -eq 'Removable' -and $_.DriveLetter }) | " + "Select-Object -ExpandProperty DriveLetter"], + creationflags=subprocess.CREATE_NO_WINDOW + ).decode("utf-8", errors="ignore") + for line in out.splitlines(): + d = line.strip() + if len(d) == 1 and d.isalpha(): + letters.append(f"{d.upper()}:") + except Exception: + pass + # Доп. страховка WMIC + try: + out = subprocess.check_output( + ["wmic", "logicaldisk", "where", "DriveType=2", "get", "DeviceID"], + creationflags=subprocess.CREATE_NO_WINDOW + ).decode("utf-8", errors="ignore") + for line in out.splitlines(): + line = line.strip() + if len(line) == 2 and line.endswith(":") and line not in letters: + letters.append(line) + except Exception: + pass + return letters + +def get_volume_label(letter: str) -> Optional[str]: + try: + out = subprocess.check_output( + ["powershell", "-NoProfile", "-Command", f"(Get-Volume -DriveLetter '{letter[0]}').FileSystemLabel"], + creationflags=subprocess.CREATE_NO_WINDOW + ).decode("utf-8", errors="ignore").strip() + return out if out else None + except Exception: + return None + +def set_volume_label(letter: str, label: str) -> bool: + try: + subprocess.check_call( + ["powershell", "-NoProfile", "-Command", f"Set-Volume -DriveLetter '{letter[0]}' -NewFileSystemLabel '{label}'"], + creationflags=subprocess.CREATE_NO_WINDOW + ) + return True + except Exception as e: + messagebox.showerror("Ошибка", f"Не удалось установить метку тома: {e}\nЗапустите программу от имени администратора.") + return False + +def format_volume(letter: str, fs: str, label: str) -> bool: + try: + if fs.upper() == "FAT32": + cmd = f"Format-Volume -DriveLetter '{letter[0]}' -FileSystem FAT32 -NewFileSystemLabel '{label}' -Confirm:$false -Force" + elif fs.upper() == "EXFAT": + cmd = f"Format-Volume -DriveLetter '{letter[0]}' -FileSystem exFAT -NewFileSystemLabel '{label}' -Confirm:$false -Force" + else: + cmd = f"Format-Volume -DriveLetter '{letter[0]}' -FileSystem NTFS -NewFileSystemLabel '{label}' -Confirm:$false -Force" + subprocess.check_call(["powershell", "-NoProfile", "-Command", cmd], creationflags=subprocess.CREATE_NO_WINDOW) + return True + except Exception as e: + messagebox.showerror("Ошибка", f"Форматирование не удалось: {e}\nЗапустите от имени администратора.") + return False + +# =============== .fleet_id / uploaded mark =============== + +def fleet_id_path(letter: str) -> str: + return os.path.join(f"{letter}\\", FLEET_ID_FILENAME) + +def uploaded_mark_path(letter: str) -> str: + return os.path.join(f"{letter}\\", UPLOADED_MARK) + +def write_fleet_id(letter: str, car: Car) -> bool: + data = {"plate": car.plate, "digits": only_digits(car.plate), "assigned_at": datetime.datetime.now().isoformat()} + p = fleet_id_path(letter) + try: + with open(p, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False) + try: + subprocess.check_call(["attrib", "+h", p], creationflags=subprocess.CREATE_NO_WINDOW) + except Exception: + pass + return True + except Exception as e: + messagebox.showerror("Ошибка", f"Не удалось записать {FLEET_ID_FILENAME}: {e}") + return False + +def read_fleet_id(letter: str) -> Optional[dict]: + p = fleet_id_path(letter) + if not os.path.exists(p): + return None + try: + with open(p, "r", encoding="utf-8") as f: + return json.load(f) + except Exception: + return None + +def set_uploaded_mark(letter: str): + try: + p = uploaded_mark_path(letter) + with open(p, "w", encoding="utf-8") as f: + f.write(datetime.datetime.now().isoformat()) + try: + subprocess.check_call(["attrib", "+h", p], creationflags=subprocess.CREATE_NO_WINDOW) + except Exception: + pass + except Exception: + pass + +def has_uploaded_mark(letter: str) -> bool: + return os.path.exists(uploaded_mark_path(letter)) + +def clear_uploaded_mark(letter: str): + try: + p = uploaded_mark_path(letter) + if os.path.exists(p): + os.remove(p) + except Exception: + pass + +# --- новые проверки для повторного втыкания --- + +def newest_file_mtime(letter: str) -> float: + root = f"{letter}\\" + newest = 0.0 + for r, _, files in os.walk(root): + for name in files: + if name in (FLEET_ID_FILENAME, UPLOADED_MARK, "thumbs.db", ".ds_store"): + continue + try: + m = os.path.getmtime(os.path.join(r, name)) + if m > newest: + newest = m + except Exception: + pass + return newest + +def read_uploaded_mark_time(letter: str) -> Optional[float]: + p = uploaded_mark_path(letter) + if not os.path.exists(p): + return None + try: + with open(p, "r", encoding="utf-8") as f: + iso = f.read().strip() + dt = datetime.datetime.fromisoformat(iso) + return dt.timestamp() + except Exception: + return None + +def needs_copy_despite_mark(letter: str) -> bool: + mark_ts = read_uploaded_mark_time(letter) + if mark_ts is None: + return True + newest = newest_file_mtime(letter) + if newest == 0: + return False + return newest > mark_ts + +# =============== Копирование/проверка =============== + +def gather_total_size(src_root: str) -> int: + total = 0 + for root, _, files in os.walk(src_root): + for name in files: + if name in (FLEET_ID_FILENAME, UPLOADED_MARK): + continue + try: + total += os.path.getsize(os.path.join(root, name)) + except Exception: + pass + return total + +def copy_tree_with_progress(src_root: str, dst_root: str, progress_cb=None) -> Tuple[int, int, int]: + ensure_dir(dst_root) + total_bytes = gather_total_size(src_root) + done_bytes = 0 + copied = 0 + skipped = 0 + + for root, _, files in os.walk(src_root): + rel = os.path.relpath(root, src_root) + target = os.path.join(dst_root, rel) if rel != "." else dst_root + ensure_dir(target) + for name in files: + if name.lower() in ("thumbs.db", ".ds_store") or name in (FLEET_ID_FILENAME, UPLOADED_MARK): + continue + s = os.path.join(root, name) + d = os.path.join(target, name) + try: + ssize = os.path.getsize(s) + if os.path.exists(d) and os.path.getsize(d) == ssize: + skipped += 1 + done_bytes += ssize + if progress_cb: + progress_cb(done_bytes, total_bytes) + continue + shutil.copy2(s, d) + copied += 1 + done_bytes += ssize + if progress_cb: + progress_cb(done_bytes, total_bytes) + except Exception: + continue + + return copied, skipped, total_bytes + +def verify_copy_integrity(src_root: str, dst_root: str) -> bool: + for root, _, files in os.walk(src_root): + rel = os.path.relpath(root, src_root) + target = os.path.join(dst_root, rel) if rel != "." else dst_root + for name in files: + if name in (FLEET_ID_FILENAME, UPLOADED_MARK): + continue + s = os.path.join(root, name) + d = os.path.join(target, name) + try: + if not os.path.exists(d): + return False + if os.path.getsize(s) != os.path.getsize(d): + return False + except Exception: + return False + return True + +# ========================== GUI: одиночные окна ========================== + +class OneInstanceMixin: + _instance_windows: Dict[str, tk.Toplevel] = {} + + def _open_single(self, key: str, factory): + win = self._instance_windows.get(key) + if win and win.winfo_exists(): + win.lift(); win.focus_force(); return win + win = factory() + self._instance_windows[key] = win + def on_close(): + try: + if win.winfo_exists(): + win.destroy() + finally: + self._instance_windows.pop(key, None) + win.protocol("WM_DELETE_WINDOW", on_close) + return win + +class GenerateCardDialog(tk.Toplevel): + def __init__(self, master, cars: List[Car], drives: List[str], labels: Dict[str, str]): + super().__init__(master) + self.title("Сгенерировать карту"); self.resizable(False, False) + self.result: Optional[Tuple[Car, str, bool]] = None + + frm = ttk.Frame(self, padding=10); frm.pack(fill=tk.BOTH, expand=True) + + ttk.Label(frm, text="Авто:").grid(row=0, column=0, sticky="w") + self.car_var = tk.StringVar() + car_names = [c.plate for c in cars] + self.car_box = ttk.Combobox(frm, values=car_names, textvariable=self.car_var, state="readonly", width=40) + self.car_box.grid(row=0, column=1, sticky="we", pady=4) + if car_names: self.car_box.current(0) + + ttk.Label(frm, text="Диск:").grid(row=1, column=0, sticky="w") + self.drive_var = tk.StringVar() + display = [] + self.map_disp_to_letter: Dict[str, str] = {} + for d in drives: + lab = labels.get(d) or "БЕЗ НОМЕРА" + disp = f"{d} — {lab}" + display.append(disp) + self.map_disp_to_letter[disp] = d + self.drive_box = ttk.Combobox(frm, values=display, textvariable=self.drive_var, state="readonly", width=40) + self.drive_box.grid(row=1, column=1, sticky="we", pady=4) + if display: self.drive_box.current(0) + + self.set_label_var = tk.BooleanVar(value=True) + ttk.Checkbutton(frm, text="Установить метку тома (цифры из номера)", variable=self.set_label_var)\ + .grid(row=2, column=0, columnspan=2, sticky="w", pady=6) + + btns = ttk.Frame(frm); btns.grid(row=3, column=0, columnspan=2, pady=8) + ttk.Button(btns, text="Создать", command=lambda: self._done(cars)).pack(side=tk.LEFT, padx=6) + ttk.Button(btns, text="Отмена", command=self.destroy).pack(side=tk.LEFT) + + def _done(self, cars: List[Car]): + car_plate = self.car_var.get(); drive_disp = self.drive_var.get() + if not car_plate or not drive_disp: + messagebox.showerror("Ошибка", "Выберите авто и диск"); return + car = next((c for c in cars if c.plate == car_plate), None) + drive = self.map_disp_to_letter.get(drive_disp) + if not car or not drive: + messagebox.showerror("Ошибка", "Некорректный выбор"); return + self.result = (car, drive, bool(self.set_label_var.get())) + self.destroy() + +class FormatDialog(tk.Toplevel): + def __init__(self, master, drives: List[str]): + super().__init__(master) + self.title("Форматирование SD"); self.resizable(False, False) + self.result: Optional[Tuple[str, str]] = None + + labels = {d: (get_volume_label(d) or "БЕЗ НОМЕРА") for d in drives} + frm = ttk.Frame(self, padding=10); frm.pack(fill=tk.BOTH, expand=True) + + ttk.Label(frm, text="Диск:").grid(row=0, column=0, sticky="w") + self.drive_var = tk.StringVar() + items = [f"{d} — {labels[d]}" for d in drives] + self.map_text_to_drive = {f"{d} — {labels[d]}": d for d in drives} + box = ttk.Combobox(frm, values=items, textvariable=self.drive_var, state="readonly", width=40) + box.grid(row=0, column=1, sticky="we", pady=4) + if items: box.current(0) + + ttk.Label(frm, text="Файловая система:").grid(row=1, column=0, sticky="w") + self.fs_var = tk.StringVar(value="FAT32") + fs_box = ttk.Combobox(frm, values=["FAT32", "exFAT", "NTFS"], textvariable=self.fs_var, state="readonly", width=10) + fs_box.grid(row=1, column=1, sticky="w", pady=4) + + btns = ttk.Frame(frm); btns.grid(row=2, column=0, columnspan=2, pady=8) + ttk.Button(btns, text="Форматировать", command=self._ok).pack(side=tk.LEFT, padx=6) + ttk.Button(btns, text="Отмена", command=self.destroy).pack(side=tk.LEFT) + + def _ok(self): + disp = self.drive_var.get(); fs = self.fs_var.get() + if not disp or not fs: + messagebox.showerror("Ошибка", "Выберите диск и ФС"); return + self.result = (self.map_text_to_drive[disp], fs) + self.destroy() + +class CarDialog(tk.Toplevel): + def __init__(self, master, title: str, init: Optional[Car] = None): + super().__init__(master) + self.title(title); self.resizable(False, False) + self.result: Optional[Car] = None + + frm = ttk.Frame(self, padding=10); frm.pack(fill=tk.BOTH, expand=True) + ttk.Label(frm, text="Номер авто:").grid(row=0, column=0, sticky="w") + self.plate_var = tk.StringVar(value=init.plate if init else "") + ttk.Entry(frm, textvariable=self.plate_var, width=40).grid(row=0, column=1, sticky="we", pady=4) + + ttk.Label(frm, text="Путь (UNC/локальный):").grid(row=1, column=0, sticky="w") + self.path_var = tk.StringVar(value=init.custom_path if init else "") + row = ttk.Frame(frm); row.grid(row=1, column=1, sticky="we") + ttk.Entry(row, textvariable=self.path_var, width=40).pack(side=tk.LEFT) + ttk.Button(row, text="...", command=self.pick_path).pack(side=tk.LEFT, padx=4) + + btns = ttk.Frame(frm); btns.grid(row=2, column=0, columnspan=2, pady=8) + ttk.Button(btns, text="Сохранить", command=self._ok).pack(side=tk.LEFT, padx=6) + ttk.Button(btns, text="Отмена", command=self.destroy).pack(side=tk.LEFT) + + self.bind("", lambda e: self._ok()) + self.bind("", lambda e: self.destroy()) + + def pick_path(self): + p = filedialog.askdirectory(title="Выбор папки") + if p: self.path_var.set(p) + + def _ok(self): + plate = (self.plate_var.get() or "").strip() + path = (self.path_var.get() or "").strip() or None + if not plate: + messagebox.showerror("Ошибка", "Номер авто обязателен"); return + self.result = Car(plate=plate, custom_path=path) + self.destroy() + +# ========================== Основное окно ========================== + +class App(tk.Tk, OneInstanceMixin): + def __init__(self): + super().__init__() + self.title(APP_TITLE) + self.geometry("1240x820") + self.minsize(1140, 760) + + self.settings: Settings = load_settings() + + self.pending: Dict[str, Tuple[str, float]] = {} # drive -> (plate, deadline) + self.in_progress: Dict[str, str] = {} # drive -> plate + self.progress_bars: Dict[str, ttk.Progressbar] = {} + self.progress_labels: Dict[str, tk.StringVar] = {} # текст внутри бара + self.car_status: Dict[str, tk.StringVar] = {} + self.q: queue.Queue = queue.Queue() + + self.create_widgets() + self.refresh_car_list() + + style = ttk.Style() + try: + style.theme_use("clam") + except Exception: + pass + style.configure("green.Horizontal.TProgressbar", background="green") + style.configure("yellow.Horizontal.TProgressbar", background="#DDBB00") + style.configure("red.Horizontal.TProgressbar", background="red") + + if self.settings.continuous_scan: + self.start_continuous_scan() + + self.after(100, self.process_queue) + self.after(500, self.tick_pending_countdowns) + + # ---------- UI ---------- + def create_widgets(self): + top = ttk.Frame(self, padding=8); top.pack(side=tk.TOP, fill=tk.X) + + self.cont_scan_var = tk.BooleanVar(value=self.settings.continuous_scan) + self.auto_fmt_var = tk.BooleanVar(value=self.settings.auto_format_after_copy) + + btn1 = ttk.Checkbutton(top, text="Непрерывный поиск SD карт (1)", variable=self.cont_scan_var, command=self.toggle_continuous_scan) + btn1.pack(side=tk.LEFT, padx=(0, 6)) + ttk.Label(top, text="→").pack(side=tk.LEFT) + self.btn2 = ttk.Checkbutton(top, text="Форматирование после авто-загрузки (2)", variable=self.auto_fmt_var, command=self.toggle_auto_format) + self.btn2.pack(side=tk.LEFT, padx=6) + if not self.settings.continuous_scan: + self.btn2.state(["disabled"]) # блок (2), если (1) выкл + + ttk.Button(top, text="Сканировать сейчас", command=self.scan_and_autocopy).pack(side=tk.LEFT, padx=12) + ttk.Button(top, text="Сгенерировать карту", command=self.generate_card).pack(side=tk.LEFT, padx=6) + ttk.Button(top, text="Форматировать SD", command=self.format_sd).pack(side=tk.LEFT, padx=6) + ttk.Button(top, text="Настройки…", command=self.open_settings).pack(side=tk.RIGHT) + + main = ttk.Frame(self); main.pack(fill=tk.BOTH, expand=True, padx=8, pady=8) + left = ttk.Frame(main); left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + right = ttk.Frame(main); right.pack(side=tk.RIGHT, fill=tk.BOTH) + + cars_frame = ttk.LabelFrame(left, text="Автомобили") + cars_frame.pack(fill=tk.BOTH, expand=True) + self.cars_tree = ttk.Treeview(cars_frame, columns=("#", "plate", "path", "state"), show="headings", height=16) + self.cars_tree.heading("#", text="#") + self.cars_tree.heading("plate", text="Номер") + self.cars_tree.heading("path", text="Путь выгрузки") + self.cars_tree.heading("state", text="Статус") + self.cars_tree.column("#", width=40, anchor="center") + self.cars_tree.column("plate", width=220, anchor="w") + self.cars_tree.column("path", width=520, anchor="w") + self.cars_tree.column("state", width=240, anchor="w") + self.cars_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + ttk.Scrollbar(cars_frame, orient="vertical", command=self.cars_tree.yview).pack(side=tk.RIGHT, fill=tk.Y) + + btns = ttk.Frame(left); btns.pack(fill=tk.X, pady=6) + ttk.Button(btns, text="Добавить авто", command=self.add_car).pack(side=tk.LEFT, padx=3) + ttk.Button(btns, text="Редактировать авто", command=self.edit_car).pack(side=tk.LEFT, padx=3) + ttk.Button(btns, text="Удалить авто", command=self.delete_car).pack(side=tk.LEFT, padx=3) + ttk.Button(btns, text="Назначить путь…", command=self.set_car_path).pack(side=tk.LEFT, padx=3) + + # панель прогресса по авто с оверлей-текстом + prog_frame = ttk.LabelFrame(right, text="Загрузки (по автомобилям)") + prog_frame.pack(fill=tk.BOTH, expand=True) + self.progress_container = ttk.Frame(prog_frame); self.progress_container.pack(fill=tk.BOTH, expand=True) + + # Нижний общий бар с оверлеем + bottom_wrap = tk.Frame(self) + bottom_wrap.pack(fill=tk.X, padx=8, pady=(0, 6)) + self.bottom_bar = ttk.Progressbar(bottom_wrap, maximum=100, mode="determinate") + self.bottom_bar.pack(fill=tk.X, expand=True) + self._bottom_overlay = tk.Label(bottom_wrap, text="", anchor="center") + self._bottom_overlay.place(relx=0.5, rely=0.5, anchor="center") + def _resize_bottom_overlay(event): + self._bottom_overlay.configure(wraplength=event.width-8) + self.bottom_bar.bind("", _resize_bottom_overlay) + self.status = tk.StringVar(value="Готово") + def _sync_bottom(varname, idx2, op): + self._bottom_overlay.configure(text=self.status.get()) + self.status.trace_add("write", _sync_bottom) + self._bottom_overlay.configure(text=self.status.get()) + + def refresh_car_list(self): + for i in self.cars_tree.get_children(): + self.cars_tree.delete(i) + for w in self.progress_container.winfo_children(): + w.destroy() + self.progress_bars.clear(); self.progress_labels.clear(); self.car_status.clear() + + for idx, car in enumerate(self.settings.cars, start=1): + st = tk.StringVar(value="—") + self.car_status[car.plate] = st + self.cars_tree.insert("", tk.END, iid=car.plate, values=(idx, car.plate, car.custom_path or "—", st.get())) + + row = ttk.Frame(self.progress_container) + row.pack(fill=tk.X, pady=4) + ttk.Label(row, text=f"{idx}. {car.plate}", width=26).pack(side=tk.LEFT) + + bar_wrap = tk.Frame(row) + bar_wrap.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=8) + bar = ttk.Progressbar(bar_wrap, mode="determinate", maximum=100, length=520, style="green.Horizontal.TProgressbar") + bar.pack(fill=tk.X, expand=True) + bar["value"] = 0 + self.progress_bars[car.plate] = bar + + overlay = tk.Label(bar_wrap, text="—", anchor="center") + overlay.place(relx=0.5, rely=0.5, anchor="center") + def _resize_overlay(event, lbl=overlay): + lbl.configure(wraplength=event.width-8) + bar.bind("", _resize_overlay) + + sv = tk.StringVar(value="—") + def _sync_label(varname, idx2, op, lbl=overlay, var=sv): + lbl.configure(text=var.get()) + sv.trace_add("write", _sync_label) + self.progress_labels[car.plate] = sv + + # ---------- Тумблеры ---------- + def toggle_continuous_scan(self): + self.settings.continuous_scan = bool(self.cont_scan_var.get()) + save_settings(self.settings) + if self.settings.continuous_scan: + self.btn2.state(["!disabled"]) # разблок (2) + self.start_continuous_scan() + else: + self.btn2.state(["disabled"]) # блок (2) + + def toggle_auto_format(self): + if not self.settings.continuous_scan: + self.auto_fmt_var.set(False); return + self.settings.auto_format_after_copy = bool(self.auto_fmt_var.get()) + save_settings(self.settings) + + def start_continuous_scan(self): + def loop(): + if self.settings.continuous_scan: + try: + self.scan_and_autocopy() + except Exception: + pass + self.after(3000, loop) + self.after(0, loop) + + # ---------- Автоскан ---------- + def scan_and_autocopy(self): + drives = list_removable_drives() + # чистим pending для пропавших + for d in list(self.pending.keys()): + if d not in drives: + self.pending.pop(d, None) + for d in drives: + # фильтр по uploaded mark + if has_uploaded_mark(d): + if not needs_copy_despite_mark(d): + continue + else: + clear_uploaded_mark(d) + if d in self.in_progress or d in self.pending: + continue + car = self.identify_car_by_drive(d) + if not car: + continue + if not car.custom_path: + self._set_car_status(car.plate, "Путь не задан") + continue + # жёлтый бар + 20с + deadline = time.time() + 20 + self.pending[d] = (car.plate, deadline) + bar = self.progress_bars.get(car.plate) + if bar: + bar.configure(style="yellow.Horizontal.TProgressbar"); bar["value"] = 0 + lbl = self.progress_labels.get(car.plate) + if lbl: lbl.set("Обнаружена карта. Автозагрузка через 20 сек…") + self._set_car_status(car.plate, "Ожидание автозагрузки") + + def tick_pending_countdowns(self): + now = time.time() + for d, (plate, deadline) in list(self.pending.items()): + car = next((c for c in self.settings.cars if c.plate == plate), None) + if not car: + self.pending.pop(d, None); continue + remain = int(max(0, deadline - now)) + lbl = self.progress_labels.get(plate) + if lbl: + lbl.set(f"Обнаружена карта. Автозагрузка через {remain} сек…") + if remain <= 0: + self.pending.pop(d, None) + self.start_copy_thread(d, car) + self.after(500, self.tick_pending_countdowns) + + def identify_car_by_drive(self, letter: str) -> Optional[Car]: + fid = read_fleet_id(letter) + if fid and "plate" in fid: + car = next((c for c in self.settings.cars if c.plate == fid["plate"]), None) + if car: return car + lab = (get_volume_label(letter) or "").strip() + if lab: + for c in self.settings.cars: + if only_digits(c.plate) == lab and lab != "": + return c + return None + + # ---------- Копирование ---------- + def start_copy_thread(self, drive_letter: str, car: Car): + if drive_letter in self.in_progress: + return + self.in_progress[drive_letter] = car.plate + + src_root = f"{drive_letter}\\" + base = car.custom_path or "" + if not base: + self._set_car_status(car.plate, "Путь не задан") + self.in_progress.pop(drive_letter, None) + return + dst_root = base + if self.settings.use_date_folder: + date_folder = datetime.datetime.now().strftime("%m-%d-%Y") + dst_root = os.path.join(dst_root, date_folder) + ensure_dir(dst_root) + + bar = self.progress_bars.get(car.plate) + if bar: + bar.configure(style="green.Horizontal.TProgressbar"); bar["value"] = 0 + lbl = self.progress_labels.get(car.plate) + if lbl: lbl.set("Подготовка к копированию…") + self._set_car_status(car.plate, "Копирование началось") + + def worker(): + ok = True + err_msg = "" + t0 = time.time() + + def progress_cb(done, total): + perc = int((done / total) * 100) if total > 0 else 100 + elapsed = max(0.001, time.time() - t0) + speed = done / elapsed + remain = (total - done) / speed if speed > 0 else 0 + self.q.put(("progress", car.plate, perc, done, total, remain)) + + try: + copied, skipped, total = copy_tree_with_progress(src_root, dst_root, progress_cb) + if total == 0: + ok = True + else: + if not verify_copy_integrity(src_root, dst_root): + ok = False; err_msg = "Проверка копирования не пройдена." + append_log({ + "timestamp": datetime.datetime.now().isoformat(), + "action": "copy", + "drive": drive_letter, + "car_plate": car.plate, + "dst": dst_root, + "copied": copied, + "skipped": skipped, + "total_bytes": total, + "total_h": human_size(total), + "verify_ok": ok, + }) + except Exception as e: + ok = False; err_msg = str(e) + + did_format = False + if ok and self.settings.continuous_scan and self.settings.auto_format_after_copy: + try: + fs = self.settings.auto_format_fs or "FAT32" + label = only_digits(car.plate) or "0" + if format_volume(drive_letter, fs, label): + write_fleet_id(drive_letter, car) + did_format = True + set_uploaded_mark(drive_letter) + except Exception: + pass + elif ok: + set_uploaded_mark(drive_letter) + + self.q.put(("done", car.plate, ok, err_msg, drive_letter, did_format, total)) + + threading.Thread(target=worker, daemon=True).start() + + def process_queue(self): + while True: + try: + msg = self.q.get_nowait() + except queue.Empty: + break + kind = msg[0] + if kind == "progress": + plate, perc, done, total, remain = msg[1], msg[2], msg[3], msg[4], msg[5] + bar = self.progress_bars.get(plate) + if bar: + bar["value"] = max(0, min(100, int(perc))) + lbl = self.progress_labels.get(plate) + if lbl: + eta = datetime.timedelta(seconds=int(remain)) + lbl.set(f"{perc}% • {human_size(done)}/{human_size(total)} • ETA {eta}") + self.bottom_bar["value"] = max(0, min(100, int(perc))) + self.status.set(f"{plate}: {perc}%") + elif kind == "done": + plate, ok, err, drive_letter, did_format, total = msg[1], msg[2], msg[3], msg[4], msg[5], msg[6] + bar = self.progress_bars.get(plate) + lbl = self.progress_labels.get(plate) + if bar: + bar["value"] = 100 + if ok: + bar.configure(style="green.Horizontal.TProgressbar") + if did_format: + if lbl: lbl.set("ВЫПОЛНЕНО!") + else: + if total == 0: + if lbl: lbl.set("Пустая SD карта") + else: + if lbl: lbl.set("Завершено успешно") + else: + bar.configure(style="red.Horizontal.TProgressbar") + if lbl: lbl.set("Ошибка. Требуется вмешательство администратора") + if ok: + if total == 0 and not did_format: + self._set_car_status(plate, "Пустая SD карта") + else: + self._set_car_status(plate, "Копирование успешно, извлеките устройство") + self.status.set(f"{plate}: Готово") + else: + self._set_car_status(plate, "Ошибка. Требуется вмешательство системного администратора") + self.status.set(f"{plate}: Ошибка") + self.in_progress.pop(drive_letter, None) + self.after(100, self.process_queue) + + # ---------- Управление авто ---------- + def add_car(self): + dlg = CarDialog(self, "Добавить автомобиль") + self.wait_window(dlg) + if dlg.result: + self.settings.cars.append(dlg.result) + save_settings(self.settings) + self.refresh_car_list() + + def edit_car(self): + sel = self.cars_tree.selection() + if not sel: + messagebox.showwarning("Редактировать", "Выберите авто."); return + plate = sel[0] + car = next((c for c in self.settings.cars if c.plate == plate), None) + if not car: return + dlg = CarDialog(self, "Редактировать автомобиль", init=car) + self.wait_window(dlg) + if dlg.result: + car.plate = dlg.result.plate + car.custom_path = dlg.result.custom_path + save_settings(self.settings) + self.refresh_car_list() + + def delete_car(self): + sel = self.cars_tree.selection() + if not sel: return + plate = sel[0] + car = next((c for c in self.settings.cars if c.plate == plate), None) + if not car: return + if not messagebox.askyesno("Подтверждение", f"Удалить авто {car.plate}?"): + return + self.settings.cars = [c for c in self.settings.cars if c.plate != plate] + save_settings(self.settings) + self.refresh_car_list() + + def set_car_path(self): + sel = self.cars_tree.selection() + if not sel: return + plate = sel[0] + car = next((c for c in self.settings.cars if c.plate == plate), None) + if not car: return + p = filedialog.askdirectory(title=f"Выберите папку выгрузки для {car.plate}") + if p: + car.custom_path = p + save_settings(self.settings) + self.refresh_car_list() + + def _set_car_status(self, plate: str, text: str): + var = self.car_status.get(plate) + if var: + var.set(text) + vals = list(self.cars_tree.item(plate, "values")) + if len(vals) == 4: + vals[3] = text + self.cars_tree.item(plate, values=vals) + + # ---------- Генерация карты ---------- + def generate_card(self): + if not self.settings.cars: + messagebox.showwarning("Нет машин", "Сначала добавьте хотя бы одну машину."); return + drives = list_removable_drives() + if not drives: + messagebox.showwarning("Нет карт", "Съёмные носители не найдены."); return + labels = {d: (get_volume_label(d) or "БЕЗ НОМЕРА") for d in drives} + + def factory(): + return GenerateCardDialog(self, self.settings.cars, drives, labels) + dlg: GenerateCardDialog = self._open_single("generate", factory) # type: ignore + self.wait_window(dlg) + if not getattr(dlg, 'result', None): + return + car, drive, setlab = dlg.result + clear_uploaded_mark(drive) + if not write_fleet_id(drive, car): + return + if setlab: + digits = only_digits(car.plate) or "0" + set_volume_label(drive, digits) + messagebox.showinfo("Готово", f"Карта для {car.plate} подготовлена на {drive}.") + + # ---------- Форматирование ---------- + def format_sd(self): + drives = list_removable_drives() + if not drives: + messagebox.showwarning("Нет карт", "Съёмные носители не найдены."); return + def factory(): + return FormatDialog(self, drives) + dlg: FormatDialog = self._open_single("format", factory) # type: ignore + self.wait_window(dlg) + if not getattr(dlg, 'result', None): + return + drive, fs = dlg.result + car = self.identify_car_by_drive(drive) + digits = only_digits(car.plate) if car else "MHT" + if not messagebox.askyesno("Подтверждение", f"Форматировать {drive} как {fs}? ВСЕ данные будут удалены!"): + return + if format_volume(drive, fs, digits or "MHT"): + clear_uploaded_mark(drive) + if car: + write_fleet_id(drive, car) + messagebox.showinfo("Готово", f"{drive} отформатирован.") + else: + messagebox.showerror("Ошибка", "Форматирование не удалось.") + + # ---------- Настройки ---------- + def open_settings(self): + def factory(): + top = tk.Toplevel(self) + top.title("Настройки"); top.resizable(False, False) + frm = ttk.Frame(top, padding=10); frm.pack(fill=tk.BOTH, expand=True) + + self.chk_date_var = tk.BooleanVar(value=self.settings.use_date_folder) + ttk.Checkbutton(frm, text="Актуальная дата загрузки", variable=self.chk_date_var)\ + .grid(row=0, column=0, sticky="w", pady=4) + + ttk.Label(frm, text="Файловая система для авто-формата:").grid(row=1, column=0, sticky="w", pady=(8, 2)) + self.fs_var = tk.StringVar(value=self.settings.auto_format_fs) + fs_box = ttk.Combobox(frm, values=["FAT32", "exFAT", "NTFS"], textvariable=self.fs_var, state="readonly", width=10) + fs_box.grid(row=1, column=1, sticky="w", pady=(8, 2)) + + btns = ttk.Frame(frm); btns.grid(row=2, column=0, columnspan=2, pady=10) + ttk.Button(btns, text="Сохранить", command=lambda: self._save_settings(top)).pack(side=tk.LEFT, padx=6) + ttk.Button(btns, text="Закрыть", command=top.destroy).pack(side=tk.LEFT) + return top + self._open_single("settings", factory) + + def _save_settings(self, win: tk.Toplevel): + self.settings.use_date_folder = bool(self.chk_date_var.get()) + self.settings.auto_format_fs = self.fs_var.get() or "FAT32" + save_settings(self.settings) + win.destroy() + +# ========================== main ========================== + +def main(): + if os.name != "nt": + messagebox.showerror("Только Windows", "Приложение поддерживает только Windows (PowerShell операции).") + return + app = App() + app.mainloop() + +if __name__ == "__main__": + main()