Files
server-toolset/many-rsync/main.py
Yigid BALABAN a168b4cbea add project tooling and test suite for many-rsync
- Add pyproject.toml: hatchling build, many-rsync entrypoint, ruff/mypy/pytest config
with 60% coverage floor
- Add uv.lock for reproducible dev installs
- Add .pre-commit-config.yaml: ruff (with --fix) + mypy hooks
- Add test_main.py: unit tests for _build_rsync_cmd, _load_raw, and load_config
covering happy paths and FATAL exit cases
- Add explanation.md: architecture overview with flowchart
- main.py: refactor into typed, testable functions (_load_raw, _build_rsync_cmd
extracted); add RsyncParameters/Config TypedDicts; add rsync_parameters config support
(rsync_path, exclude_from); harden validation (n clamped, log_level validated)
- README.md: update install instructions and document all config fields including
rsync_parameters
2026-03-31 22:05:08 +03:00

183 lines
5.8 KiB
Python

#!/usr/bin/env python3
"""
many-rsync
author Yigid BALABAN <balaban@yigid.dev>
co-authored by Opus 4.6
parallel rsync runner. reads config from TOML (preferred) or JSON.
local: foo bar --|many-rsync|--> remote/foo remote/bar
"""
from __future__ import annotations
import json
import logging
import subprocess
import sys
import tomllib
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Literal, TypedDict
class RsyncParameters(TypedDict, total=False):
    """Optional rsync tuning options taken from the config's ``rsync_parameters`` section.

    ``total=False`` makes every key optional; ``load_config`` only sets a key
    when the config supplied a non-empty value for it.
    """

    # Forwarded to rsync as ``--rsync-path=<value>``.
    rsync_path: str
    # Forwarded to rsync as ``--exclude-from=<value>``.
    exclude_from: str
class Config(TypedDict):
    """Validated configuration, as returned by ``load_config``."""

    # Local directories to sync; ``load_config`` expands ``~`` and checks each is a directory.
    local_folders: list[Path]
    # rsync destination, kept as a plain string because it may be "host:/path".
    remote_folder: str
    # Number of parallel rsync workers: at least 1, clamped to len(local_folders).
    n: int
    # Upper-cased logging level name, passed to ``logging.basicConfig`` in ``main``.
    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    # Optional extra rsync flags; empty when the config sets none.
    rsync_parameters: RsyncParameters
# Current user's home directory; default config location and rsync's working directory.
HOME = Path.home()
# Directory that receives the per-folder rsync logs and the per-run log file.
LOG_DIR = HOME / ".rsync-logs"
# Base rsync argv shared by every invocation; ``_build_rsync_cmd`` appends optional flags.
# NOTE: ``--delete`` removes files on the destination that no longer exist locally.
RSYNC_BASE = ("rsync", "-avh", "--progress", "--delete", "--stats")
# Module-wide logger; handlers and level are configured in ``main``.
logger = logging.getLogger("sync")
# ── config ────────────────────────────────────────────────────────────────────
def _load_raw(path: Path) -> dict[str, Any]:
text = path.read_text()
if path.suffix == ".toml":
return tomllib.loads(text)
if path.suffix == ".json":
result: dict[str, Any] = json.loads(text)
return result
sys.exit(f"FATAL: unsupported config format: {path.suffix}")
def load_config(path: Path) -> Config:
    """Load the config file at *path*, validate it and return a normalised ``Config``.

    Recognised keys:
        local_folders: non-empty list of existing directories (``~`` is expanded).
            Their final path components must be unique, because every folder is
            synced to ``<remote_folder>/<name>`` and logged to ``<name>-<ts>.log``.
        remote_folder: required rsync destination; kept as a string.
        n: number of parallel workers, default 1; clamped to the folder count.
        log_level: logging level name, case-insensitive, default ``INFO``.
        rsync_parameters: optional table with ``rsync_path`` / ``exclude_from``.

    Args:
        path: Location of the TOML or JSON configuration file.

    Returns:
        The validated configuration.

    Raises:
        SystemExit: With a ``FATAL: ...`` message on any validation failure.
    """
    raw = _load_raw(path)

    # Require a real list: a bare string would otherwise be iterated per character.
    raw_folders = raw.get("local_folders")
    if not isinstance(raw_folders, list) or not raw_folders:
        sys.exit("FATAL: local_folders must be a non-empty list")

    folders: list[Path] = []
    by_name: dict[str, Path] = {}
    for entry in raw_folders:
        if not isinstance(entry, str):
            sys.exit(f"FATAL: local_folders entries must be strings, got: {entry!r}")
        folder = Path(entry).expanduser()
        if not folder.is_dir():
            sys.exit(
                f"FATAL: local_folders entries must exist and be folders, got: {folder!r}"
            )
        # Two folders with the same name would target the same remote subfolder
        # (destructive with --delete) and write to the same log file.
        earlier = by_name.get(folder.name)
        if earlier is not None:
            sys.exit(
                "FATAL: local_folders entries must have unique names, "
                f"{earlier} and {folder} both map to {folder.name!r}"
            )
        by_name[folder.name] = folder
        folders.append(folder)

    remote = raw.get("remote_folder")
    if not remote:
        sys.exit("FATAL: remote_folder is required")
    remote = str(remote)  # keep as str — may be "host:/path", not a local path

    try:
        n = int(raw.get("n", 1))
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers TOML's ``inf``; ValueError covers ``nan`` and junk strings.
        sys.exit(f"FATAL: n must be an integer, got: {raw.get('n')!r}")
    if n < 1:
        sys.exit("FATAL: n must be >= 1")
    n = min(n, len(folders))  # no point spawning idle workers

    # Non-string values skip ``.upper()`` and are rejected by the membership test.
    raw_level: Any = raw.get("log_level", "INFO")
    level: Any = raw_level.upper() if isinstance(raw_level, str) else raw_level
    if level not in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"):
        sys.exit(f"FATAL: invalid log_level: {level!r}")

    raw_params = raw.get("rsync_parameters", {})
    if not isinstance(raw_params, dict):
        sys.exit(
            "FATAL: rsync_parameters must be a table/object, "
            f"got: {type(raw_params).__name__}"
        )
    rsync_params: RsyncParameters = {}
    if rsync_path := raw_params.get("rsync_path"):
        rsync_params["rsync_path"] = str(rsync_path)
    if exclude_from := raw_params.get("exclude_from"):
        rsync_params["exclude_from"] = str(exclude_from)

    return {
        "local_folders": folders,
        "remote_folder": remote,
        "n": n,
        "log_level": level,
        "rsync_parameters": rsync_params,
    }
