#!/usr/bin/env python3
"""
many-rsync
author Yigid BALABAN
co-authored by Opus 4.6

parallel rsync runner. reads config from TOML (preferred) or JSON.

local: foo bar baz --|many-rsync|--> remote/foo remote/bar remote/baz

optionally takes in an argument for configuration
(defaults to ~/sync.toml when no argument is given).

config keys:
    local_folders  non-empty list of folders to sync (required);
                   relative entries are resolved against the home directory,
                   which is also the working directory rsync runs in
    remote_folder  rsync destination (required)
    n              number of parallel rsync processes (default 2)
    log_level      DEBUG / INFO / WARNING / ERROR / CRITICAL (default INFO)
"""

import json
import logging
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

try:
    import tomllib  # 3.11+
except ModuleNotFoundError:
    tomllib = None

HOME = Path.home()
LOG_DIR = HOME / ".rsync-logs"
RSYNC_CMD = (
    "rsync",
    "-avh",
    "--progress",
    "--delete",
    "--stats",
    '--rsync-path=/bin/rsync',  # TODO: this MUST be configurable! default: don't pass this option
    "--exclude-from=.rsync-exclude.txt",  # TODO: this should be configurable.
)
LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

logger = logging.getLogger("sync")


# ── config ────────────────────────────────────────────────────────────────────

def _load_raw(path: Path) -> dict[str, Any]:
    """Read and parse the config file at *path* into a plain dict.

    The format is chosen by file suffix (``.toml`` or ``.json``). Every
    failure (unreadable file, unsupported suffix, syntax error, top-level
    value that is not a table/object) terminates the program via
    ``sys.exit`` with a ``FATAL:`` message.
    """
    try:
        # TOML is UTF-8 by specification; be explicit for JSON as well so the
        # result does not depend on the locale.
        text = path.read_text(encoding="utf-8")
    except OSError as err:
        sys.exit(f"FATAL: cannot read config {path}: {err}")

    try:
        if path.suffix == ".toml":
            if tomllib is None:
                # Only the stdlib tomllib is ever imported by this script, so
                # the only real way out on old interpreters is JSON.
                sys.exit("FATAL: Python < 3.11 has no tomllib; use a JSON config instead")
            raw = tomllib.loads(text)
        elif path.suffix == ".json":
            raw = json.loads(text)
        else:
            sys.exit(f"FATAL: unsupported config format: {path.suffix}")
    except ValueError as err:
        # json.JSONDecodeError and tomllib.TOMLDecodeError both derive from
        # ValueError.
        sys.exit(f"FATAL: cannot parse config {path}: {err}")

    if not isinstance(raw, dict):
        # Valid JSON may be a list/scalar; the rest of the code needs a mapping.
        sys.exit("FATAL: config top level must be a table/object")
    return raw


def load_config(path: Path) -> dict:
    """Load and validate the configuration stored at *path*.

    Returns a dict with the keys ``local_folders`` (list of ``Path``),
    ``remote_folder``, ``n`` (int, clamped to the number of folders) and
    ``log_level`` (upper-cased level name). Any validation failure ends the
    program via ``sys.exit`` with a ``FATAL:`` message.
    """
    raw = _load_raw(path)

    folders = raw.get("local_folders", [])
    if not folders or not isinstance(folders, list):
        sys.exit("FATAL: local_folders must be a non-empty list")
    for entry in folders:
        if not isinstance(entry, str):
            sys.exit(f"FATAL: local_folders entries must be strings, got: {entry!r}")
    folders = [Path(f).expanduser() for f in folders]
    for f in folders:
        # rsync is started with cwd=HOME, so relative entries must be checked
        # against HOME, not against wherever this script was launched from.
        # (HOME / f is just f when f is absolute.)
        if not (HOME / f).is_dir():
            sys.exit(f"FATAL: local_folders entries must exist and be folders, got: {f!r}")

    remote = raw.get("remote_folder")
    if not remote:
        sys.exit("FATAL: remote_folder is required")

    try:
        n = int(raw.get("n", 2))
    except (TypeError, ValueError):
        sys.exit(f"FATAL: n must be an integer, got: {raw.get('n')!r}")
    if n < 1:
        sys.exit("FATAL: n must be >= 1")
    if n > len(folders):
        n = len(folders)  # no point spawning idle workers

    level = str(raw.get("log_level", "INFO")).upper()
    if level not in LOG_LEVELS:
        sys.exit(f"FATAL: invalid log_level: {level!r}")

    cfg = {"local_folders": folders, "remote_folder": remote, "n": n, "log_level": level}
    # Only visible if the caller configured logging before calling us; main()
    # logs the config again once its handlers are installed.
    logger.debug("got config: %s", cfg)
    return cfg


