#!/usr/bin/env python3
"""
many-rsync
author Yigid BALABAN
co-authored by Opus 4.6

parallel rsync runner. reads config from TOML (preferred) or JSON.

local: foo bar --|many-rsync|--> remote/foo remote/bar
"""

import json
import logging
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Literal, TypedDict

try:
    import tomllib  # 3.11+
except ModuleNotFoundError:
    tomllib = None


class RsyncParameters(TypedDict, total=False):
    """Optional extra rsync flags taken from the config file."""

    rsync_path: str  # forwarded as --rsync-path=<value>
    exclude_from: str  # forwarded as --exclude-from=<value>


class Config(TypedDict):
    """Validated configuration as returned by load_config()."""

    local_folders: list[Path]
    remote_folder: str
    n: int  # number of parallel rsync workers
    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    rsync_parameters: RsyncParameters


HOME = Path.home()
LOG_DIR = HOME / ".rsync-logs"
RSYNC_BASE = ("rsync", "-avh", "--progress", "--delete", "--stats")
LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

logger = logging.getLogger("sync")


# ── config ────────────────────────────────────────────────────────────────────

def _load_raw(path: Path) -> dict[str, Any]:
    """Read and parse the config file at *path* (TOML or JSON, chosen by suffix).

    Exits the process with a FATAL message when the format is unsupported,
    the file cannot be read, or its content does not parse to a table/object.
    """
    suffix = path.suffix.lower()
    if suffix not in (".toml", ".json"):
        sys.exit(f"FATAL: unsupported config format: {path.suffix}")
    if suffix == ".toml" and tomllib is None:
        sys.exit(
            "FATAL: TOML config requires Python >= 3.11 (tomllib); "
            "use a JSON config instead"
        )

    try:
        # TOML is UTF-8 by specification; do not depend on the locale encoding.
        text = path.read_text(encoding="utf-8")
        raw = tomllib.loads(text) if suffix == ".toml" else json.loads(text)
    except OSError as exc:
        sys.exit(f"FATAL: cannot read config {path}: {exc}")
    except ValueError as exc:
        # TOMLDecodeError, JSONDecodeError and UnicodeDecodeError are all ValueErrors.
        sys.exit(f"FATAL: invalid config {path}: {exc}")

    if not isinstance(raw, dict):
        sys.exit("FATAL: config root must be a table/object")
    return raw


def load_config(path: Path) -> Config:
    """Load the config at *path*, validate it and return it as a Config.

    Any validation failure terminates the process via sys.exit() with a
    message starting with "FATAL:".
    """
    raw = _load_raw(path)

    entries = raw.get("local_folders", [])
    if not isinstance(entries, list) or not entries:
        sys.exit("FATAL: local_folders must be a non-empty list")

    folders: list[Path] = []
    seen: dict[str, Path] = {}
    for entry in entries:
        try:
            folder = Path(entry).expanduser()
        except TypeError:
            sys.exit(f"FATAL: local_folders entries must be paths, got: {entry!r}")
        if not folder.is_absolute():
            # rsync is launched with cwd=HOME, so validate relative entries
            # against HOME as well instead of the caller's working directory.
            folder = HOME / folder
        if not folder.is_dir():
            sys.exit(
                f"FATAL: local_folders entries must exist and be folders, got: {folder!r}"
            )
        if folder.name in seen:
            # Same basename -> same remote/<name> target (synced with --delete)
            # and same per-folder log file; the two jobs would clobber each other.
            sys.exit(
                f"FATAL: local_folders {seen[folder.name]} and {folder} share the "
                f"name {folder.name!r}; they would overwrite each other on the remote"
            )
        seen[folder.name] = folder
        folders.append(folder)

    remote = raw.get("remote_folder")
    if not remote:
        sys.exit("FATAL: remote_folder is required")
    remote = str(remote)  # keep as str — may be "host:/path", not a local path

    try:
        n = int(raw.get("n", 1))
    except (TypeError, ValueError):
        sys.exit("FATAL: n must be an integer")
    if n < 1:
        sys.exit("FATAL: n must be >= 1")
    n = min(n, len(folders))  # no point spawning idle workers

    level = str(raw.get("log_level", "INFO")).upper()
    if level not in LOG_LEVELS:
        sys.exit(f"FATAL: invalid log_level: {level!r}")

    raw_params = raw.get("rsync_parameters", {})
    if not isinstance(raw_params, dict):
        sys.exit("FATAL: rsync_parameters must be a table/object")

    rsync_params: RsyncParameters = {}
    if rsync_path := raw_params.get("rsync_path"):
        # Interpreted by rsync for the remote side: pass through untouched.
        rsync_params["rsync_path"] = str(rsync_path)
    if exclude_from := raw_params.get("exclude_from"):
        exclude_from = str(exclude_from)
        if exclude_from.startswith("~"):
            # No shell is involved when rsync is spawned, so expand "~" here.
            exclude_from = str(Path(exclude_from).expanduser())
        rsync_params["exclude_from"] = exclude_from

    return {
        "local_folders": folders,
        "remote_folder": remote,
        "n": n,
        "log_level": level,
        "rsync_parameters": rsync_params,
    }


