427 lines
14 KiB
Python
427 lines
14 KiB
Python
import typer
|
|
import os
|
|
import typing as t
|
|
import shutil
|
|
from pathlib import Path
|
|
import cson
|
|
import getpass
|
|
from lib import abc
|
|
import string
|
|
from lib.db import Database
|
|
from lib.dirstat import wrap_directory, unpack
|
|
|
|
import socket
|
|
|
|
from hooks.remote import remote, clone as _clone
|
|
from hooks.diff import diff
|
|
|
|
try:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
s.connect(("8.8.8.8", 80))
|
|
host = s.getsockname()[0]
|
|
s.close()
|
|
except:
|
|
host = "127.0.0.1"
|
|
|
|
app = typer.Typer(name="sing", help="Singularity - the worst Source Control ever")
|
|
app.add_typer(remote, name="remote")
|
|
app.add_typer(diff, name="diff")
|
|
|
|
|
|
@app.command()
|
|
def clone(url: str = typer.Argument(...)):
|
|
_clone(url, init_fn=init, pull_fn=pull)
|
|
|
|
|
|
@app.command()
|
|
def init(
|
|
dir=typer.Argument("."),
|
|
branch: str = typer.Option("master", "-b", "--initial-branch"),
|
|
force: bool = typer.Option(False, "-f", "--force", "-r", "--reinit"),
|
|
):
|
|
"""
|
|
Initializes a new Repository, or reinitializes an existing one
|
|
"""
|
|
initial_cfg = {"current_branch": branch, "branches": [branch], "remotes": {}}
|
|
user_cfg = {
|
|
"user.name": f"",
|
|
"user.email": f"",
|
|
"repo.name": os.path.basename(os.path.abspath(dir)),
|
|
}
|
|
if os.path.exists(os.path.join(dir, ".sing")):
|
|
if force:
|
|
shutil.rmtree(os.path.join(dir, ".sing"))
|
|
print("Reinitializing Singularity Repository...")
|
|
else:
|
|
return print("Singularity Config Directory already exists - Quitting")
|
|
os.mkdir(os.path.join(dir, ".sing"))
|
|
os.chdir(os.path.join(dir, ".sing"))
|
|
os.mkdir("branches") # Branch Directories
|
|
os.mkdir(os.path.join("branches", branch)) # Initial Branch
|
|
os.mkdir("stage") # Staged Changes
|
|
os.mkdir("system") # Remotes, Branch Config, Local Config, Database
|
|
n = open(os.path.join("system", "sing.db"), "wb+")
|
|
n.close()
|
|
cson.dump(initial_cfg, open(os.path.join("system", "branchcf.cson"), "w+"))
|
|
cson.dump(user_cfg, open(os.path.join("system", "localconfig.cson"), "w+"))
|
|
os.mkdir("control") # Timeline etc.
|
|
os.mkdir("overhead") # Stashed Data
|
|
print("Initialized barebones Repository in {0}".format(os.path.abspath(dir)))
|
|
|
|
|
|
@app.command()
|
|
def config(
|
|
key=typer.Argument(None),
|
|
list: bool = typer.Option(False, "-l", "--list"),
|
|
set: str = typer.Option(None, "--set"),
|
|
):
|
|
ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
|
|
if key:
|
|
if set:
|
|
ucfg[key] = set
|
|
cson.dump(
|
|
ucfg, open(os.path.join(".sing", "system", "localconfig.cson"), "w+")
|
|
)
|
|
else:
|
|
print(ucfg.get(key, f"Not found: {key}"))
|
|
if list:
|
|
subpart = {}
|
|
for k, v in ucfg.items():
|
|
root, val = k.split(".")
|
|
if root not in subpart:
|
|
subpart[root] = []
|
|
subpart[root].append((val, v))
|
|
for root, values in subpart.items():
|
|
print(f"- {root}")
|
|
for key, value in values:
|
|
print(f"---- {key} -> {value}")
|
|
|
|
|
|
@app.command()
|
|
def log():
|
|
""""""
|
|
for commitfile in os.listdir(os.path.join(".sing", "control")):
|
|
print(open(os.path.join(".sing", "control", commitfile)).read())
|
|
|
|
|
|
@app.command()
|
|
def stash(files: t.List[Path]):
|
|
"""
|
|
Stashes Files into Overhead to avoid conflicts. Usually called automatically
|
|
"""
|
|
ignore = [".sing"]
|
|
if os.path.isfile(".signore"):
|
|
ignore.extend(open(".signore").readlines())
|
|
for file in files:
|
|
fp = os.path.abspath(file.name)
|
|
bn = os.path.basename(fp)
|
|
if fp == os.getcwd():
|
|
add(list([Path(i) for i in os.listdir(fp)]))
|
|
return
|
|
else:
|
|
if bn in ignore:
|
|
continue
|
|
elif os.path.isdir(fp):
|
|
if os.path.isdir(os.path.join(".sing", "overhead", bn)):
|
|
shutil.rmtree(os.path.join(".sing", "overhead", bn))
|
|
shutil.copytree(fp, os.path.join(".sing", "overhead", bn))
|
|
elif os.path.isfile(fp):
|
|
if os.path.isfile(os.path.join(".sing", "overhead", bn)):
|
|
os.remove(os.path.join(".sing", "overhead", bn))
|
|
shutil.copyfile(fp, os.path.join(".sing", "overhead", bn))
|
|
|
|
|
|
@app.command()
|
|
def add(files: t.List[Path]):
|
|
"""
|
|
Stage Files or Directories for a commit
|
|
"""
|
|
ignore = [".sing"]
|
|
if os.path.isfile(".signore"):
|
|
ignore.extend(open(".signore").readlines())
|
|
for file in files:
|
|
fp = os.path.abspath(file.name)
|
|
bn = os.path.basename(fp)
|
|
if fp == os.getcwd():
|
|
add(list([Path(i) for i in os.listdir(fp)]))
|
|
return
|
|
else:
|
|
if bn in ignore:
|
|
continue
|
|
elif os.path.isdir(fp):
|
|
if os.path.isdir(os.path.join(".sing", "stage", bn)):
|
|
shutil.rmtree(os.path.join(".sing", "stage", bn))
|
|
shutil.copytree(fp, os.path.join(".sing", "stage", bn))
|
|
elif os.path.isfile(fp):
|
|
if os.path.isfile(os.path.join(".sing", "stage", bn)):
|
|
os.remove(os.path.join(".sing", "stage", bn))
|
|
shutil.copyfile(fp, os.path.join(".sing", "stage", bn))
|
|
|
|
|
|
@app.command()
|
|
def rm(files: t.List[str]):
|
|
"""
|
|
Unstage staged Files
|
|
"""
|
|
for file in files:
|
|
if os.path.exists(os.path.join(".sing", "stage", file)):
|
|
if os.path.isdir(os.path.join(".sing", "stage", file)):
|
|
shutil.rmtree(os.path.join(".sing", "stage", file))
|
|
else:
|
|
os.remove(os.path.join(".sing", "stage", file))
|
|
|
|
|
|
@app.command()
|
|
def branch(make_new: str = typer.Option(None, "-m", "--new")):
|
|
"""
|
|
List Branches or make a new one. To switch, use the 'checkout' command.
|
|
"""
|
|
if not make_new:
|
|
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
|
|
for branch in cfg["branches"]:
|
|
if branch == cfg["current_branch"]:
|
|
print(f"* {branch}")
|
|
else:
|
|
print(branch)
|
|
else:
|
|
os.mkdir(os.path.join(".sing", "branches", make_new))
|
|
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
|
|
cfg["branches"].append(make_new)
|
|
cfg["current_branch"] = make_new
|
|
cson.dump(cfg, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
|
|
|
|
|
|
@app.command()
|
|
def commit(
|
|
message: str = typer.Option(None, "-m", "--message"),
|
|
amend: bool = typer.Option(False, "-a", "--amend"),
|
|
):
|
|
"""
|
|
Commit the staged Files and write them to the Database
|
|
|
|
Options:
|
|
-m, --message : The Commit Message.
|
|
-a. --amend : Overwrite the last commit.
|
|
"""
|
|
if not message:
|
|
open(".commit.tmp", "w+")
|
|
typer.edit(filename=".commit.tmp")
|
|
message = open(".commit.tmp").read()
|
|
os.remove(".commit.tmp")
|
|
ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
|
|
if ucfg["user.name"] == "" or ucfg["user.email"] == "":
|
|
print("*** Please tell me who you are")
|
|
print("\nRun\n")
|
|
print('\tsing config user.name --set "Your Name" ')
|
|
print('\tsing config user.example --set "you@example.com" ')
|
|
return
|
|
if amend:
|
|
try:
|
|
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
|
|
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
|
|
shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"]))
|
|
shutil.copytree(
|
|
os.path.join(".sing", "stage"),
|
|
os.path.join(".sing", "branches", cfg["current_branch"]),
|
|
)
|
|
import random
|
|
from datetime import datetime
|
|
|
|
cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32))
|
|
|
|
commitfile = f"""
|
|
commit {cwid} {cfg["current_branch"]}
|
|
|
|
Author: {ucfg['user.name']} <{ucfg['user.email']}>
|
|
Date: {datetime.now()}
|
|
|
|
{message}
|
|
"""
|
|
commit_data = wrap_directory(
|
|
os.path.join(".sing", "branches", cfg["current_branch"])
|
|
)
|
|
db.write(cwid, commit_data)
|
|
db.commit()
|
|
print(os.getcwd())
|
|
open(
|
|
os.path.join(
|
|
".sing",
|
|
"control",
|
|
f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))-1}.commit",
|
|
),
|
|
"w+",
|
|
).write(commitfile)
|
|
print(f"Commit Mode +w - On ")
|
|
except Exception as e:
|
|
import sys, traceback
|
|
|
|
print(sys.exc_info())
|
|
print(traceback.format_exc())
|
|
print(e)
|
|
|
|
else:
|
|
try:
|
|
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
|
|
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
|
|
shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"]))
|
|
shutil.copytree(
|
|
os.path.join(".sing", "stage"),
|
|
os.path.join(".sing", "branches", cfg["current_branch"]),
|
|
)
|
|
import random
|
|
from datetime import datetime
|
|
|
|
cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32))
|
|
|
|
commitfile = f"""
|
|
commit {cwid} {cfg["current_branch"]}
|
|
|
|
Author: {ucfg['user.name']} <{ucfg['user.email']}>
|
|
Date: {datetime.now()}
|
|
|
|
{message}
|
|
"""
|
|
commit_data = wrap_directory(
|
|
os.path.join(".sing", "branches", cfg["current_branch"])
|
|
)
|
|
db.write(cwid, commit_data)
|
|
db.commit()
|
|
print(os.getcwd())
|
|
open(
|
|
os.path.join(
|
|
".sing",
|
|
"control",
|
|
f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))}.commit",
|
|
),
|
|
"w+",
|
|
).write(commitfile)
|
|
print(f"Commit Mode +w - On ")
|
|
except Exception as e:
|
|
import sys, traceback
|
|
|
|
print(sys.exc_info())
|
|
print(traceback.format_exc())
|
|
print(e)
|
|
|
|
|
|
@app.command()
|
|
def stat():
|
|
"""
|
|
Print the entire sing Configuration Tree
|
|
"""
|
|
dw = wrap_directory(".sing")
|
|
print(dw.stringify(), end="")
|
|
print("local{HEAD}")
|
|
|
|
|
|
@app.command()
|
|
def slog():
|
|
"""
|
|
List all Commits in the Database
|
|
"""
|
|
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
|
|
for k, v in db.data.items():
|
|
print(f"{k} --> {type(v)}")
|
|
|
|
|
|
@app.command()
|
|
def pull():
|
|
"""
|
|
Stash the current Tree and integrate changes downloaded from Remote into active branch
|
|
"""
|
|
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
|
|
stash([Path(".")])
|
|
id = len(os.listdir(os.path.join(".sing", "control"))) - 1
|
|
commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit")
|
|
dname = open(commit_fn).readlines()
|
|
for i in dname:
|
|
i = i.lstrip()
|
|
if i.startswith("commit"):
|
|
_, id, branch = i.split()
|
|
if branch == cfg["current_branch"]:
|
|
break
|
|
revert(id, branch=branch)
|
|
|
|
|
|
@app.command(no_args_is_help=True)
|
|
def comtree(c_id: str = typer.Argument(...)):
|
|
"""
|
|
View the Tree Structure of the Commit <C_ID>
|
|
"""
|
|
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
|
|
entry = db.get(c_id)
|
|
if not entry:
|
|
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
|
|
print(entry.stringify())
|
|
print("HEAD/")
|
|
|
|
|
|
@app.command(no_args_is_help=True)
|
|
def revert(
|
|
c_id: str = typer.Argument(...), branch: str = typer.Option(None, "-b", "--branch")
|
|
):
|
|
"""
|
|
Reverts to Commit <C_ID>
|
|
"""
|
|
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
|
|
entry = db.get(c_id)
|
|
if not entry:
|
|
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
|
|
ignore = [".sing"]
|
|
if os.path.exists(".signore"):
|
|
ignore += open(".signore").readlines()
|
|
|
|
for n in os.listdir():
|
|
if n in ignore:
|
|
continue
|
|
if os.path.isfile(n):
|
|
os.remove(n)
|
|
else:
|
|
shutil.rmtree(n)
|
|
unpack(entry)
|
|
|
|
|
|
@app.command(no_args_is_help=True)
|
|
def checkout(
|
|
branch: str = typer.Argument(...),
|
|
force: bool = typer.Option(False, "-f", "--force"),
|
|
):
|
|
"""
|
|
Switch branches or restore working tree files
|
|
"""
|
|
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
|
|
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
|
|
id = len(os.listdir(os.path.join(".sing", "control"))) - 1
|
|
commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit")
|
|
dname = open(commit_fn).readlines()
|
|
for i in dname:
|
|
i = i.lstrip()
|
|
if i.startswith("commit"):
|
|
_, c_id, _branch = i.split()
|
|
if branch == _branch:
|
|
break
|
|
|
|
entry = db.get(c_id)
|
|
if not entry:
|
|
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
|
|
ignore = [".sing"]
|
|
|
|
if os.path.exists(".signore"):
|
|
ignore += open(".signore").readlines()
|
|
if not force:
|
|
stash([Path(".")])
|
|
for n in os.listdir():
|
|
if n in ignore:
|
|
continue
|
|
if os.path.isfile(n):
|
|
os.remove(n)
|
|
else:
|
|
shutil.rmtree(n)
|
|
cfg["current_branch"] = branch
|
|
cson.dump(cfg, open(os.path.join(".sing", "system", "branchcf.cson")))
|
|
unpack(entry)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app()
|