# ── sync ──────────────────────────────────────────────────────────────────────

def _log_file_for(folder: str, ts: str) -> Path:
    """Return the per-folder log path inside LOG_DIR for run timestamp *ts*.

    Only the last path component of *folder* is used: joining LOG_DIR with an
    absolute path would discard LOG_DIR, and a nested relative path would
    point into a sub-directory that does not exist.
    """
    name = Path(folder).name or "root"  # "/" and "." have an empty name
    return LOG_DIR / f"{name}-{ts}.log"


def sync_folder(folder: str, remote: str, ts: str) -> tuple[str, int]:
    """Run rsync for a single folder. Returns (folder, returncode).

    *folder* may be a ``str`` or a ``Path``; it is passed to rsync as given.
    rsync runs with the home directory as its working directory and its
    combined stdout/stderr is written to a per-folder log file in LOG_DIR
    (which must already exist).
    """
    source = str(folder)
    log_file = _log_file_for(source, ts)
    cmd = [*RSYNC_CMD, source, remote]

    logger.info("START %s → %s (log: %s)", source, remote, log_file)
    with log_file.open("w") as fh:
        proc = subprocess.run(
            cmd,
            cwd=HOME,
            stdout=fh,
            stderr=subprocess.STDOUT,  # interleave; nothing goes silent
            text=True,
        )

    if proc.returncode != 0:
        logger.error("FAIL %s rc=%d — see %s", source, proc.returncode, log_file)
    else:
        logger.info("OK %s", source)
    return source, proc.returncode


# ── main ──────────────────────────────────────────────────────────────────────

def main() -> None:
    """Entry point: load the config, sync all folders in parallel, report.

    Exits with status 1 if any folder failed to sync (non-zero rsync return
    code, or the worker raised before rsync could finish).
    """
    cfg_path = Path(sys.argv[1]) if len(sys.argv) > 1 else HOME / "sync.toml"
    cfg = load_config(cfg_path)

    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    LOG_DIR.mkdir(parents=True, exist_ok=True)

    logging.basicConfig(
        level=cfg["log_level"],
        format="%(asctime)s %(levelname)-5s %(message)s",
        datefmt="%H:%M:%S",
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler(LOG_DIR / f"rsync-{ts}.log"),
        ],
    )

    folders, remote, n = cfg["local_folders"], cfg["remote_folder"], cfg["n"]
    logger.info("log level is INFO by default, use config to change")
    # Handlers did not exist yet while load_config() ran, so emit it here.
    logger.debug("got config: %s", cfg)
    logger.info("syncing %d folder(s), parallelism=%d", len(folders), n)

    failed: list[tuple[str, int]] = []
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = {pool.submit(sync_folder, f, remote, ts): f for f in folders}
        for fut in as_completed(futures):
            try:
                folder, rc = fut.result()
            except Exception:
                # e.g. rsync binary missing or log file not writable: record
                # it as a failure instead of aborting the whole report.
                folder, rc = str(futures[fut]), -1
                logger.exception("FAIL %s — sync raised before completing", folder)
            if rc != 0:
                failed.append((folder, rc))

    if failed:
        for f, rc in failed:
            logger.critical("FAILED: %s (rc=%d)", f, rc)
        sys.exit(1)

    logger.info("all folders synced successfully")


if __name__ == "__main__":
    main()