# ── sync ──────────────────────────────────────────────────────────────────────

def _build_rsync_cmd(params: RsyncParameters) -> tuple[str, ...]:
    """Extend RSYNC_BASE with optional flags from config."""
    extra: list[str] = []
    if rp := params.get("rsync_path"):
        extra.append(f"--rsync-path={rp}")
    if ef := params.get("exclude_from"):
        extra.append(f"--exclude-from={ef}")
    return (*RSYNC_BASE, *extra)


def sync_folder(folder: Path, remote: str, ts: str, params: RsyncParameters) -> tuple[Path, int]:
    """Run rsync for a single folder.

    rsync's combined stdout/stderr is written to
    LOG_DIR/<folder name>-<ts>.log.

    Returns (folder, returncode).
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)  # usable without main() having run
    log_file = LOG_DIR / f"{folder.name}-{ts}.log"
    # No trailing slash on the source: rsync creates <remote>/<folder name>.
    cmd: list[str | Path] = [*_build_rsync_cmd(params), f"{folder}", remote]

    logger.info("START %s → %s (log: %s)", folder, remote, log_file)

    with log_file.open("w") as fh:
        proc = subprocess.run(
            cmd,
            cwd=HOME,
            stdin=subprocess.DEVNULL,  # prevent parallel processes from fighting over terminal input
            stdout=fh,
            stderr=subprocess.STDOUT,  # interleave; nothing goes silent
            text=True,
        )

    if proc.returncode != 0:
        logger.error("FAIL %s rc=%d — see %s", folder, proc.returncode, log_file)
    else:
        logger.info("OK %s", folder)

    return folder, proc.returncode


# ── main ──────────────────────────────────────────────────────────────────────

def main() -> None:
    """Entry point: load the config, sync all folders in parallel, report.

    The config path is taken from argv[1] and defaults to ~/sync.toml.
    Exits with status 1 when at least one folder failed to sync.
    """
    cfg_path = Path(sys.argv[1]) if len(sys.argv) > 1 else HOME / "sync.toml"
    cfg = load_config(cfg_path)

    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    LOG_DIR.mkdir(parents=True, exist_ok=True)

    logging.basicConfig(
        level=cfg["log_level"],
        format="%(asctime)s %(levelname)-5s %(message)s",
        datefmt="%H:%M:%S",
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler(LOG_DIR / f"rsync-{ts}.log"),
        ],
    )

    folders, remote, n = cfg["local_folders"], cfg["remote_folder"], cfg["n"]
    params = cfg["rsync_parameters"]

    logger.debug("got config: %s", cfg)
    logger.info("log level is set to %s", cfg["log_level"])
    logger.info("syncing %d folder(s), parallelism=%d", len(folders), n)

    failed: list[tuple[Path, int]] = []

    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = {pool.submit(sync_folder, f, remote, ts, params): f for f in folders}
        for fut in as_completed(futures):
            folder = futures[fut]
            try:
                _, rc = fut.result()
            except Exception:
                # e.g. rsync binary missing or log file not writable: record the
                # failure and keep collecting the other folders' results.
                logger.exception("FAIL %s — rsync could not be run", folder)
                rc = -1
            if rc != 0:
                failed.append((folder, rc))

    if failed:
        for f, rc in failed:
            logger.critical("FAILED: %s (rc=%d)", f, rc)
        sys.exit(1)

    logger.info("SUCC all folders synced successfully")


if __name__ == "__main__":
    main()