Singularity/sing.py

427 lines
14 KiB
Python

import typer
import os
import typing as t
import shutil
from pathlib import Path
import cson
import getpass
from lib import abc
import string
from lib.db import Database
from lib.dirstat import wrap_directory, unpack
import socket
from hooks.remote import remote, clone as _clone
from hooks.diff import diff
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
host = s.getsockname()[0]
s.close()
except:
host = "127.0.0.1"
app = typer.Typer(name="sing", help="Singularity - the worst Source Control ever")
app.add_typer(remote, name="remote")
app.add_typer(diff, name="diff")
@app.command()
def clone(url: str = typer.Argument(...)):
_clone(url, init_fn=init, pull_fn=pull)
@app.command()
def init(
dir=typer.Argument("."),
branch: str = typer.Option("master", "-b", "--initial-branch"),
force: bool = typer.Option(False, "-f", "--force", "-r", "--reinit"),
):
"""
Initializes a new Repository, or reinitializes an existing one
"""
initial_cfg = {"current_branch": branch, "branches": [branch], "remotes": {}}
user_cfg = {
"user.name": f"",
"user.email": f"",
"repo.name": os.path.basename(os.path.abspath(dir)),
}
if os.path.exists(os.path.join(dir, ".sing")):
if force:
shutil.rmtree(os.path.join(dir, ".sing"))
print("Reinitializing Singularity Repository...")
else:
return print("Singularity Config Directory already exists - Quitting")
os.mkdir(os.path.join(dir, ".sing"))
os.chdir(os.path.join(dir, ".sing"))
os.mkdir("branches") # Branch Directories
os.mkdir(os.path.join("branches", branch)) # Initial Branch
os.mkdir("stage") # Staged Changes
os.mkdir("system") # Remotes, Branch Config, Local Config, Database
n = open(os.path.join("system", "sing.db"), "wb+")
n.close()
cson.dump(initial_cfg, open(os.path.join("system", "branchcf.cson"), "w+"))
cson.dump(user_cfg, open(os.path.join("system", "localconfig.cson"), "w+"))
os.mkdir("control") # Timeline etc.
os.mkdir("overhead") # Stashed Data
print("Initialized barebones Repository in {0}".format(os.path.abspath(dir)))
@app.command()
def config(
key=typer.Argument(None),
list: bool = typer.Option(False, "-l", "--list"),
set: str = typer.Option(None, "--set"),
):
ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
if key:
if set:
ucfg[key] = set
cson.dump(
ucfg, open(os.path.join(".sing", "system", "localconfig.cson"), "w+")
)
else:
print(ucfg.get(key, f"Not found: {key}"))
if list:
subpart = {}
for k, v in ucfg.items():
root, val = k.split(".")
if root not in subpart:
subpart[root] = []
subpart[root].append((val, v))
for root, values in subpart.items():
print(f"- {root}")
for key, value in values:
print(f"---- {key} -> {value}")
@app.command()
def log():
""""""
for commitfile in os.listdir(os.path.join(".sing", "control")):
print(open(os.path.join(".sing", "control", commitfile)).read())
@app.command()
def stash(files: t.List[Path]):
"""
Stashes Files into Overhead to avoid conflicts. Usually called automatically
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
for file in files:
fp = os.path.abspath(file.name)
bn = os.path.basename(fp)
if fp == os.getcwd():
add(list([Path(i) for i in os.listdir(fp)]))
return
else:
if bn in ignore:
continue
elif os.path.isdir(fp):
if os.path.isdir(os.path.join(".sing", "overhead", bn)):
shutil.rmtree(os.path.join(".sing", "overhead", bn))
shutil.copytree(fp, os.path.join(".sing", "overhead", bn))
elif os.path.isfile(fp):
if os.path.isfile(os.path.join(".sing", "overhead", bn)):
os.remove(os.path.join(".sing", "overhead", bn))
shutil.copyfile(fp, os.path.join(".sing", "overhead", bn))
@app.command()
def add(files: t.List[Path]):
"""
Stage Files or Directories for a commit
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
for file in files:
fp = os.path.abspath(file.name)
bn = os.path.basename(fp)
if fp == os.getcwd():
add(list([Path(i) for i in os.listdir(fp)]))
return
else:
if bn in ignore:
continue
elif os.path.isdir(fp):
if os.path.isdir(os.path.join(".sing", "stage", bn)):
shutil.rmtree(os.path.join(".sing", "stage", bn))
shutil.copytree(fp, os.path.join(".sing", "stage", bn))
elif os.path.isfile(fp):
if os.path.isfile(os.path.join(".sing", "stage", bn)):
os.remove(os.path.join(".sing", "stage", bn))
shutil.copyfile(fp, os.path.join(".sing", "stage", bn))
@app.command()
def rm(files: t.List[str]):
"""
Unstage staged Files
"""
for file in files:
if os.path.exists(os.path.join(".sing", "stage", file)):
if os.path.isdir(os.path.join(".sing", "stage", file)):
shutil.rmtree(os.path.join(".sing", "stage", file))
else:
os.remove(os.path.join(".sing", "stage", file))
@app.command()
def branch(make_new: str = typer.Option(None, "-m", "--new")):
"""
List Branches or make a new one. To switch, use the 'checkout' command.
"""
if not make_new:
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
for branch in cfg["branches"]:
if branch == cfg["current_branch"]:
print(f"* {branch}")
else:
print(branch)
else:
os.mkdir(os.path.join(".sing", "branches", make_new))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
cfg["branches"].append(make_new)
cfg["current_branch"] = make_new
cson.dump(cfg, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
@app.command()
def commit(
message: str = typer.Option(None, "-m", "--message"),
amend: bool = typer.Option(False, "-a", "--amend"),
):
"""
Commit the staged Files and write them to the Database
Options:
-m, --message : The Commit Message.
-a. --amend : Overwrite the last commit.
"""
if not message:
open(".commit.tmp", "w+")
typer.edit(filename=".commit.tmp")
message = open(".commit.tmp").read()
os.remove(".commit.tmp")
ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
if ucfg["user.name"] == "" or ucfg["user.email"] == "":
print("*** Please tell me who you are")
print("\nRun\n")
print('\tsing config user.name --set "Your Name" ')
print('\tsing config user.example --set "you@example.com" ')
return
if amend:
try:
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"]))
shutil.copytree(
os.path.join(".sing", "stage"),
os.path.join(".sing", "branches", cfg["current_branch"]),
)
import random
from datetime import datetime
cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32))
commitfile = f"""
commit {cwid} {cfg["current_branch"]}
Author: {ucfg['user.name']} <{ucfg['user.email']}>
Date: {datetime.now()}
{message}
"""
commit_data = wrap_directory(
os.path.join(".sing", "branches", cfg["current_branch"])
)
db.write(cwid, commit_data)
db.commit()
print(os.getcwd())
open(
os.path.join(
".sing",
"control",
f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))-1}.commit",
),
"w+",
).write(commitfile)
print(f"Commit Mode +w - On ")
except Exception as e:
import sys, traceback
print(sys.exc_info())
print(traceback.format_exc())
print(e)
else:
try:
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"]))
shutil.copytree(
os.path.join(".sing", "stage"),
os.path.join(".sing", "branches", cfg["current_branch"]),
)
import random
from datetime import datetime
cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32))
commitfile = f"""
commit {cwid} {cfg["current_branch"]}
Author: {ucfg['user.name']} <{ucfg['user.email']}>
Date: {datetime.now()}
{message}
"""
commit_data = wrap_directory(
os.path.join(".sing", "branches", cfg["current_branch"])
)
db.write(cwid, commit_data)
db.commit()
print(os.getcwd())
open(
os.path.join(
".sing",
"control",
f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))}.commit",
),
"w+",
).write(commitfile)
print(f"Commit Mode +w - On ")
except Exception as e:
import sys, traceback
print(sys.exc_info())
print(traceback.format_exc())
print(e)
@app.command()
def stat():
"""
Print the entire sing Configuration Tree
"""
dw = wrap_directory(".sing")
print(dw.stringify(), end="")
print("local{HEAD}")
@app.command()
def slog():
"""
List all Commits in the Database
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
for k, v in db.data.items():
print(f"{k} --> {type(v)}")
@app.command()
def pull():
"""
Stash the current Tree and integrate changes downloaded from Remote into active branch
"""
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
stash([Path(".")])
id = len(os.listdir(os.path.join(".sing", "control"))) - 1
commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit")
dname = open(commit_fn).readlines()
for i in dname:
i = i.lstrip()
if i.startswith("commit"):
_, id, branch = i.split()
if branch == cfg["current_branch"]:
break
revert(id, branch=branch)
@app.command(no_args_is_help=True)
def comtree(c_id: str = typer.Argument(...)):
"""
View the Tree Structure of the Commit <C_ID>
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
print(entry.stringify())
print("HEAD/")
@app.command(no_args_is_help=True)
def revert(
c_id: str = typer.Argument(...), branch: str = typer.Option(None, "-b", "--branch")
):
"""
Reverts to Commit <C_ID>
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
ignore = [".sing"]
if os.path.exists(".signore"):
ignore += open(".signore").readlines()
for n in os.listdir():
if n in ignore:
continue
if os.path.isfile(n):
os.remove(n)
else:
shutil.rmtree(n)
unpack(entry)
@app.command(no_args_is_help=True)
def checkout(
branch: str = typer.Argument(...),
force: bool = typer.Option(False, "-f", "--force"),
):
"""
Switch branches or restore working tree files
"""
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
id = len(os.listdir(os.path.join(".sing", "control"))) - 1
commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit")
dname = open(commit_fn).readlines()
for i in dname:
i = i.lstrip()
if i.startswith("commit"):
_, c_id, _branch = i.split()
if branch == _branch:
break
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
ignore = [".sing"]
if os.path.exists(".signore"):
ignore += open(".signore").readlines()
if not force:
stash([Path(".")])
for n in os.listdir():
if n in ignore:
continue
if os.path.isfile(n):
os.remove(n)
else:
shutil.rmtree(n)
cfg["current_branch"] = branch
cson.dump(cfg, open(os.path.join(".sing", "system", "branchcf.cson")))
unpack(entry)
if __name__ == "__main__":
app()