diff options
author | amelia squires <[email protected]> | 2024-08-24 01:26:40 -0500 |
---|---|---|
committer | amelia squires <[email protected]> | 2024-08-24 01:26:40 -0500 |
commit | 73cbed952c5d2cdfdaceb5f8c2b19c77738b5186 (patch) | |
tree | 609a05141cfa950fd8637490de1dfe8241d65d3d /src |
init
Diffstat (limited to 'src')
-rwxr-xr-x | src/__main__.py | 142 | ||||
-rw-r--r-- | src/add.py | 47 | ||||
-rw-r--r-- | src/common.py | 61 | ||||
-rw-r--r-- | src/copy.py | 24 | ||||
-rw-r--r-- | src/rm.py | 37 | ||||
-rw-r--r-- | src/sync.py | 35 |
6 files changed, 346 insertions, 0 deletions
diff --git a/src/__main__.py b/src/__main__.py new file mode 100755 index 0000000..d38d550 --- /dev/null +++ b/src/__main__.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +import copy +from common import * +from sync import * +from add import * +from rm import * + +if len(sys.argv) == 1: + print("gentoo portage overlay steal (gpo-steal), written by amelia (https://github.com/ameliasquires)") + print("\tlicensed under 3-clause BSD, see license for more info") + print("") + print("utility for copying packages from a overlay to yours") + print("") + print("usage:") + print(" sync - selects repository to update from source, also updates tracked packages") + print(" add [repository] - adds a overlay, accepts git links, github account/name (ex: gentoo/gentoo),") + print("\tand between everything listed on https://overlays.gentoo.org/") + print(" delete - remove a repository") + print(" copy [package] - searches all added repositories for package, accepts package-name,") + print("\tpackage-location/package-name, do not use : to specify repository") + print(" rm [package] - removes or untracks a package, accepts the same params as copy, but allows : to specify repository") + print(" ls - lists all tracked packages (not untracked, yet installed ones)") + print("") + exit(0) + +match sys.argv[1]: + case "sync" | "s": + options = list(repos.keys()) + name = questionary.select("which repository", options).ask() + sync_repos = [repos[name]] + + sync(sync_repos) + + case "add" | "a": + assert len(sys.argv) >= 3 + + obj = get_repo(sys.argv[2]) + + repos[obj["name"]] = obj + + save() + + if questionary.confirm("sync repository?").ask(): + sync([obj]) + + case "delete" | "del" | "d": + options = list(repos.keys()) + name = questionary.select("which repository", options).ask() + del_repos = [repos[name]] + + oper = questionary.select("delete", ["repository", "entry & repository"]).ask() + + if not questionary.confirm("are you sure?").ask(): + print("exiting") + exit(0) + + for r in del_repos: + full_path = os.path.join(REPO_LOC, r["name"]) + print("del "+ full_path) + + if os.path.isdir(full_path): + shutil.rmtree(full_path) + if oper == "entry & repository": + del repos[name] + + save() + + case "copy" | "c": + assert len(sys.argv) >= 3 + + search = sys.argv[2] + full = '/' in search + + options = [] + + for r in repos: + path = os.path.join(REPO_LOC, r) + + if full: + full_path = os.path.join(path, search) + if os.path.isdir(full_path): + options.append({"path": search, "repo": r, "full_name": f"{search}:{r}"}) + print(options) + + else: + if not os.path.isdir(path): + continue + repo = os.listdir(path) + for dir in repo: + dir_path = os.path.join(path, dir) + + if os.path.isdir(dir_path): + for f in os.listdir(dir_path): + if f == search: + + full_path = os.path.join(dir_path, f) + name = os.path.join(dir, search) + options.append({"path": name, "full_path": full_path, "repo": r, "full_name": f"{name}:{r}" }) + + use = None + if len(options) > 1: + choices = [i["full_name"] for i in options] + choice = questionary.select("select an option", choices).ask() + use = [x for x in options if x["full_name"] == choice][0] + elif len(options) == 1: + use = options[0] + else: + print("no matches") + exit(0) + + copy.copy([use]) + + case "r" | "rm": + assert len(sys.argv) >= 3 + + search = sys.argv[2] + has_repo = ":" in search + + if has_repo: + [search, repo] = search.split(":") + + if not repo in tracked: + print("repository has no tracked packages") + exit(1) + + rm(search, [repo]) + else: + rm(search, tracked.keys()) + + case "ls" | "l" | "list": + + if len(sys.argv) >= 3: + if not sys.argv[2] in tracked: + print("nothing in repo") + exit(1) + + for i in tracked[sys.argv[2]]: + print(i) + else: + for r in tracked.keys(): + for p in tracked[r]: + print(f"{p}:{r}") diff --git a/src/add.py b/src/add.py new file mode 100644 index 0000000..00485ce --- /dev/null +++ b/src/add.py @@ -0,0 +1,47 @@ +from common import * + +def get_name(url: str) -> str: + name = url.split("/")[-1] + + if str(name).endswith(".git"): + name = name[0:-4] + return name + + +def get_repo(inp: str) -> repo_pair: + if str(inp).startswith("http"): + obj: repo_pair = {"url" : inp, "name":""} + name = get_name(inp) + + obj["name"] = name + + return obj + elif '/' in str(inp): + return get_repo(f"https://github.com/{inp}") + else: + data = requests.get("https://qa-reports.gentoo.org/output/repos/repositories.xml") + xml = xmltodict.parse(data.text) + close = [x for x in xml["repositories"]["repo"] if x["name"] == inp or SequenceMatcher(None, inp, x["name"]).ratio() > 0.6] + found = [x for x in close if x["name"] == inp] + + if len(found) > 0: + if len(found) == 1: + valid_sources = [x for x in found[0]["source"] if x["@type"] == "git"] + + return get_repo(valid_sources[0]["#text"]) + else: + print("multiple results found (TODO)") + exit(1) + else: + if len(close) == 0: + print("no exact or close matches") + exit(1) + + names = [x["name"] for x in close] + choice = questionary.select("no direct matches found, similar results", names).ask() + use = [x for x in close if x["name"] == choice] + valid_sources = [x for x in use[0]["source"] if x["@type"] == "git"] + + return get_repo(valid_sources[0]["#text"]) + + diff --git a/src/common.py b/src/common.py new file mode 100644 index 0000000..416d836 --- /dev/null +++ b/src/common.py @@ -0,0 +1,61 @@ +import yaml +import sys +import os +import subprocess +import questionary +import os +import shutil +import requests +import xmltodict +from difflib import SequenceMatcher +from typing import TypedDict + +REAL_REPO = "" +REPOS_LOCATION = "/var/db/repos" +REPO_LOC = "" + +try: + with open(os.path.expanduser("~/.config/gpo-steal/config.yaml"), "r") as f: + cfg = yaml.safe_load(f.read()) + REAL_REPO = cfg["repo"] + REPO_LOC = cfg["repo_location"] +except FileNotFoundError: + print("no configuration found, should be in ~/.config/gpo-steal/config.yaml") + exit(1) + +FULL_REPOS_LOCATION = os.path.join(REPOS_LOCATION, REAL_REPO) +REPO_CFG_LOC = os.path.join(REPO_LOC, "repos.yaml") +TRACKED_CFG_LOC = os.path.join(REPO_LOC, "tracked.yaml") + +os.system(f"mkdir -p {REPO_LOC}") + +class repo_pair(TypedDict): + url: str + name: str + +repos = {} +tracked = {} + +root = "" +if not shutil.which("sudo") is None: + root = "sudo" +if not shutil.which("doas") is None: + root = "doas" +try: + with open(REPO_CFG_LOC, "r") as f: + repos = yaml.safe_load(f.read()) or repos +except: + pass + +try: + with open(TRACKED_CFG_LOC, "r") as f: + tracked = yaml.safe_load(f.read()) or tracked +except: + pass + +def save() -> None: + with open(REPO_CFG_LOC, "w") as f: + yaml.dump(repos, f) + + with open(TRACKED_CFG_LOC, "w") as f: + yaml.dump(tracked, f) diff --git a/src/copy.py b/src/copy.py new file mode 100644 index 0000000..9ffc24c --- /dev/null +++ b/src/copy.py @@ -0,0 +1,24 @@ +from common import * + +def copy(use_d) -> None: + + srcs = [] + dests = [] + for use in use_d: + if not use["repo"] in tracked: + tracked[use["repo"]] = [] + if not use["path"] in tracked[use["repo"]]: + tracked[use["repo"]].append(use["path"]) + save() + + srcs.append(os.path.join(REPO_LOC, use["repo"], use["path"])) + dests.append(os.path.join(FULL_REPOS_LOCATION, use["path"])) + + cmd = f"{root} sh -c 'mkdir -p {' '.join(dests)};" + for i in range(len(dests)): + cmd += f"cp -r {srcs[i]}/* {dests[i]};" + cmd += "'" + print(cmd) + os.system(cmd) + + diff --git a/src/rm.py b/src/rm.py new file mode 100644 index 0000000..9f6956e --- /dev/null +++ b/src/rm.py @@ -0,0 +1,37 @@ +from common import * + +def rm_pkg(pkg) -> None: + cmd = f"{root} rm -fr {os.path.join(FULL_REPOS_LOCATION, pkg)}" + print(cmd) + os.system(cmd) + +def rm(pkg, repos) -> None: + + found = [] + for r in repos: + for t in tracked[r]: + if t.endswith(pkg): + found.append({"path": t, "repo": r}) + + use = None + + if len(found) == 0: + print("not found") + exit(1) + elif len(found) == 1: + use = found[0] + else: + names = [f"{x["path"]}:{x["repo"]}" for x in found] + selected = questionary.select(f"{pkg} is ambiguous, select a more specific package", names).ask().split(":") + use = [x for x in found if x["path"] == selected[0] and x["repo"] == selected[1]][0] + print(use) + + op = questionary.select("remove", ["package", "track", "package & track"]).ask() + + if op == "package" or op == "package & track": + rm_pkg(use["path"]) + if op == "track" or op == "package & track": + tracked[use["repo"]].remove(use["path"]) + save() + + diff --git a/src/sync.py b/src/sync.py new file mode 100644 index 0000000..8a1a94e --- /dev/null +++ b/src/sync.py @@ -0,0 +1,35 @@ +from common import * +import copy + +def sync_obj(repo_name: str) -> None: + if not repo_name in tracked: + return + + for p in tracked[repo_name]: + copy.copy([{"path":p, "repo": repo_name, "full_path": os.path.join(REPO_LOC, repo_name, p)}]) + +def sync(sync_repos: list[repo_pair]) -> None: + for r in sync_repos: + full_path = os.path.join(REPO_LOC, r["name"]) + if os.path.isdir(full_path): + + old_hash = subprocess.getoutput(f"cd {full_path} && git rev-parse HEAD") + os.system(f"cd {full_path} && git pull --quiet") + + new_hash = subprocess.getoutput(f"cd {full_path} && git rev-parse HEAD") + + if old_hash == new_hash: + print(f"{r["name"]}: up to date") + else: + print(f"{r["name"]}: {new_hash}") + + else: + os.system(f"cd {REPO_LOC} && git clone {r["url"]} {r["name"]}") + new_hash = subprocess.getoutput(f"cd {full_path} && git rev-parse HEAD") + + print(f"{r["name"]}: {new_hash}") + + if questionary.confirm("would you like to sync all tracked packages, from this repo?").ask(): + sync_obj(r["name"]) + + |