# ── sync ──────────────────────────────────────────────────────────────────────
def _build_rsync_cmd(params: RsyncParameters) -> tuple[str, ...]:
    """Return the rsync argv prefix: RSYNC_BASE followed by any configured options.

    Keys that are missing from *params* or hold an empty value add nothing.
    """
    cmd = list(RSYNC_BASE)

    remote_rsync = params.get("rsync_path")
    if remote_rsync:
        cmd.append(f"--rsync-path={remote_rsync}")

    exclude_file = params.get("exclude_from")
    if exclude_file:
        cmd.append(f"--exclude-from={exclude_file}")

    return tuple(cmd)
def sync_folder(
    folder: Path, remote: str, ts: str, params: RsyncParameters
) -> tuple[Path, int]:
    """Run rsync for a single folder and report how it went.

    rsync's stdout and stderr are both written to
    ``LOG_DIR/<folder name>-<ts>.log``; the outcome is logged via ``logger``.

    Args:
        folder: Local directory to sync.
        remote: rsync destination, e.g. ``host:/path``.
        ts: Timestamp string used in the log file name.
        params: Optional extra rsync flags.

    Returns:
        ``(folder, returncode)``. The return code is rsync's exit status, or
        127 when rsync could not be started or the log file could not be opened.
    """
    log_file = LOG_DIR / f"{folder.name}-{ts}.log"
    cmd: list[str] = [*_build_rsync_cmd(params), str(folder), remote]
    logger.info("START %s -> %s (log: %s)", folder, remote, log_file)
    try:
        with log_file.open("w") as fh:
            proc = subprocess.run(
                cmd,
                cwd=HOME,
                stdin=subprocess.DEVNULL,  # no terminal fights
                stdout=fh,
                stderr=subprocess.STDOUT,  # interleave; nothing goes silent
                text=True,
                check=False,  # the return code is inspected below
            )
    except OSError as err:
        # e.g. rsync is not installed or the log file is not writable: report
        # it as a failed folder instead of letting the worker thread raise.
        logger.error("FAIL %s could not run rsync: %s", folder, err)
        return folder, 127
    if proc.returncode != 0:
        logger.error("FAIL %s rc=%d — see %s", folder, proc.returncode, log_file)
    else:
        logger.info("OK %s", folder)
    return folder, proc.returncode
# ── main ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """Entry point: load the config, sync every folder in parallel, report failures.

    The config path is the first command-line argument, defaulting to
    ``~/sync.toml``. Log records go to the console and to
    ``LOG_DIR/rsync-<timestamp>.log``.

    Raises:
        SystemExit: With status 1 when at least one folder failed to sync, or
            with a ``FATAL: ...`` message when the config is invalid.
    """
    cfg_path = Path(sys.argv[1]) if len(sys.argv) > 1 else HOME / "sync.toml"
    cfg = load_config(cfg_path)
    ts = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    logging.basicConfig(
        level=cfg["log_level"],
        format="%(asctime)s %(levelname)-5s %(message)s",
        datefmt="%H:%M:%S",
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler(LOG_DIR / f"rsync-{ts}.log"),
        ],
    )
    folders, remote, n = cfg["local_folders"], cfg["remote_folder"], cfg["n"]
    params = cfg["rsync_parameters"]
    logger.debug("got config: %s", cfg)
    logger.info("log level is set to %s", cfg["log_level"])
    logger.info("syncing %d folder(s), parallelism=%d", len(folders), n)

    failed: list[tuple[Path, int]] = []
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = {pool.submit(sync_folder, f, remote, ts, params): f for f in folders}
        for fut in as_completed(futures):
            try:
                folder, rc = fut.result()
            except Exception:
                # A crashed worker must not abort the loop: keep collecting the
                # other results so the failure summary below stays complete.
                folder, rc = futures[fut], -1
                logger.exception("FAIL %s worker raised an exception", folder)
            if rc != 0:
                failed.append((folder, rc))

    if failed:
        for f, rc in failed:
            logger.critical("FAILED: %s (rc=%d)", f, rc)
        sys.exit(1)
    logger.info("SUCC all folders synced successfully")
# Run the CLI only when executed as a script, not when the module is imported.
if __name__ == "__main__":
    main()