aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/__main__.py142
-rw-r--r--src/add.py47
-rw-r--r--src/common.py61
-rw-r--r--src/copy.py24
-rw-r--r--src/rm.py37
-rw-r--r--src/sync.py35
6 files changed, 346 insertions, 0 deletions
diff --git a/src/__main__.py b/src/__main__.py
new file mode 100755
index 0000000..d38d550
--- /dev/null
+++ b/src/__main__.py
@@ -0,0 +1,142 @@
+#!/usr/bin/env python3
+import copy
+from common import *
+from sync import *
+from add import *
+from rm import *
+
+if len(sys.argv) == 1:
+ print("gentoo portage overlay steal (gpo-steal), written by amelia (https://github.com/ameliasquires)")
+ print("\tlicensed under 3-clause BSD, see license for more info")
+ print("")
+ print("utility for copying packages from a overlay to yours")
+ print("")
+ print("usage:")
+ print(" sync - selects repository to update from source, also updates tracked packages")
+ print(" add [repository] - adds a overlay, accepts git links, github account/name (ex: gentoo/gentoo),")
+ print("\tand between everything listed on https://overlays.gentoo.org/")
+ print(" delete - remove a repository")
+ print(" copy [package] - searches all added repositories for package, accepts package-name,")
+ print("\tpackage-location/package-name, do not use : to specify repository")
+ print(" rm [package] - removes or untracks a package, accepts the same params as copy, but allows : to specify repository")
+ print(" ls - lists all tracked packages (not untracked, yet installed ones)")
+ print("")
+ exit(0)
+
+match sys.argv[1]:
+ case "sync" | "s":
+ options = list(repos.keys())
+ name = questionary.select("which repository", options).ask()
+ sync_repos = [repos[name]]
+
+ sync(sync_repos)
+
+ case "add" | "a":
+ assert len(sys.argv) >= 3
+
+ obj = get_repo(sys.argv[2])
+
+ repos[obj["name"]] = obj
+
+ save()
+
+ if questionary.confirm("sync repository?").ask():
+ sync([obj])
+
+ case "delete" | "del" | "d":
+ options = list(repos.keys())
+ name = questionary.select("which repository", options).ask()
+ del_repos = [repos[name]]
+
+ oper = questionary.select("delete", ["repository", "entry & repository"]).ask()
+
+ if not questionary.confirm("are you sure?").ask():
+ print("exiting")
+ exit(0)
+
+ for r in del_repos:
+ full_path = os.path.join(REPO_LOC, r["name"])
+ print("del "+ full_path)
+
+ if os.path.isdir(full_path):
+ shutil.rmtree(full_path)
+ if oper == "entry & repository":
+ del repos[name]
+
+ save()
+
+ case "copy" | "c":
+ assert len(sys.argv) >= 3
+
+ search = sys.argv[2]
+ full = '/' in search
+
+ options = []
+
+ for r in repos:
+ path = os.path.join(REPO_LOC, r)
+
+ if full:
+ full_path = os.path.join(path, search)
+ if os.path.isdir(full_path):
+ options.append({"path": search, "repo": r, "full_name": f"{search}:{r}"})
+ print(options)
+
+ else:
+ if not os.path.isdir(path):
+ continue
+ repo = os.listdir(path)
+ for dir in repo:
+ dir_path = os.path.join(path, dir)
+
+ if os.path.isdir(dir_path):
+ for f in os.listdir(dir_path):
+ if f == search:
+
+ full_path = os.path.join(dir_path, f)
+ name = os.path.join(dir, search)
+ options.append({"path": name, "full_path": full_path, "repo": r, "full_name": f"{name}:{r}" })
+
+ use = None
+ if len(options) > 1:
+ choices = [i["full_name"] for i in options]
+ choice = questionary.select("select an option", choices).ask()
+ use = [x for x in options if x["full_name"] == choice][0]
+ elif len(options) == 1:
+ use = options[0]
+ else:
+ print("no matches")
+ exit(0)
+
+ copy.copy([use])
+
+ case "r" | "rm":
+ assert len(sys.argv) >= 3
+
+ search = sys.argv[2]
+ has_repo = ":" in search
+
+ if has_repo:
+ [search, repo] = search.split(":")
+
+ if not repo in tracked:
+ print("repository has no tracked packages")
+ exit(1)
+
+ rm(search, [repo])
+ else:
+ rm(search, tracked.keys())
+
+ case "ls" | "l" | "list":
+
+ if len(sys.argv) >= 3:
+ if not sys.argv[2] in tracked:
+ print("nothing in repo")
+ exit(1)
+
+ for i in tracked[sys.argv[2]]:
+ print(i)
+ else:
+ for r in tracked.keys():
+ for p in tracked[r]:
+ print(f"{p}:{r}")
diff --git a/src/add.py b/src/add.py
new file mode 100644
index 0000000..00485ce
--- /dev/null
+++ b/src/add.py
@@ -0,0 +1,47 @@
+from common import *
+
+def get_name(url: str) -> str:
+ name = url.split("/")[-1]
+
+ if str(name).endswith(".git"):
+ name = name[0:-4]
+ return name
+
+
+def get_repo(inp: str) -> repo_pair:
+ if str(inp).startswith("http"):
+ obj: repo_pair = {"url" : inp, "name":""}
+ name = get_name(inp)
+
+ obj["name"] = name
+
+ return obj
+ elif '/' in str(inp):
+ return get_repo(f"https://github.com/{inp}")
+ else:
+ data = requests.get("https://qa-reports.gentoo.org/output/repos/repositories.xml")
+ xml = xmltodict.parse(data.text)
+ close = [x for x in xml["repositories"]["repo"] if x["name"] == inp or SequenceMatcher(None, inp, x["name"]).ratio() > 0.6]
+ found = [x for x in close if x["name"] == inp]
+
+ if len(found) > 0:
+ if len(found) == 1:
+ valid_sources = [x for x in found[0]["source"] if x["@type"] == "git"]
+
+ return get_repo(valid_sources[0]["#text"])
+ else:
+ print("multiple results found (TODO)")
+ exit(1)
+ else:
+ if len(close) == 0:
+ print("no exact or close matches")
+ exit(1)
+
+ names = [x["name"] for x in close]
+ choice = questionary.select("no direct matches found, similar results", names).ask()
+ use = [x for x in close if x["name"] == choice]
+ valid_sources = [x for x in use[0]["source"] if x["@type"] == "git"]
+
+ return get_repo(valid_sources[0]["#text"])
+
+
diff --git a/src/common.py b/src/common.py
new file mode 100644
index 0000000..416d836
--- /dev/null
+++ b/src/common.py
@@ -0,0 +1,61 @@
+import yaml
+import sys
+import os
+import subprocess
+import questionary
+import os
+import shutil
+import requests
+import xmltodict
+from difflib import SequenceMatcher
+from typing import TypedDict
+
+REAL_REPO = ""
+REPOS_LOCATION = "/var/db/repos"
+REPO_LOC = ""
+
+try:
+ with open(os.path.expanduser("~/.config/gpo-steal/config.yaml"), "r") as f:
+ cfg = yaml.safe_load(f.read())
+ REAL_REPO = cfg["repo"]
+ REPO_LOC = cfg["repo_location"]
+except FileNotFoundError:
+ print("no configuration found, should be in ~/.config/gpo-steal/config.yaml")
+ exit(1)
+
+FULL_REPOS_LOCATION = os.path.join(REPOS_LOCATION, REAL_REPO)
+REPO_CFG_LOC = os.path.join(REPO_LOC, "repos.yaml")
+TRACKED_CFG_LOC = os.path.join(REPO_LOC, "tracked.yaml")
+
+os.system(f"mkdir -p {REPO_LOC}")
+
+class repo_pair(TypedDict):
+ url: str
+ name: str
+
+repos = {}
+tracked = {}
+
+root = ""
+if not shutil.which("sudo") is None:
+ root = "sudo"
+if not shutil.which("doas") is None:
+ root = "doas"
+try:
+ with open(REPO_CFG_LOC, "r") as f:
+ repos = yaml.safe_load(f.read()) or repos
+except:
+ pass
+
+try:
+ with open(TRACKED_CFG_LOC, "r") as f:
+ tracked = yaml.safe_load(f.read()) or tracked
+except:
+ pass
+
+def save() -> None:
+ with open(REPO_CFG_LOC, "w") as f:
+ yaml.dump(repos, f)
+
+ with open(TRACKED_CFG_LOC, "w") as f:
+ yaml.dump(tracked, f)
diff --git a/src/copy.py b/src/copy.py
new file mode 100644
index 0000000..9ffc24c
--- /dev/null
+++ b/src/copy.py
@@ -0,0 +1,24 @@
+from common import *
+
+def copy(use_d) -> None:
+
+ srcs = []
+ dests = []
+ for use in use_d:
+ if not use["repo"] in tracked:
+ tracked[use["repo"]] = []
+ if not use["path"] in tracked[use["repo"]]:
+ tracked[use["repo"]].append(use["path"])
+ save()
+
+ srcs.append(os.path.join(REPO_LOC, use["repo"], use["path"]))
+ dests.append(os.path.join(FULL_REPOS_LOCATION, use["path"]))
+
+ cmd = f"{root} sh -c 'mkdir -p {' '.join(dests)};"
+ for i in range(len(dests)):
+ cmd += f"cp -r {srcs[i]}/* {dests[i]};"
+ cmd += "'"
+ print(cmd)
+ os.system(cmd)
+
+
diff --git a/src/rm.py b/src/rm.py
new file mode 100644
index 0000000..9f6956e
--- /dev/null
+++ b/src/rm.py
@@ -0,0 +1,37 @@
+from common import *
+
+def rm_pkg(pkg) -> None:
+ cmd = f"{root} rm -fr {os.path.join(FULL_REPOS_LOCATION, pkg)}"
+ print(cmd)
+ os.system(cmd)
+
+def rm(pkg, repos) -> None:
+
+ found = []
+ for r in repos:
+ for t in tracked[r]:
+ if t.endswith(pkg):
+ found.append({"path": t, "repo": r})
+
+ use = None
+
+ if len(found) == 0:
+ print("not found")
+ exit(1)
+ elif len(found) == 1:
+ use = found[0]
+ else:
+ names = [f"{x["path"]}:{x["repo"]}" for x in found]
+ selected = questionary.select(f"{pkg} is ambiguous, select a more specific package", names).ask().split(":")
+ use = [x for x in found if x["path"] == selected[0] and x["repo"] == selected[1]][0]
+ print(use)
+
+ op = questionary.select("remove", ["package", "track", "package & track"]).ask()
+
+ if op == "package" or op == "package & track":
+ rm_pkg(use["path"])
+ if op == "track" or op == "package & track":
+ tracked[use["repo"]].remove(use["path"])
+ save()
+
+
diff --git a/src/sync.py b/src/sync.py
new file mode 100644
index 0000000..8a1a94e
--- /dev/null
+++ b/src/sync.py
@@ -0,0 +1,35 @@
+from common import *
+import copy
+
+def sync_obj(repo_name: str) -> None:
+ if not repo_name in tracked:
+ return
+
+ for p in tracked[repo_name]:
+ copy.copy([{"path":p, "repo": repo_name, "full_path": os.path.join(REPO_LOC, repo_name, p)}])
+
+def sync(sync_repos: list[repo_pair]) -> None:
+ for r in sync_repos:
+ full_path = os.path.join(REPO_LOC, r["name"])
+ if os.path.isdir(full_path):
+
+ old_hash = subprocess.getoutput(f"cd {full_path} && git rev-parse HEAD")
+ os.system(f"cd {full_path} && git pull --quiet")
+
+ new_hash = subprocess.getoutput(f"cd {full_path} && git rev-parse HEAD")
+
+ if old_hash == new_hash:
+ print(f"{r["name"]}: up to date")
+ else:
+ print(f"{r["name"]}: {new_hash}")
+
+ else:
+ os.system(f"cd {REPO_LOC} && git clone {r["url"]} {r["name"]}")
+ new_hash = subprocess.getoutput(f"cd {full_path} && git rev-parse HEAD")
+
+ print(f"{r["name"]}: {new_hash}")
+
+ if questionary.confirm("would you like to sync all tracked packages, from this repo?").ask():
+ sync_obj(r["name"])
+
+