import typer import os import typing as t import shutil from pathlib import Path import cson import getpass from lib import abc import string from lib.db import Database from lib.dirstat import wrap_directory, unpack import socket from hooks.remote import remote, clone as _clone from hooks.diff import diff try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) host = s.getsockname()[0] s.close() except: host = "127.0.0.1" app = typer.Typer(name="sing", help="Singularity - the worst Source Control ever") app.add_typer(remote, name="remote") app.add_typer(diff, name="diff") @app.command() def clone(url: str = typer.Argument(...)): _clone(url, init_fn=init, pull_fn=pull) @app.command() def init( dir=typer.Argument("."), branch: str = typer.Option("master", "-b", "--initial-branch"), force: bool = typer.Option(False, "-f", "--force", "-r", "--reinit"), ): """ Initializes a new Repository, or reinitializes an existing one """ initial_cfg = {"current_branch": branch, "branches": [branch], "remotes": {}} user_cfg = { "user.name": f"", "user.email": f"", "repo.name": os.path.basename(os.path.abspath(dir)), } if os.path.exists(os.path.join(dir, ".sing")): if force: shutil.rmtree(os.path.join(dir, ".sing")) print("Reinitializing Singularity Repository...") else: return print("Singularity Config Directory already exists - Quitting") os.mkdir(os.path.join(dir, ".sing")) os.chdir(os.path.join(dir, ".sing")) os.mkdir("branches") # Branch Directories os.mkdir(os.path.join("branches", branch)) # Initial Branch os.mkdir("stage") # Staged Changes os.mkdir("system") # Remotes, Branch Config, Local Config, Database n = open(os.path.join("system", "sing.db"), "wb+") n.close() cson.dump(initial_cfg, open(os.path.join("system", "branchcf.cson"), "w+")) cson.dump(user_cfg, open(os.path.join("system", "localconfig.cson"), "w+")) os.mkdir("control") # Timeline etc. os.mkdir("overhead") # Stashed Data print("Initialized barebones Repository in {0}".format(os.path.abspath(dir))) @app.command() def config( key=typer.Argument(None), list: bool = typer.Option(False, "-l", "--list"), set: str = typer.Option(None, "--set"), ): ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson"))) if key: if set: ucfg[key] = set cson.dump( ucfg, open(os.path.join(".sing", "system", "localconfig.cson"), "w+") ) else: print(ucfg.get(key, f"Not found: {key}")) if list: subpart = {} for k, v in ucfg.items(): root, val = k.split(".") if root not in subpart: subpart[root] = [] subpart[root].append((val, v)) for root, values in subpart.items(): print(f"- {root}") for key, value in values: print(f"---- {key} -> {value}") @app.command() def log(): """""" for commitfile in os.listdir(os.path.join(".sing", "control")): print(open(os.path.join(".sing", "control", commitfile)).read()) @app.command() def stash(files: t.List[Path]): """ Stashes Files into Overhead to avoid conflicts. Usually called automatically """ ignore = [".sing"] if os.path.isfile(".signore"): ignore.extend(open(".signore").readlines()) for file in files: fp = os.path.abspath(file.name) bn = os.path.basename(fp) if fp == os.getcwd(): add(list([Path(i) for i in os.listdir(fp)])) return else: if bn in ignore: continue elif os.path.isdir(fp): if os.path.isdir(os.path.join(".sing", "overhead", bn)): shutil.rmtree(os.path.join(".sing", "overhead", bn)) shutil.copytree(fp, os.path.join(".sing", "overhead", bn)) elif os.path.isfile(fp): if os.path.isfile(os.path.join(".sing", "overhead", bn)): os.remove(os.path.join(".sing", "overhead", bn)) shutil.copyfile(fp, os.path.join(".sing", "overhead", bn)) @app.command() def add(files: t.List[Path]): """ Stage Files or Directories for a commit """ ignore = [".sing"] if os.path.isfile(".signore"): ignore.extend(open(".signore").readlines()) for file in files: fp = os.path.abspath(file.name) bn = os.path.basename(fp) if fp == os.getcwd(): add(list([Path(i) for i in os.listdir(fp)])) return else: if bn in ignore: continue elif os.path.isdir(fp): if os.path.isdir(os.path.join(".sing", "stage", bn)): shutil.rmtree(os.path.join(".sing", "stage", bn)) shutil.copytree(fp, os.path.join(".sing", "stage", bn)) elif os.path.isfile(fp): if os.path.isfile(os.path.join(".sing", "stage", bn)): os.remove(os.path.join(".sing", "stage", bn)) shutil.copyfile(fp, os.path.join(".sing", "stage", bn)) @app.command() def rm(files: t.List[str]): """ Unstage staged Files """ for file in files: if os.path.exists(os.path.join(".sing", "stage", file)): if os.path.isdir(os.path.join(".sing", "stage", file)): shutil.rmtree(os.path.join(".sing", "stage", file)) else: os.remove(os.path.join(".sing", "stage", file)) @app.command() def branch(make_new: str = typer.Option(None, "-m", "--new")): """ List Branches or make a new one. To switch, use the 'checkout' command. """ if not make_new: cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson"))) for branch in cfg["branches"]: if branch == cfg["current_branch"]: print(f"* {branch}") else: print(branch) else: os.mkdir(os.path.join(".sing", "branches", make_new)) cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson"))) cfg["branches"].append(make_new) cfg["current_branch"] = make_new cson.dump(cfg, open(os.path.join(".sing", "system", "branchcf.cson"), "w+")) @app.command() def commit( message: str = typer.Option(None, "-m", "--message"), amend: bool = typer.Option(False, "-a", "--amend"), ): """ Commit the staged Files and write them to the Database Options: -m, --message : The Commit Message. -a. --amend : Overwrite the last commit. """ if not message: open(".commit.tmp", "w+") typer.edit(filename=".commit.tmp") message = open(".commit.tmp").read() os.remove(".commit.tmp") ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson"))) if ucfg["user.name"] == "" or ucfg["user.email"] == "": print("*** Please tell me who you are") print("\nRun\n") print('\tsing config user.name --set "Your Name" ') print('\tsing config user.example --set "you@example.com" ') return if amend: try: db = Database.connect(os.path.join(".sing", "system", "sing.db")) cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson"))) shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"])) shutil.copytree( os.path.join(".sing", "stage"), os.path.join(".sing", "branches", cfg["current_branch"]), ) import random from datetime import datetime cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32)) commitfile = f""" commit {cwid} {cfg["current_branch"]} Author: {ucfg['user.name']} <{ucfg['user.email']}> Date: {datetime.now()} {message} """ commit_data = wrap_directory( os.path.join(".sing", "branches", cfg["current_branch"]) ) db.write(cwid, commit_data) db.commit() print(os.getcwd()) open( os.path.join( ".sing", "control", f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))-1}.commit", ), "w+", ).write(commitfile) print(f"Commit Mode +w - On ") except Exception as e: import sys, traceback print(sys.exc_info()) print(traceback.format_exc()) print(e) else: try: db = Database.connect(os.path.join(".sing", "system", "sing.db")) cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson"))) shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"])) shutil.copytree( os.path.join(".sing", "stage"), os.path.join(".sing", "branches", cfg["current_branch"]), ) import random from datetime import datetime cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32)) commitfile = f""" commit {cwid} {cfg["current_branch"]} Author: {ucfg['user.name']} <{ucfg['user.email']}> Date: {datetime.now()} {message} """ commit_data = wrap_directory( os.path.join(".sing", "branches", cfg["current_branch"]) ) db.write(cwid, commit_data) db.commit() print(os.getcwd()) open( os.path.join( ".sing", "control", f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))}.commit", ), "w+", ).write(commitfile) print(f"Commit Mode +w - On ") except Exception as e: import sys, traceback print(sys.exc_info()) print(traceback.format_exc()) print(e) @app.command() def stat(): """ Print the entire sing Configuration Tree """ dw = wrap_directory(".sing") print(dw.stringify(), end="") print("local{HEAD}") @app.command() def slog(): """ List all Commits in the Database """ db = Database.connect(os.path.join(".sing", "system", "sing.db")) for k, v in db.data.items(): print(f"{k} --> {type(v)}") @app.command() def pull(): """ Stash the current Tree and integrate changes downloaded from Remote into active branch """ cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson"))) stash([Path(".")]) id = len(os.listdir(os.path.join(".sing", "control"))) - 1 commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit") dname = open(commit_fn).readlines() for i in dname: i = i.lstrip() if i.startswith("commit"): _, id, branch = i.split() if branch == cfg["current_branch"]: break revert(id, branch=branch) @app.command(no_args_is_help=True) def comtree(c_id: str = typer.Argument(...)): """ View the Tree Structure of the Commit """ db = Database.connect(os.path.join(".sing", "system", "sing.db")) entry = db.get(c_id) if not entry: return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted") print(entry.stringify()) print("HEAD/") @app.command(no_args_is_help=True) def revert( c_id: str = typer.Argument(...), branch: str = typer.Option(None, "-b", "--branch") ): """ Reverts to Commit """ db = Database.connect(os.path.join(".sing", "system", "sing.db")) entry = db.get(c_id) if not entry: return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted") ignore = [".sing"] if os.path.exists(".signore"): ignore += open(".signore").readlines() for n in os.listdir(): if n in ignore: continue if os.path.isfile(n): os.remove(n) else: shutil.rmtree(n) unpack(entry) @app.command(no_args_is_help=True) def checkout( branch: str = typer.Argument(...), force: bool = typer.Option(False, "-f", "--force"), ): """ Switch branches or restore working tree files """ cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson"))) db = Database.connect(os.path.join(".sing", "system", "sing.db")) id = len(os.listdir(os.path.join(".sing", "control"))) - 1 commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit") dname = open(commit_fn).readlines() for i in dname: i = i.lstrip() if i.startswith("commit"): _, c_id, _branch = i.split() if branch == _branch: break entry = db.get(c_id) if not entry: return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted") ignore = [".sing"] if os.path.exists(".signore"): ignore += open(".signore").readlines() if not force: stash([Path(".")]) for n in os.listdir(): if n in ignore: continue if os.path.isfile(n): os.remove(n) else: shutil.rmtree(n) cfg["current_branch"] = branch cson.dump(cfg, open(os.path.join(".sing", "system", "branchcf.cson"))) unpack(entry) if __name__ == "__main__": app()