initial commit

This commit is contained in:
Johanna 2023-10-27 11:30:33 +02:00
commit cf661af751
76 changed files with 3670 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
__pycache__/
.sing/
.signore/

1
.signore Normal file
View File

@ -0,0 +1 @@
__pycache__

View File

@ -0,0 +1 @@
__pycache__

View File

View File

@ -0,0 +1,41 @@
import typer
import cson
import os
import sys
import shutil
diff = typer.Typer(name="diff")
@diff.command()
def cmp(shallow: bool = typer.Option(True, "-s", "--shallow")):
"""
Compare Active Working Directory and latest commit on the same branch
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
masterb = config["current_branch"]
if shallow:
for i in os.listdir(os.path.join(".sing", "branches", masterb)):
if not os.path.exists(i) and not i in ignore:
print(f"- {i}")
else:
if not i in ignore:
if shutil.disk_usage(
os.path.join(".sing", "branches", masterb, i)
) > shutil.disk_usage(i):
print(f"+++ {i}")
elif shutil.disk_usage(
os.path.join(".sing", "branches", masterb, i)
) < shutil.disk_usage(i):
print(f"--- {i}")
else:
print(f"=== {i}")
for i in os.listdir():
if (
not os.path.exists(os.path.join(".sing", "branches", masterb, i))
and not i in ignore
):
print(f"+ {i}")

View File

@ -0,0 +1,234 @@
import typer
import socket
import cson
import os
import sys
import pickle
from lib.db import Database
from lib.abc import FileWrap, Key
from lib.keyexchange import generate_keys
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
host = s.getsockname()[0]
s.close()
except:
host = "127.0.0.1"
remote = typer.Typer(name="remote")
cstep = 0
CHUNK_SIZE = 1
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]
def cycle():
global cstep
steps = ["/", "-", "\\", "|"]
cstep += 1
if cstep == 4:
cstep = 0
return steps[cstep]
@remote.command()
def add(name: str, url: str):
"""
Add a remote named <name> for the repository at <url>. The command 'sing remote fetch' can then
be used to create and update remote branches (<name>/<branch>)
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
config["remotes"][name] = url
cson.dump(config, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
@remote.command(name="keys")
def kg(
generate: bool = typer.Option(True, "-g", "--generate-new"),
publish: bool = typer.Option(False, "-p", "--publish"),
):
if generate:
generate_keys()
print("REMINDER : THIS KEY WILL BE USED TO ENCRYPT REMOTE PUSHES.")
print("Other Clients will not be able to decrypt the Push unless they")
print("receive the key. Use these commands to share your keys:")
print("\tsing remote keys --publish")
if publish:
if not os.path.exists(os.path.join(".sing", "system", ".keyfile")):
return print(
"Fatal -- no Keys found. Use the '-g' option to create new keys"
)
print("Importing Keys...")
key: Key = Key.kimport(os.path.join(".sing", "system", ".keyfile"))
print(key.randomart)
import getpass
print("Do you want to protect your Keys using a Passphrase?")
print("Recipients will be prompted for their Password before getting the Key")
p = input("[y/N]") or "n"
p = p.lower()
if p in ["y", "yes"]:
passphrase = getpass.getpass("Enter Passphrase : ")
pass_rep = getpass.getpass("Re-Type Passphrase:")
i = 1
while not pass_rep == passphrase and i < 3:
pass_rep = getpass.getpass(
"Wrong Passphrase - Please re-type Passphrase:"
)
i += 1
if i >= 3:
return print("Aborted - 3 Times entered Wrong Password")
@remote.command(name="get-url")
def gurl(remote_name):
"""
Retrieves the URLs for a Remote.
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
rmurl = config["remotes"].get(remote_name, "Fatal -- No Remote Found.")
print(rmurl)
def clone(url, init_fn, pull_fn):
"""
Download objects from remote repository
"""
import urllib.parse
parsed = urllib.parse.urlparse(url)
if os.path.exists(parsed.path.split("/")[-1]):
return print(f"Fatal -- File/Directory exists: {parsed.path.split('/')[-1]}")
print("Connecting to remote Server...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((socket.gethostbyname(parsed.hostname), 2991))
s = f"down {parsed.path}"
sock.send(f"{len(s)}".encode())
sock.recv(1024)
sock.send(f"down {parsed.path}".encode())
database = []
print("Initializing Repository...")
os.mkdir(parsed.path.split("/")[-1])
os.chdir(parsed.path.split("/")[-1])
while True:
print(f"remote: Receiving Objects... {cycle()}", end="\r")
sock.settimeout(1)
try:
packet = sock.recv(4096)
if not packet:
break
database.append(packet)
except Exception as e:
break
print("remote: Unpacking Objects...")
database = b"".join(database)
database = pickle.loads(database)
Main_DB = database.get("MainDB")
Branch_Config = database.get("Branches")
Commit_History = database.get("CommitHistory")
init_fn(".", Branch_Config["current_branch"], True)
os.chdir("..")
try:
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
except:
config = {}
config = Branch_Config
cson.dump(config, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
for i in Commit_History:
with open(os.path.join(".sing", "control", i.name), "wb+") as f:
f.write(i.data)
open(os.path.join(".sing", "system", "sing.db"), "wb+").write(Main_DB)
print(f"{url}/HEAD -> local/HEAD")
print("Pulling changes...")
pull_fn()
@remote.command()
def fetch(remote_name):
"""
Download objects from remote repository
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
lconfig = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
rmurl = config["remotes"].get(remote_name)
print("Connecting to remote Server...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((socket.gethostbyname(rmurl), 2991))
s = f"down {lconfig['repo.name']}"
sock.send(f"{len(s)}".encode())
sock.recv(1024)
sock.send(f"down {lconfig['repo.name']}".encode())
database = []
while True:
print(f"remote: Receiving Objects... {cycle()}", end="\r")
sock.settimeout(1)
try:
packet = sock.recv(4096)
if not packet:
break
database.append(packet)
except Exception as e:
break
print("remote: Unpacking Objects...")
database = b"".join(database)
database = pickle.loads(database)
Main_DB = database.get("MainDB")
Branch_Config = database.get("Branches")
Commit_History = database.get("CommitHistory")
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
config["branches"] = Branch_Config["branches"]
cson.dump(config, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
for i in Commit_History:
with open(os.path.join(".sing", "control", i.name), "wb+") as f:
f.write(i.data)
open(os.path.join(".sing", "system", "sing.db"), "wb+").write(Main_DB)
print(f"{remote_name}/HEAD -> local/HEAD")
print("Use 'sing pull' to write into local files")
@remote.command()
def push(remote_name):
"""
Update remote refs along with associated objects
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
lconfig = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
rmurl = config["remotes"].get(remote_name)
print("Connecting to remote Server...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((socket.gethostbyname(rmurl), 2991))
s = f"up {lconfig['repo.name']}"
sock.send(f"{len(s)}".encode())
sock.recv(8)
sock.send(s.encode())
db = {
"MainDB": open(os.path.join(".sing", "system", "sing.db"), "rb").read(),
"Branches": config,
"CommitHistory": [
FileWrap(i, open(os.path.join(".sing", "control", i), "rb").read())
for i in os.listdir(os.path.join(".sing", "control"))
],
}
db = pickle.dumps(db)
c = list(chunks(db, CHUNK_SIZE))
for i, chunk in enumerate(c):
print(f"Sending Objects... {i}/{len(c)-1} {cycle()}", end="\r")
sock.send(chunk)
print()
print(f"Origin -> HEAD[{remote_name}]")

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,94 @@
import typing as t
####################
# Directory Wrappers
#
class DirWrap:
def __init__(self, fname, *data: t.List[t.Union["DirWrap", "FileWrap"]]):
self.name = fname
self.contains = {d.name: d for d in data}
self._raw_data = data
def stringify(self, level=0) -> str:
s = ""
for key, value in self.contains.items():
s += (
""
+ ""
+ "" * (4 * level)
+ " "
+ key
+ ("/" if isinstance(value, DirWrap) else "")
+ "\n"
)
if isinstance(value, DirWrap):
s += value.stringify(level + 1)
return s
class FileWrap:
def __init__(self, fname: str, fdata: t.Union[bytes, str]) -> None:
self.name = fname
self.data = fdata.encode() if isinstance(fdata, str) else fdata
#########
# Streams
#
class IStream:
def __init__(self, file) -> None:
self.file = file
def write(self, *data):
self.file.write(*data)
class IOStream:
def __init__(self, fdin: IStream, fdout: "OStream") -> None:
self.fdin = fdin
self.fdout = fdout
def write(self, *data):
self.fdin.write(*data)
def read(self):
return self.fdout.read()
class OStream:
def __init__(self, file) -> None:
self.file = file
def read(self):
return self.file.read()
class Key:
def __init__(self, kpath, key, hash, randomart):
self.kp = kpath
self.key = key
self.hash = hash
self.randomart = randomart
def dump(self):
open(self.kp, "wb+").write(
"--- BEGIN FERNET CRYPTOGRAPHY KEY ---\n".encode()
+ self.key
+ f"\n{self.hash}\n".encode()
+ "\n--- END FERNET CRYPTOGRAPHY KEY ---".encode()
)
@classmethod
def kimport(cls, kp):
k = open(kp, "rb").read().splitlines()
key = k[1]
hash = k[2].decode()
from random_art.randomart import drunkenwalk, draw
randomart = draw(drunkenwalk(key), hash)
return cls(kp, key, hash, randomart)

View File

@ -0,0 +1,30 @@
import pickle
from .abc import IOStream, IStream, OStream
class Database:
def __init__(self, fn) -> None:
self.fn = fn
try:
with open(fn, "rb+") as stream_out:
self.data = pickle.load(stream_out)
except:
self.data = {}
def write(self, key, entry):
self.data[key] = entry
def update(self):
with open(self.fn, "rb+") as stream_out:
self.data = pickle.load(stream_out)
def get(self, key, default=None):
return self.data.get(key, default)
def commit(self):
with open(self.fn, "wb+") as stream_in:
pickle.dump(self.data, stream_in)
@classmethod
def connect(cls, fname):
return cls(fname)

View File

@ -0,0 +1,51 @@
from .abc import FileWrap, DirWrap
import os
import functools
if hasattr(functools, "cache"):
cache_fn = functools.cache
else:
cache_fn = functools.lru_cache
@cache_fn
def parse_directory(dirp):
structure = {}
os.chdir(dirp)
for d in os.listdir():
if os.path.isdir(d):
structure[d] = parse_directory(d)
elif os.path.isfile(d):
structure[d] = open(d, "rb").read()
os.chdir("..")
return structure
@cache_fn
def wrap_directory(dirp, level=0):
structure = parse_directory(dirp)
data = []
for k, v in structure.items():
if isinstance(v, dict):
data.append(wrap_directory(k, level + 1))
else:
data.append(FileWrap(k, v))
if level == 0:
os.chdir(os.path.join("..", ".."))
return DirWrap(dirp, *data)
@cache_fn
def unpack(dirw: DirWrap):
import shutil
for k, v in dirw.contains.items():
if isinstance(v, DirWrap):
if os.path.isdir(k):
shutil.rmtree(k)
os.mkdir(v.name)
os.chdir(v.name)
unpack(v)
os.chdir("..")
else:
open(k, "wb+").write(v.data)

View File

@ -0,0 +1,20 @@
from cryptography.fernet import Fernet
from random_art.randomart import drunkenwalk, draw
from random import choices
from string import ascii_letters, digits
import os
from .abc import Key
def generate_keys():
key = Fernet.generate_key()
path = os.path.join(".sing", "system", ".keyfile")
hash = "".join(choices(ascii_letters + digits, k=10))
randomart = draw(drunkenwalk(key), hash)
print(f"The Key is {key}")
print("The Key's randomart is")
print(randomart)
key = Key(path, key, hash, randomart)
key.dump()
print(f"Saved Keys in {path}")
return key

419
.sing/branches/foo/sing.py Normal file
View File

@ -0,0 +1,419 @@
import typer
import os
import typing as t
import shutil
from pathlib import Path
import cson
import getpass
from lib import abc
import string
from lib.db import Database
from lib.dirstat import wrap_directory, unpack
import socket
from hooks.remote import remote, clone as _clone
from hooks.diff import diff
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
host = s.getsockname()[0]
s.close()
except:
host = "127.0.0.1"
app = typer.Typer(name="sing", help="Singularity - the worst Source Control ever")
app.add_typer(remote, name="remote")
app.add_typer(diff, name="diff")
@app.command()
def clone(url: str = typer.Argument(...)):
_clone(url, init_fn=init, pull_fn=pull)
@app.command()
def init(
dir=typer.Argument("."),
branch: str = typer.Option("master", "-b", "--initial-branch"),
force: bool = typer.Option(False, "-f", "--force", "-r", "--reinit"),
):
"""
Initializes a new Repository, or reinitializes an existing one
"""
initial_cfg = {"current_branch": branch, "branches": [branch], "remotes": {}}
user_cfg = {
"user.name": f"",
"user.email": f"",
"repo.name": os.path.basename(os.path.abspath(dir)),
}
if os.path.exists(os.path.join(dir, ".sing")):
if force:
shutil.rmtree(os.path.join(dir, ".sing"))
print("Reinitializing Singularity Repository...")
else:
return print("Singularity Config Directory already exists - Quitting")
os.mkdir(os.path.join(dir, ".sing"))
os.chdir(os.path.join(dir, ".sing"))
os.mkdir("branches") # Branch Directories
os.mkdir(os.path.join("branches", branch)) # Initial Branch
os.mkdir("stage") # Staged Changes
os.mkdir("system") # Remotes, Branch Config, Local Config, Database
n = open(os.path.join("system", "sing.db"), "wb+")
n.close()
cson.dump(initial_cfg, open(os.path.join("system", "branchcf.cson"), "w+"))
cson.dump(user_cfg, open(os.path.join("system", "localconfig.cson"), "w+"))
os.mkdir("control") # Timeline etc.
os.mkdir("overhead") # Stashed Data
print("Initialized barebones Repository in {0}".format(os.path.abspath(dir)))
@app.command()
def config(
key=typer.Argument(None),
list: bool = typer.Option(False, "-l", "--list"),
set: str = typer.Option(None, "--set"),
):
ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
if key:
if set:
ucfg[key] = set
cson.dump(
ucfg, open(os.path.join(".sing", "system", "localconfig.cson"), "w+")
)
else:
print(ucfg.get(key, f"Not found: {key}"))
if list:
subpart = {}
for k, v in ucfg.items():
root, val = k.split(".")
if root not in subpart:
subpart[root] = []
subpart[root].append((val, v))
for root, values in subpart.items():
print(f"- {root}")
for key, value in values:
print(f"---- {key} -> {value}")
@app.command()
def log():
""""""
for commitfile in os.listdir(os.path.join(".sing", "control")):
print(open(os.path.join(".sing", "control", commitfile)).read())
@app.command()
def stash(files: t.List[Path]):
"""
Stashes Files into Overhead to avoid conflicts. Usually called automatically
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
for file in files:
fp = os.path.abspath(file.name)
bn = os.path.basename(fp)
if fp == os.getcwd():
add(list([Path(i) for i in os.listdir(fp)]))
return
else:
if bn in ignore:
continue
elif os.path.isdir(fp):
if os.path.isdir(os.path.join(".sing", "overhead", bn)):
shutil.rmtree(os.path.join(".sing", "overhead", bn))
shutil.copytree(fp, os.path.join(".sing", "overhead", bn))
elif os.path.isfile(fp):
if os.path.isfile(os.path.join(".sing", "overhead", bn)):
os.remove(os.path.join(".sing", "overhead", bn))
shutil.copyfile(fp, os.path.join(".sing", "overhead", bn))
@app.command()
def add(files: t.List[Path]):
"""
Stage Files or Directories for a commit
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
for file in files:
fp = os.path.abspath(file.name)
bn = os.path.basename(fp)
if fp == os.getcwd():
add(list([Path(i) for i in os.listdir(fp)]))
return
else:
if bn in ignore:
continue
elif os.path.isdir(fp):
if os.path.isdir(os.path.join(".sing", "stage", bn)):
shutil.rmtree(os.path.join(".sing", "stage", bn))
shutil.copytree(fp, os.path.join(".sing", "stage", bn))
elif os.path.isfile(fp):
if os.path.isfile(os.path.join(".sing", "stage", bn)):
os.remove(os.path.join(".sing", "stage", bn))
shutil.copyfile(fp, os.path.join(".sing", "stage", bn))
@app.command()
def rm(files: t.List[str]):
"""
Unstage staged Files
"""
for file in files:
if os.path.exists(os.path.join(".sing", "stage", file)):
if os.path.isdir(os.path.join(".sing", "stage", file)):
shutil.rmtree(os.path.join(".sing", "stage", file))
else:
os.remove(os.path.join(".sing", "stage", file))
@app.command()
def branch(make_new: str = typer.Option(None, "-m", "--new")):
"""
List Branches or make a new one. To switch, use the 'checkout' command.
"""
if not make_new:
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
for branch in cfg["branches"]:
if branch == cfg["current_branch"]:
print(f"* {branch}")
else:
print(branch)
else:
os.mkdir(os.path.join(".sing", "branches", make_new))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
cfg["branches"].append(make_new)
cfg["current_branch"] = make_new
cson.dump(cfg, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
@app.command()
def commit(
message: str = typer.Option(..., "-m", "--message"),
amend: bool = typer.Option(False, "-a", "--amend"),
):
"""
Commit the staged Files and write them to the Database
Options:
-m, --message : The Commit Message.
-a. --amend : Overwrite the last commit.
"""
ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
if ucfg["user.name"] == "" or ucfg["user.email"] == "":
print("*** Please tell me who you are")
print("\nRun\n")
print('\tsing config user.name --set "Your Name" ')
print('\tsing config user.example --set "you@example.com" ')
return
if amend:
try:
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"]))
shutil.copytree(
os.path.join(".sing", "stage"),
os.path.join(".sing", "branches", cfg["current_branch"]),
)
import random
from datetime import datetime
cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32))
commitfile = f"""
commit {cwid} {cfg["current_branch"]}
Author: {ucfg['user.name']} <{ucfg['user.email']}>
Date: {datetime.now()}
{message}
"""
commit_data = wrap_directory(
os.path.join(".sing", "branches", cfg["current_branch"])
)
db.write(cwid, commit_data)
db.commit()
print(os.getcwd())
open(
os.path.join(
".sing",
"control",
f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))-1}.commit",
),
"w+",
).write(commitfile)
print(f"Commit Mode +w - On ")
except Exception as e:
import sys, traceback
print(sys.exc_info())
print(traceback.format_exc())
print(e)
else:
try:
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"]))
shutil.copytree(
os.path.join(".sing", "stage"),
os.path.join(".sing", "branches", cfg["current_branch"]),
)
import random
from datetime import datetime
cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32))
commitfile = f"""
commit {cwid} {cfg["current_branch"]}
Author: {ucfg['user.name']} <{ucfg['user.email']}>
Date: {datetime.now()}
{message}
"""
commit_data = wrap_directory(
os.path.join(".sing", "branches", cfg["current_branch"])
)
db.write(cwid, commit_data)
db.commit()
print(os.getcwd())
open(
os.path.join(
".sing",
"control",
f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))}.commit",
),
"w+",
).write(commitfile)
print(f"Commit Mode +w - On ")
except Exception as e:
import sys, traceback
print(sys.exc_info())
print(traceback.format_exc())
print(e)
@app.command()
def stat():
"""
Print the entire sing Configuration Tree
"""
dw = wrap_directory(".sing")
print(dw.stringify(), end="")
print("local{HEAD}")
@app.command()
def slog():
"""
List all Commits in the Database
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
for k, v in db.data.items():
print(f"{k} --> {type(v)}")
@app.command()
def pull():
"""
Stash the current Tree and integrate changes downloaded from Remote into active branch
"""
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
stash([Path(".")])
id = len(os.listdir(os.path.join(".sing", "control"))) - 1
commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit")
dname = open(commit_fn).readlines()
for i in dname:
i = i.lstrip()
if i.startswith("commit"):
_, id, branch = i.split()
if branch == cfg["current_branch"]:
break
revert(id, branch=branch)
@app.command(no_args_is_help=True)
def comtree(c_id: str = typer.Argument(...)):
"""
View the Tree Structure of the Commit <C_ID>
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
print(entry.stringify())
print("HEAD/")
@app.command(no_args_is_help=True)
def revert(
c_id: str = typer.Argument(...), branch: str = typer.Option(None, "-b", "--branch")
):
"""
Reverts to Commit <C_ID>
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
ignore = [".sing"]
if os.path.exists(".signore"):
ignore += open(".signore").readlines()
for n in os.listdir():
if n in ignore:
continue
if os.path.isfile(n):
os.remove(n)
else:
shutil.rmtree(n)
unpack(entry)
@app.command(no_args_is_help=True)
def checkout(
branch: str = typer.Argument(...),
force: bool = typer.Option(False, "-f", "--force"),
):
"""
Switch branches or restore working tree files
"""
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
id = len(os.listdir(os.path.join(".sing", "control"))) - 1
commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit")
dname = open(commit_fn).readlines()
for i in dname:
i = i.lstrip()
if i.startswith("commit"):
_, c_id, _branch = i.split()
if branch == _branch:
break
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
ignore = [".sing"]
if os.path.exists(".signore"):
ignore += open(".signore").readlines()
if not force:
stash([Path(".")])
for n in os.listdir():
if n in ignore:
continue
if os.path.isfile(n):
os.remove(n)
else:
shutil.rmtree(n)
unpack(entry)
if __name__ == "__main__":
app()

View File

@ -0,0 +1 @@
# Lol

View File

@ -0,0 +1 @@
__pycache__

View File

@ -0,0 +1,41 @@
import typer
import cson
import os
import sys
import shutil
diff = typer.Typer(name="diff")
@diff.command()
def cmp(shallow: bool = typer.Option(True, "-s", "--shallow")):
"""
Compare Active Working Directory and latest commit on the same branch
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
masterb = config["current_branch"]
if shallow:
for i in os.listdir(os.path.join(".sing", "branches", masterb)):
if not os.path.exists(i) and not i in ignore:
print(f"- {i}")
else:
if not i in ignore:
if shutil.disk_usage(
os.path.join(".sing", "branches", masterb, i)
) > shutil.disk_usage(i):
print(f"+++ {i}")
elif shutil.disk_usage(
os.path.join(".sing", "branches", masterb, i)
) < shutil.disk_usage(i):
print(f"--- {i}")
else:
print(f"=== {i}")
for i in os.listdir():
if (
not os.path.exists(os.path.join(".sing", "branches", masterb, i))
and not i in ignore
):
print(f"+ {i}")

View File

@ -0,0 +1,234 @@
import typer
import socket
import cson
import os
import sys
import pickle
from lib.db import Database
from lib.abc import FileWrap, Key
from lib.keyexchange import generate_keys
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
host = s.getsockname()[0]
s.close()
except:
host = "127.0.0.1"
remote = typer.Typer(name="remote")
cstep = 0
CHUNK_SIZE = 1
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]
def cycle():
global cstep
steps = ["/", "-", "\\", "|"]
cstep += 1
if cstep == 4:
cstep = 0
return steps[cstep]
@remote.command()
def add(name: str, url: str):
"""
Add a remote named <name> for the repository at <url>. The command 'sing remote fetch' can then
be used to create and update remote branches (<name>/<branch>)
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
config["remotes"][name] = url
cson.dump(config, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
@remote.command(name="keys")
def kg(
generate: bool = typer.Option(True, "-g", "--generate-new"),
publish: bool = typer.Option(False, "-p", "--publish"),
):
if generate:
generate_keys()
print("REMINDER : THIS KEY WILL BE USED TO ENCRYPT REMOTE PUSHES.")
print("Other Clients will not be able to decrypt the Push unless they")
print("receive the key. Use these commands to share your keys:")
print("\tsing remote keys --publish")
if publish:
if not os.path.exists(os.path.join(".sing", "system", ".keyfile")):
return print(
"Fatal -- no Keys found. Use the '-g' option to create new keys"
)
print("Importing Keys...")
key: Key = Key.kimport(os.path.join(".sing", "system", ".keyfile"))
print(key.randomart)
import getpass
print("Do you want to protect your Keys using a Passphrase?")
print("Recipients will be prompted for their Password before getting the Key")
p = input("[y/N]") or "n"
p = p.lower()
if p in ["y", "yes"]:
passphrase = getpass.getpass("Enter Passphrase : ")
pass_rep = getpass.getpass("Re-Type Passphrase:")
i = 1
while not pass_rep == passphrase and i < 3:
pass_rep = getpass.getpass(
"Wrong Passphrase - Please re-type Passphrase:"
)
i += 1
if i >= 3:
return print("Aborted - 3 Times entered Wrong Password")
@remote.command(name="get-url")
def gurl(remote_name):
"""
Retrieves the URLs for a Remote.
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
rmurl = config["remotes"].get(remote_name, "Fatal -- No Remote Found.")
print(rmurl)
def clone(url, init_fn, pull_fn):
"""
Download objects from remote repository
"""
import urllib.parse
parsed = urllib.parse.urlparse(url)
if os.path.exists(parsed.path.split("/")[-1]):
return print(f"Fatal -- File/Directory exists: {parsed.path.split('/')[-1]}")
print("Connecting to remote Server...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((socket.gethostbyname(parsed.hostname), 2991))
s = f"down {parsed.path}"
sock.send(f"{len(s)}".encode())
sock.recv(1024)
sock.send(f"down {parsed.path}".encode())
database = []
print("Initializing Repository...")
os.mkdir(parsed.path.split("/")[-1])
os.chdir(parsed.path.split("/")[-1])
while True:
print(f"remote: Receiving Objects... {cycle()}", end="\r")
sock.settimeout(1)
try:
packet = sock.recv(4096)
if not packet:
break
database.append(packet)
except Exception as e:
break
print("remote: Unpacking Objects...")
database = b"".join(database)
database = pickle.loads(database)
Main_DB = database.get("MainDB")
Branch_Config = database.get("Branches")
Commit_History = database.get("CommitHistory")
init_fn(".", Branch_Config["current_branch"], True)
os.chdir("..")
try:
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
except:
config = {}
config = Branch_Config
cson.dump(config, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
for i in Commit_History:
with open(os.path.join(".sing", "control", i.name), "wb+") as f:
f.write(i.data)
open(os.path.join(".sing", "system", "sing.db"), "wb+").write(Main_DB)
print(f"{url}/HEAD -> local/HEAD")
print("Pulling changes...")
pull_fn()
@remote.command()
def fetch(remote_name):
"""
Download objects from remote repository
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
lconfig = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
rmurl = config["remotes"].get(remote_name)
print("Connecting to remote Server...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((socket.gethostbyname(rmurl), 2991))
s = f"down {lconfig['repo.name']}"
sock.send(f"{len(s)}".encode())
sock.recv(1024)
sock.send(f"down {lconfig['repo.name']}".encode())
database = []
while True:
print(f"remote: Receiving Objects... {cycle()}", end="\r")
sock.settimeout(1)
try:
packet = sock.recv(4096)
if not packet:
break
database.append(packet)
except Exception as e:
break
print("remote: Unpacking Objects...")
database = b"".join(database)
database = pickle.loads(database)
Main_DB = database.get("MainDB")
Branch_Config = database.get("Branches")
Commit_History = database.get("CommitHistory")
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
config["branches"] = Branch_Config["branches"]
cson.dump(config, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
for i in Commit_History:
with open(os.path.join(".sing", "control", i.name), "wb+") as f:
f.write(i.data)
open(os.path.join(".sing", "system", "sing.db"), "wb+").write(Main_DB)
print(f"{remote_name}/HEAD -> local/HEAD")
print("Use 'sing pull' to write into local files")
@remote.command()
def push(remote_name):
"""
Update remote refs along with associated objects
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
lconfig = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
rmurl = config["remotes"].get(remote_name)
print("Connecting to remote Server...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((socket.gethostbyname(rmurl), 2991))
s = f"up {lconfig['repo.name']}"
sock.send(f"{len(s)}".encode())
sock.recv(8)
sock.send(s.encode())
db = {
"MainDB": open(os.path.join(".sing", "system", "sing.db"), "rb").read(),
"Branches": config,
"CommitHistory": [
FileWrap(i, open(os.path.join(".sing", "control", i), "rb").read())
for i in os.listdir(os.path.join(".sing", "control"))
],
}
db = pickle.dumps(db)
c = list(chunks(db, CHUNK_SIZE))
for i, chunk in enumerate(c):
print(f"Sending Objects... {i}/{len(c)-1} {cycle()}", end="\r")
sock.send(chunk)
print()
print(f"Origin -> HEAD[{remote_name}]")

View File

@ -0,0 +1,94 @@
import typing as t
####################
# Directory Wrappers
#
class DirWrap:
def __init__(self, fname, *data: t.List[t.Union["DirWrap", "FileWrap"]]):
self.name = fname
self.contains = {d.name: d for d in data}
self._raw_data = data
def stringify(self, level=0) -> str:
s = ""
for key, value in self.contains.items():
s += (
""
+ ""
+ "" * (4 * level)
+ " "
+ key
+ ("/" if isinstance(value, DirWrap) else "")
+ "\n"
)
if isinstance(value, DirWrap):
s += value.stringify(level + 1)
return s
class FileWrap:
def __init__(self, fname: str, fdata: t.Union[bytes, str]) -> None:
self.name = fname
self.data = fdata.encode() if isinstance(fdata, str) else fdata
#########
# Streams
#
class IStream:
def __init__(self, file) -> None:
self.file = file
def write(self, *data):
self.file.write(*data)
class IOStream:
def __init__(self, fdin: IStream, fdout: "OStream") -> None:
self.fdin = fdin
self.fdout = fdout
def write(self, *data):
self.fdin.write(*data)
def read(self):
return self.fdout.read()
class OStream:
def __init__(self, file) -> None:
self.file = file
def read(self):
return self.file.read()
class Key:
def __init__(self, kpath, key, hash, randomart):
self.kp = kpath
self.key = key
self.hash = hash
self.randomart = randomart
def dump(self):
open(self.kp, "wb+").write(
"--- BEGIN FERNET CRYPTOGRAPHY KEY ---\n".encode()
+ self.key
+ f"\n{self.hash}\n".encode()
+ "\n--- END FERNET CRYPTOGRAPHY KEY ---".encode()
)
@classmethod
def kimport(cls, kp):
k = open(kp, "rb").read().splitlines()
key = k[1]
hash = k[2].decode()
from random_art.randomart import drunkenwalk, draw
randomart = draw(drunkenwalk(key), hash)
return cls(kp, key, hash, randomart)

View File

@ -0,0 +1,30 @@
import pickle
from .abc import IOStream, IStream, OStream
class Database:
def __init__(self, fn) -> None:
self.fn = fn
try:
with open(fn, "rb+") as stream_out:
self.data = pickle.load(stream_out)
except:
self.data = {}
def write(self, key, entry):
self.data[key] = entry
def update(self):
with open(self.fn, "rb+") as stream_out:
self.data = pickle.load(stream_out)
def get(self, key, default=None):
return self.data.get(key, default)
def commit(self):
with open(self.fn, "wb+") as stream_in:
pickle.dump(self.data, stream_in)
@classmethod
def connect(cls, fname):
return cls(fname)

View File

@ -0,0 +1,51 @@
from .abc import FileWrap, DirWrap
import os
import functools
if hasattr(functools, "cache"):
cache_fn = functools.cache
else:
cache_fn = functools.lru_cache
@cache_fn
def parse_directory(dirp):
structure = {}
os.chdir(dirp)
for d in os.listdir():
if os.path.isdir(d):
structure[d] = parse_directory(d)
elif os.path.isfile(d):
structure[d] = open(d, "rb").read()
os.chdir("..")
return structure
@cache_fn
def wrap_directory(dirp, level=0):
structure = parse_directory(dirp)
data = []
for k, v in structure.items():
if isinstance(v, dict):
data.append(wrap_directory(k, level + 1))
else:
data.append(FileWrap(k, v))
if level == 0:
os.chdir(os.path.join("..", ".."))
return DirWrap(dirp, *data)
@cache_fn
def unpack(dirw: DirWrap):
import shutil
for k, v in dirw.contains.items():
if isinstance(v, DirWrap):
if os.path.isdir(k):
shutil.rmtree(k)
os.mkdir(v.name)
os.chdir(v.name)
unpack(v)
os.chdir("..")
else:
open(k, "wb+").write(v.data)

View File

@ -0,0 +1,20 @@
from cryptography.fernet import Fernet
from random_art.randomart import drunkenwalk, draw
from random import choices
from string import ascii_letters, digits
import os
from .abc import Key
def generate_keys():
key = Fernet.generate_key()
path = os.path.join(".sing", "system", ".keyfile")
hash = "".join(choices(ascii_letters + digits, k=10))
randomart = draw(drunkenwalk(key), hash)
print(f"The Key is {key}")
print("The Key's randomart is")
print(randomart)
key = Key(path, key, hash, randomart)
key.dump()
print(f"Saved Keys in {path}")
return key

View File

@ -0,0 +1,419 @@
import typer
import os
import typing as t
import shutil
from pathlib import Path
import cson
import getpass
from lib import abc
import string
from lib.db import Database
from lib.dirstat import wrap_directory, unpack
import socket
from hooks.remote import remote, clone as _clone
from hooks.diff import diff
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
host = s.getsockname()[0]
s.close()
except:
host = "127.0.0.1"
app = typer.Typer(name="sing", help="Singularity - the worst Source Control ever")
app.add_typer(remote, name="remote")
app.add_typer(diff, name="diff")
@app.command()
def clone(url: str = typer.Argument(...)):
_clone(url, init_fn=init, pull_fn=pull)
@app.command()
def init(
dir=typer.Argument("."),
branch: str = typer.Option("master", "-b", "--initial-branch"),
force: bool = typer.Option(False, "-f", "--force", "-r", "--reinit"),
):
"""
Initializes a new Repository, or reinitializes an existing one
"""
initial_cfg = {"current_branch": branch, "branches": [branch], "remotes": {}}
user_cfg = {
"user.name": f"",
"user.email": f"",
"repo.name": os.path.basename(os.path.abspath(dir)),
}
if os.path.exists(os.path.join(dir, ".sing")):
if force:
shutil.rmtree(os.path.join(dir, ".sing"))
print("Reinitializing Singularity Repository...")
else:
return print("Singularity Config Directory already exists - Quitting")
os.mkdir(os.path.join(dir, ".sing"))
os.chdir(os.path.join(dir, ".sing"))
os.mkdir("branches") # Branch Directories
os.mkdir(os.path.join("branches", branch)) # Initial Branch
os.mkdir("stage") # Staged Changes
os.mkdir("system") # Remotes, Branch Config, Local Config, Database
n = open(os.path.join("system", "sing.db"), "wb+")
n.close()
cson.dump(initial_cfg, open(os.path.join("system", "branchcf.cson"), "w+"))
cson.dump(user_cfg, open(os.path.join("system", "localconfig.cson"), "w+"))
os.mkdir("control") # Timeline etc.
os.mkdir("overhead") # Stashed Data
print("Initialized barebones Repository in {0}".format(os.path.abspath(dir)))
@app.command()
def config(
key=typer.Argument(None),
list: bool = typer.Option(False, "-l", "--list"),
set: str = typer.Option(None, "--set"),
):
ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
if key:
if set:
ucfg[key] = set
cson.dump(
ucfg, open(os.path.join(".sing", "system", "localconfig.cson"), "w+")
)
else:
print(ucfg.get(key, f"Not found: {key}"))
if list:
subpart = {}
for k, v in ucfg.items():
root, val = k.split(".")
if root not in subpart:
subpart[root] = []
subpart[root].append((val, v))
for root, values in subpart.items():
print(f"- {root}")
for key, value in values:
print(f"---- {key} -> {value}")
@app.command()
def log():
""""""
for commitfile in os.listdir(os.path.join(".sing", "control")):
print(open(os.path.join(".sing", "control", commitfile)).read())
@app.command()
def stash(files: t.List[Path]):
"""
Stashes Files into Overhead to avoid conflicts. Usually called automatically
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
for file in files:
fp = os.path.abspath(file.name)
bn = os.path.basename(fp)
if fp == os.getcwd():
add(list([Path(i) for i in os.listdir(fp)]))
return
else:
if bn in ignore:
continue
elif os.path.isdir(fp):
if os.path.isdir(os.path.join(".sing", "overhead", bn)):
shutil.rmtree(os.path.join(".sing", "overhead", bn))
shutil.copytree(fp, os.path.join(".sing", "overhead", bn))
elif os.path.isfile(fp):
if os.path.isfile(os.path.join(".sing", "overhead", bn)):
os.remove(os.path.join(".sing", "overhead", bn))
shutil.copyfile(fp, os.path.join(".sing", "overhead", bn))
@app.command()
def add(files: t.List[Path]):
"""
Stage Files or Directories for a commit
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
for file in files:
fp = os.path.abspath(file.name)
bn = os.path.basename(fp)
if fp == os.getcwd():
add(list([Path(i) for i in os.listdir(fp)]))
return
else:
if bn in ignore:
continue
elif os.path.isdir(fp):
if os.path.isdir(os.path.join(".sing", "stage", bn)):
shutil.rmtree(os.path.join(".sing", "stage", bn))
shutil.copytree(fp, os.path.join(".sing", "stage", bn))
elif os.path.isfile(fp):
if os.path.isfile(os.path.join(".sing", "stage", bn)):
os.remove(os.path.join(".sing", "stage", bn))
shutil.copyfile(fp, os.path.join(".sing", "stage", bn))
@app.command()
def rm(files: t.List[str]):
"""
Unstage staged Files
"""
for file in files:
if os.path.exists(os.path.join(".sing", "stage", file)):
if os.path.isdir(os.path.join(".sing", "stage", file)):
shutil.rmtree(os.path.join(".sing", "stage", file))
else:
os.remove(os.path.join(".sing", "stage", file))
@app.command()
def branch(make_new: str = typer.Option(None, "-m", "--new")):
"""
List Branches or make a new one. To switch, use the 'checkout' command.
"""
if not make_new:
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
for branch in cfg["branches"]:
if branch == cfg["current_branch"]:
print(f"* {branch}")
else:
print(branch)
else:
os.mkdir(os.path.join(".sing", "branches", make_new))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
cfg["branches"].append(make_new)
cfg["current_branch"] = make_new
cson.dump(cfg, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
@app.command()
def commit(
message: str = typer.Option(..., "-m", "--message"),
amend: bool = typer.Option(False, "-a", "--amend"),
):
"""
Commit the staged Files and write them to the Database
Options:
-m, --message : The Commit Message.
-a. --amend : Overwrite the last commit.
"""
ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
if ucfg["user.name"] == "" or ucfg["user.email"] == "":
print("*** Please tell me who you are")
print("\nRun\n")
print('\tsing config user.name --set "Your Name" ')
print('\tsing config user.example --set "you@example.com" ')
return
if amend:
try:
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"]))
shutil.copytree(
os.path.join(".sing", "stage"),
os.path.join(".sing", "branches", cfg["current_branch"]),
)
import random
from datetime import datetime
cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32))
commitfile = f"""
commit {cwid} {cfg["current_branch"]}
Author: {ucfg['user.name']} <{ucfg['user.email']}>
Date: {datetime.now()}
{message}
"""
commit_data = wrap_directory(
os.path.join(".sing", "branches", cfg["current_branch"])
)
db.write(cwid, commit_data)
db.commit()
print(os.getcwd())
open(
os.path.join(
".sing",
"control",
f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))-1}.commit",
),
"w+",
).write(commitfile)
print(f"Commit Mode +w - On ")
except Exception as e:
import sys, traceback
print(sys.exc_info())
print(traceback.format_exc())
print(e)
else:
try:
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"]))
shutil.copytree(
os.path.join(".sing", "stage"),
os.path.join(".sing", "branches", cfg["current_branch"]),
)
import random
from datetime import datetime
cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32))
commitfile = f"""
commit {cwid} {cfg["current_branch"]}
Author: {ucfg['user.name']} <{ucfg['user.email']}>
Date: {datetime.now()}
{message}
"""
commit_data = wrap_directory(
os.path.join(".sing", "branches", cfg["current_branch"])
)
db.write(cwid, commit_data)
db.commit()
print(os.getcwd())
open(
os.path.join(
".sing",
"control",
f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))}.commit",
),
"w+",
).write(commitfile)
print(f"Commit Mode +w - On ")
except Exception as e:
import sys, traceback
print(sys.exc_info())
print(traceback.format_exc())
print(e)
@app.command()
def stat():
"""
Print the entire sing Configuration Tree
"""
dw = wrap_directory(".sing")
print(dw.stringify(), end="")
print("local{HEAD}")
@app.command()
def slog():
"""
List all Commits in the Database
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
for k, v in db.data.items():
print(f"{k} --> {type(v)}")
@app.command()
def pull():
"""
Stash the current Tree and integrate changes downloaded from Remote into active branch
"""
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
stash([Path(".")])
id = len(os.listdir(os.path.join(".sing", "control"))) - 1
commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit")
dname = open(commit_fn).readlines()
for i in dname:
i = i.lstrip()
if i.startswith("commit"):
_, id, branch = i.split()
if branch == cfg["current_branch"]:
break
revert(id, branch=branch)
@app.command(no_args_is_help=True)
def comtree(c_id: str = typer.Argument(...)):
"""
View the Tree Structure of the Commit <C_ID>
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
print(entry.stringify())
print("HEAD/")
@app.command(no_args_is_help=True)
def revert(
c_id: str = typer.Argument(...), branch: str = typer.Option(None, "-b", "--branch")
):
"""
Reverts to Commit <C_ID>
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
ignore = [".sing"]
if os.path.exists(".signore"):
ignore += open(".signore").readlines()
for n in os.listdir():
if n in ignore:
continue
if os.path.isfile(n):
os.remove(n)
else:
shutil.rmtree(n)
unpack(entry)
@app.command(no_args_is_help=True)
def checkout(
branch: str = typer.Option(None, "-b", "--branch"),
force: bool = typer.Option(False, "-f", "--force"),
):
"""
Switch branches or restore working tree files
"""
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
id = len(os.listdir(os.path.join(".sing", "control"))) - 1
commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit")
dname = open(commit_fn).readlines()
for i in dname:
i = i.lstrip()
if i.startswith("commit"):
_, c_id, _branch = i.split()
if branch == _branch:
break
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
ignore = [".sing"]
if os.path.exists(".signore"):
ignore += open(".signore").readlines()
if not force:
stash([Path(".")])
for n in os.listdir():
if n in ignore:
continue
if os.path.isfile(n):
os.remove(n)
else:
shutil.rmtree(n)
unpack(entry)
if __name__ == "__main__":
app()

View File

@ -0,0 +1 @@
# Lol

View File

@ -0,0 +1,8 @@
commit Td0HEjzAM8FuvnYUOnOTP6p2KTz7v9Ig master
Author: a <a>
Date: 2023-10-16 08:00:22.388514
feat: Initial Commit

View File

@ -0,0 +1,8 @@
commit pgVDG99ljSQxDWI4GZm5aI0ahtDXcmbU secondary
Author: Secondary Jane <SecondaryJane@localhost>
Date: 2023-10-16 14:27:16.538311
kewl

View File

@ -0,0 +1,8 @@
commit icG9CADYZ33JNERREpa2fpGYTRA3G93d master
Author: Jane Johanna Yosef <stupidjane@tutanota.com>
Date: 2023-10-16 14:36:34.801126
Commit

View File

@ -0,0 +1,8 @@
commit zCzv53Xfm4O3gZrz31vj2br4tmWc2mlv master
Author: Jane Johanna Yosef <stupidjane@tutanota.com>
Date: 2023-10-24 16:10:42.542050
blob

View File

@ -0,0 +1,8 @@
commit x1JPywLQQrB3yYv5Qrd2a4Dg3DFWl5kd master
Author: Jane Johanna Yosef <stupidjane@tutanota.com>
Date: 2023-10-25 07:48:27.216594
feat: Add checkout

View File

@ -0,0 +1,8 @@
commit YmRvFt2tbeNt7sbybxaCyen3hRkz2YwU foo
Author: Jane Johanna Yosef <stupidjane@tutanota.com>
Date: 2023-10-25 07:51:09.514969
fix: Improve Checkout Command

View File

@ -0,0 +1,8 @@
commit E1k6zZKOcU0d3815pEwIYcUdXzf0RtPy foo
Author: Jane Johanna Yosef <stupidjane@tutanota.com>
Date: 2023-10-25 07:54:31.541732
fix: Forgot to update config

View File

@ -0,0 +1,8 @@
commit hcfKqMPHotAyoCjDFvRnIyNnFcP2u93U foo
Author: Jane Johanna Yosef <stupidjane@tutanota.com>
Date: 2023-10-26 14:00:13.543435
None

View File

@ -0,0 +1,9 @@
commit L81bNUlwe0XgNi5IHlPa6Ix1HVRs91eO foo
Author: Jane Johanna Yosef <stupidjane@tutanota.com>
Date: 2023-10-26 14:00:41.063254
Hello World

1
.sing/stage/.signore Normal file
View File

@ -0,0 +1 @@
__pycache__

0
.sing/stage/foobar Normal file
View File

Binary file not shown.

Binary file not shown.

41
.sing/stage/hooks/diff.py Normal file
View File

@ -0,0 +1,41 @@
import typer
import cson
import os
import sys
import shutil
diff = typer.Typer(name="diff")
@diff.command()
def cmp(shallow: bool = typer.Option(True, "-s", "--shallow")):
"""
Compare Active Working Directory and latest commit on the same branch
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
masterb = config["current_branch"]
if shallow:
for i in os.listdir(os.path.join(".sing", "branches", masterb)):
if not os.path.exists(i) and not i in ignore:
print(f"- {i}")
else:
if not i in ignore:
if shutil.disk_usage(
os.path.join(".sing", "branches", masterb, i)
) > shutil.disk_usage(i):
print(f"+++ {i}")
elif shutil.disk_usage(
os.path.join(".sing", "branches", masterb, i)
) < shutil.disk_usage(i):
print(f"--- {i}")
else:
print(f"=== {i}")
for i in os.listdir():
if (
not os.path.exists(os.path.join(".sing", "branches", masterb, i))
and not i in ignore
):
print(f"+ {i}")

234
.sing/stage/hooks/remote.py Normal file
View File

@ -0,0 +1,234 @@
import typer
import socket
import cson
import os
import sys
import pickle
from lib.db import Database
from lib.abc import FileWrap, Key
from lib.keyexchange import generate_keys
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
host = s.getsockname()[0]
s.close()
except:
host = "127.0.0.1"
remote = typer.Typer(name="remote")
cstep = 0
CHUNK_SIZE = 1
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]
def cycle():
global cstep
steps = ["/", "-", "\\", "|"]
cstep += 1
if cstep == 4:
cstep = 0
return steps[cstep]
@remote.command()
def add(name: str, url: str):
"""
Add a remote named <name> for the repository at <url>. The command 'sing remote fetch' can then
be used to create and update remote branches (<name>/<branch>)
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
config["remotes"][name] = url
cson.dump(config, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
@remote.command(name="keys")
def kg(
generate: bool = typer.Option(True, "-g", "--generate-new"),
publish: bool = typer.Option(False, "-p", "--publish"),
):
if generate:
generate_keys()
print("REMINDER : THIS KEY WILL BE USED TO ENCRYPT REMOTE PUSHES.")
print("Other Clients will not be able to decrypt the Push unless they")
print("receive the key. Use these commands to share your keys:")
print("\tsing remote keys --publish")
if publish:
if not os.path.exists(os.path.join(".sing", "system", ".keyfile")):
return print(
"Fatal -- no Keys found. Use the '-g' option to create new keys"
)
print("Importing Keys...")
key: Key = Key.kimport(os.path.join(".sing", "system", ".keyfile"))
print(key.randomart)
import getpass
print("Do you want to protect your Keys using a Passphrase?")
print("Recipients will be prompted for their Password before getting the Key")
p = input("[y/N]") or "n"
p = p.lower()
if p in ["y", "yes"]:
passphrase = getpass.getpass("Enter Passphrase : ")
pass_rep = getpass.getpass("Re-Type Passphrase:")
i = 1
while not pass_rep == passphrase and i < 3:
pass_rep = getpass.getpass(
"Wrong Passphrase - Please re-type Passphrase:"
)
i += 1
if i >= 3:
return print("Aborted - 3 Times entered Wrong Password")
@remote.command(name="get-url")
def gurl(remote_name):
"""
Retrieves the URLs for a Remote.
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
rmurl = config["remotes"].get(remote_name, "Fatal -- No Remote Found.")
print(rmurl)
def clone(url, init_fn, pull_fn):
"""
Download objects from remote repository
"""
import urllib.parse
parsed = urllib.parse.urlparse(url)
if os.path.exists(parsed.path.split("/")[-1]):
return print(f"Fatal -- File/Directory exists: {parsed.path.split('/')[-1]}")
print("Connecting to remote Server...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((socket.gethostbyname(parsed.hostname), 2991))
s = f"down {parsed.path}"
sock.send(f"{len(s)}".encode())
sock.recv(1024)
sock.send(f"down {parsed.path}".encode())
database = []
print("Initializing Repository...")
os.mkdir(parsed.path.split("/")[-1])
os.chdir(parsed.path.split("/")[-1])
while True:
print(f"remote: Receiving Objects... {cycle()}", end="\r")
sock.settimeout(1)
try:
packet = sock.recv(4096)
if not packet:
break
database.append(packet)
except Exception as e:
break
print("remote: Unpacking Objects...")
database = b"".join(database)
database = pickle.loads(database)
Main_DB = database.get("MainDB")
Branch_Config = database.get("Branches")
Commit_History = database.get("CommitHistory")
init_fn(".", Branch_Config["current_branch"], True)
os.chdir("..")
try:
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
except:
config = {}
config = Branch_Config
cson.dump(config, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
for i in Commit_History:
with open(os.path.join(".sing", "control", i.name), "wb+") as f:
f.write(i.data)
open(os.path.join(".sing", "system", "sing.db"), "wb+").write(Main_DB)
print(f"{url}/HEAD -> local/HEAD")
print("Pulling changes...")
pull_fn()
@remote.command()
def fetch(remote_name):
"""
Download objects from remote repository
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
lconfig = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
rmurl = config["remotes"].get(remote_name)
print("Connecting to remote Server...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((socket.gethostbyname(rmurl), 2991))
s = f"down {lconfig['repo.name']}"
sock.send(f"{len(s)}".encode())
sock.recv(1024)
sock.send(f"down {lconfig['repo.name']}".encode())
database = []
while True:
print(f"remote: Receiving Objects... {cycle()}", end="\r")
sock.settimeout(1)
try:
packet = sock.recv(4096)
if not packet:
break
database.append(packet)
except Exception as e:
break
print("remote: Unpacking Objects...")
database = b"".join(database)
database = pickle.loads(database)
Main_DB = database.get("MainDB")
Branch_Config = database.get("Branches")
Commit_History = database.get("CommitHistory")
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
config["branches"] = Branch_Config["branches"]
cson.dump(config, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
for i in Commit_History:
with open(os.path.join(".sing", "control", i.name), "wb+") as f:
f.write(i.data)
open(os.path.join(".sing", "system", "sing.db"), "wb+").write(Main_DB)
print(f"{remote_name}/HEAD -> local/HEAD")
print("Use 'sing pull' to write into local files")
@remote.command()
def push(remote_name):
"""
Update remote refs along with associated objects
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
lconfig = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
rmurl = config["remotes"].get(remote_name)
print("Connecting to remote Server...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((socket.gethostbyname(rmurl), 2991))
s = f"up {lconfig['repo.name']}"
sock.send(f"{len(s)}".encode())
sock.recv(8)
sock.send(s.encode())
db = {
"MainDB": open(os.path.join(".sing", "system", "sing.db"), "rb").read(),
"Branches": config,
"CommitHistory": [
FileWrap(i, open(os.path.join(".sing", "control", i), "rb").read())
for i in os.listdir(os.path.join(".sing", "control"))
],
}
db = pickle.dumps(db)
c = list(chunks(db, CHUNK_SIZE))
for i, chunk in enumerate(c):
print(f"Sending Objects... {i}/{len(c)-1} {cycle()}", end="\r")
sock.send(chunk)
print()
print(f"Origin -> HEAD[{remote_name}]")

Binary file not shown.

Binary file not shown.

Binary file not shown.

94
.sing/stage/lib/abc.py Normal file
View File

@ -0,0 +1,94 @@
import typing as t
####################
# Directory Wrappers
#
class DirWrap:
def __init__(self, fname, *data: t.List[t.Union["DirWrap", "FileWrap"]]):
self.name = fname
self.contains = {d.name: d for d in data}
self._raw_data = data
def stringify(self, level=0) -> str:
s = ""
for key, value in self.contains.items():
s += (
""
+ ""
+ "" * (4 * level)
+ " "
+ key
+ ("/" if isinstance(value, DirWrap) else "")
+ "\n"
)
if isinstance(value, DirWrap):
s += value.stringify(level + 1)
return s
class FileWrap:
def __init__(self, fname: str, fdata: t.Union[bytes, str]) -> None:
self.name = fname
self.data = fdata.encode() if isinstance(fdata, str) else fdata
#########
# Streams
#
class IStream:
def __init__(self, file) -> None:
self.file = file
def write(self, *data):
self.file.write(*data)
class IOStream:
def __init__(self, fdin: IStream, fdout: "OStream") -> None:
self.fdin = fdin
self.fdout = fdout
def write(self, *data):
self.fdin.write(*data)
def read(self):
return self.fdout.read()
class OStream:
def __init__(self, file) -> None:
self.file = file
def read(self):
return self.file.read()
class Key:
def __init__(self, kpath, key, hash, randomart):
self.kp = kpath
self.key = key
self.hash = hash
self.randomart = randomart
def dump(self):
open(self.kp, "wb+").write(
"--- BEGIN FERNET CRYPTOGRAPHY KEY ---\n".encode()
+ self.key
+ f"\n{self.hash}\n".encode()
+ "\n--- END FERNET CRYPTOGRAPHY KEY ---".encode()
)
@classmethod
def kimport(cls, kp):
k = open(kp, "rb").read().splitlines()
key = k[1]
hash = k[2].decode()
from random_art.randomart import drunkenwalk, draw
randomart = draw(drunkenwalk(key), hash)
return cls(kp, key, hash, randomart)

30
.sing/stage/lib/db.py Normal file
View File

@ -0,0 +1,30 @@
import pickle
from .abc import IOStream, IStream, OStream
class Database:
def __init__(self, fn) -> None:
self.fn = fn
try:
with open(fn, "rb+") as stream_out:
self.data = pickle.load(stream_out)
except:
self.data = {}
def write(self, key, entry):
self.data[key] = entry
def update(self):
with open(self.fn, "rb+") as stream_out:
self.data = pickle.load(stream_out)
def get(self, key, default=None):
return self.data.get(key, default)
def commit(self):
with open(self.fn, "wb+") as stream_in:
pickle.dump(self.data, stream_in)
@classmethod
def connect(cls, fname):
return cls(fname)

View File

@ -0,0 +1,51 @@
from .abc import FileWrap, DirWrap
import os
import functools
if hasattr(functools, "cache"):
cache_fn = functools.cache
else:
cache_fn = functools.lru_cache
@cache_fn
def parse_directory(dirp):
structure = {}
os.chdir(dirp)
for d in os.listdir():
if os.path.isdir(d):
structure[d] = parse_directory(d)
elif os.path.isfile(d):
structure[d] = open(d, "rb").read()
os.chdir("..")
return structure
@cache_fn
def wrap_directory(dirp, level=0):
structure = parse_directory(dirp)
data = []
for k, v in structure.items():
if isinstance(v, dict):
data.append(wrap_directory(k, level + 1))
else:
data.append(FileWrap(k, v))
if level == 0:
os.chdir(os.path.join("..", ".."))
return DirWrap(dirp, *data)
@cache_fn
def unpack(dirw: DirWrap):
import shutil
for k, v in dirw.contains.items():
if isinstance(v, DirWrap):
if os.path.isdir(k):
shutil.rmtree(k)
os.mkdir(v.name)
os.chdir(v.name)
unpack(v)
os.chdir("..")
else:
open(k, "wb+").write(v.data)

View File

@ -0,0 +1,20 @@
from cryptography.fernet import Fernet
from random_art.randomart import drunkenwalk, draw
from random import choices
from string import ascii_letters, digits
import os
from .abc import Key
def generate_keys():
key = Fernet.generate_key()
path = os.path.join(".sing", "system", ".keyfile")
hash = "".join(choices(ascii_letters + digits, k=10))
randomart = draw(drunkenwalk(key), hash)
print(f"The Key is {key}")
print("The Key's randomart is")
print(randomart)
key = Key(path, key, hash, randomart)
key.dump()
print(f"Saved Keys in {path}")
return key

419
.sing/stage/sing.py Normal file
View File

@ -0,0 +1,419 @@
import typer
import os
import typing as t
import shutil
from pathlib import Path
import cson
import getpass
from lib import abc
import string
from lib.db import Database
from lib.dirstat import wrap_directory, unpack
import socket
from hooks.remote import remote, clone as _clone
from hooks.diff import diff
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
host = s.getsockname()[0]
s.close()
except:
host = "127.0.0.1"
app = typer.Typer(name="sing", help="Singularity - the worst Source Control ever")
app.add_typer(remote, name="remote")
app.add_typer(diff, name="diff")
@app.command()
def clone(url: str = typer.Argument(...)):
_clone(url, init_fn=init, pull_fn=pull)
@app.command()
def init(
dir=typer.Argument("."),
branch: str = typer.Option("master", "-b", "--initial-branch"),
force: bool = typer.Option(False, "-f", "--force", "-r", "--reinit"),
):
"""
Initializes a new Repository, or reinitializes an existing one
"""
initial_cfg = {"current_branch": branch, "branches": [branch], "remotes": {}}
user_cfg = {
"user.name": f"",
"user.email": f"",
"repo.name": os.path.basename(os.path.abspath(dir)),
}
if os.path.exists(os.path.join(dir, ".sing")):
if force:
shutil.rmtree(os.path.join(dir, ".sing"))
print("Reinitializing Singularity Repository...")
else:
return print("Singularity Config Directory already exists - Quitting")
os.mkdir(os.path.join(dir, ".sing"))
os.chdir(os.path.join(dir, ".sing"))
os.mkdir("branches") # Branch Directories
os.mkdir(os.path.join("branches", branch)) # Initial Branch
os.mkdir("stage") # Staged Changes
os.mkdir("system") # Remotes, Branch Config, Local Config, Database
n = open(os.path.join("system", "sing.db"), "wb+")
n.close()
cson.dump(initial_cfg, open(os.path.join("system", "branchcf.cson"), "w+"))
cson.dump(user_cfg, open(os.path.join("system", "localconfig.cson"), "w+"))
os.mkdir("control") # Timeline etc.
os.mkdir("overhead") # Stashed Data
print("Initialized barebones Repository in {0}".format(os.path.abspath(dir)))
@app.command()
def config(
key=typer.Argument(None),
list: bool = typer.Option(False, "-l", "--list"),
set: str = typer.Option(None, "--set"),
):
ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
if key:
if set:
ucfg[key] = set
cson.dump(
ucfg, open(os.path.join(".sing", "system", "localconfig.cson"), "w+")
)
else:
print(ucfg.get(key, f"Not found: {key}"))
if list:
subpart = {}
for k, v in ucfg.items():
root, val = k.split(".")
if root not in subpart:
subpart[root] = []
subpart[root].append((val, v))
for root, values in subpart.items():
print(f"- {root}")
for key, value in values:
print(f"---- {key} -> {value}")
@app.command()
def log():
""""""
for commitfile in os.listdir(os.path.join(".sing", "control")):
print(open(os.path.join(".sing", "control", commitfile)).read())
@app.command()
def stash(files: t.List[Path]):
"""
Stashes Files into Overhead to avoid conflicts. Usually called automatically
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
for file in files:
fp = os.path.abspath(file.name)
bn = os.path.basename(fp)
if fp == os.getcwd():
add(list([Path(i) for i in os.listdir(fp)]))
return
else:
if bn in ignore:
continue
elif os.path.isdir(fp):
if os.path.isdir(os.path.join(".sing", "overhead", bn)):
shutil.rmtree(os.path.join(".sing", "overhead", bn))
shutil.copytree(fp, os.path.join(".sing", "overhead", bn))
elif os.path.isfile(fp):
if os.path.isfile(os.path.join(".sing", "overhead", bn)):
os.remove(os.path.join(".sing", "overhead", bn))
shutil.copyfile(fp, os.path.join(".sing", "overhead", bn))
@app.command()
def add(files: t.List[Path]):
"""
Stage Files or Directories for a commit
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
for file in files:
fp = os.path.abspath(file.name)
bn = os.path.basename(fp)
if fp == os.getcwd():
add(list([Path(i) for i in os.listdir(fp)]))
return
else:
if bn in ignore:
continue
elif os.path.isdir(fp):
if os.path.isdir(os.path.join(".sing", "stage", bn)):
shutil.rmtree(os.path.join(".sing", "stage", bn))
shutil.copytree(fp, os.path.join(".sing", "stage", bn))
elif os.path.isfile(fp):
if os.path.isfile(os.path.join(".sing", "stage", bn)):
os.remove(os.path.join(".sing", "stage", bn))
shutil.copyfile(fp, os.path.join(".sing", "stage", bn))
@app.command()
def rm(files: t.List[str]):
"""
Unstage staged Files
"""
for file in files:
if os.path.exists(os.path.join(".sing", "stage", file)):
if os.path.isdir(os.path.join(".sing", "stage", file)):
shutil.rmtree(os.path.join(".sing", "stage", file))
else:
os.remove(os.path.join(".sing", "stage", file))
@app.command()
def branch(make_new: str = typer.Option(None, "-m", "--new")):
"""
List Branches or make a new one. To switch, use the 'checkout' command.
"""
if not make_new:
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
for branch in cfg["branches"]:
if branch == cfg["current_branch"]:
print(f"* {branch}")
else:
print(branch)
else:
os.mkdir(os.path.join(".sing", "branches", make_new))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
cfg["branches"].append(make_new)
cfg["current_branch"] = make_new
cson.dump(cfg, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
@app.command()
def commit(
message: str = typer.Option(..., "-m", "--message"),
amend: bool = typer.Option(False, "-a", "--amend"),
):
"""
Commit the staged Files and write them to the Database
Options:
-m, --message : The Commit Message.
-a. --amend : Overwrite the last commit.
"""
ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
if ucfg["user.name"] == "" or ucfg["user.email"] == "":
print("*** Please tell me who you are")
print("\nRun\n")
print('\tsing config user.name --set "Your Name" ')
print('\tsing config user.example --set "you@example.com" ')
return
if amend:
try:
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"]))
shutil.copytree(
os.path.join(".sing", "stage"),
os.path.join(".sing", "branches", cfg["current_branch"]),
)
import random
from datetime import datetime
cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32))
commitfile = f"""
commit {cwid} {cfg["current_branch"]}
Author: {ucfg['user.name']} <{ucfg['user.email']}>
Date: {datetime.now()}
{message}
"""
commit_data = wrap_directory(
os.path.join(".sing", "branches", cfg["current_branch"])
)
db.write(cwid, commit_data)
db.commit()
print(os.getcwd())
open(
os.path.join(
".sing",
"control",
f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))-1}.commit",
),
"w+",
).write(commitfile)
print(f"Commit Mode +w - On ")
except Exception as e:
import sys, traceback
print(sys.exc_info())
print(traceback.format_exc())
print(e)
else:
try:
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"]))
shutil.copytree(
os.path.join(".sing", "stage"),
os.path.join(".sing", "branches", cfg["current_branch"]),
)
import random
from datetime import datetime
cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32))
commitfile = f"""
commit {cwid} {cfg["current_branch"]}
Author: {ucfg['user.name']} <{ucfg['user.email']}>
Date: {datetime.now()}
{message}
"""
commit_data = wrap_directory(
os.path.join(".sing", "branches", cfg["current_branch"])
)
db.write(cwid, commit_data)
db.commit()
print(os.getcwd())
open(
os.path.join(
".sing",
"control",
f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))}.commit",
),
"w+",
).write(commitfile)
print(f"Commit Mode +w - On ")
except Exception as e:
import sys, traceback
print(sys.exc_info())
print(traceback.format_exc())
print(e)
@app.command()
def stat():
"""
Print the entire sing Configuration Tree
"""
dw = wrap_directory(".sing")
print(dw.stringify(), end="")
print("local{HEAD}")
@app.command()
def slog():
"""
List all Commits in the Database
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
for k, v in db.data.items():
print(f"{k} --> {type(v)}")
@app.command()
def pull():
"""
Stash the current Tree and integrate changes downloaded from Remote into active branch
"""
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
stash([Path(".")])
id = len(os.listdir(os.path.join(".sing", "control"))) - 1
commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit")
dname = open(commit_fn).readlines()
for i in dname:
i = i.lstrip()
if i.startswith("commit"):
_, id, branch = i.split()
if branch == cfg["current_branch"]:
break
revert(id, branch=branch)
@app.command(no_args_is_help=True)
def comtree(c_id: str = typer.Argument(...)):
"""
View the Tree Structure of the Commit <C_ID>
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
print(entry.stringify())
print("HEAD/")
@app.command(no_args_is_help=True)
def revert(
c_id: str = typer.Argument(...), branch: str = typer.Option(None, "-b", "--branch")
):
"""
Reverts to Commit <C_ID>
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
ignore = [".sing"]
if os.path.exists(".signore"):
ignore += open(".signore").readlines()
for n in os.listdir():
if n in ignore:
continue
if os.path.isfile(n):
os.remove(n)
else:
shutil.rmtree(n)
unpack(entry)
@app.command(no_args_is_help=True)
def checkout(
branch: str = typer.Argument(...),
force: bool = typer.Option(False, "-f", "--force"),
):
"""
Switch branches or restore working tree files
"""
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
id = len(os.listdir(os.path.join(".sing", "control"))) - 1
commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit")
dname = open(commit_fn).readlines()
for i in dname:
i = i.lstrip()
if i.startswith("commit"):
_, c_id, _branch = i.split()
if branch == _branch:
break
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
ignore = [".sing"]
if os.path.exists(".signore"):
ignore += open(".signore").readlines()
if not force:
stash([Path(".")])
for n in os.listdir():
if n in ignore:
continue
if os.path.isfile(n):
os.remove(n)
else:
shutil.rmtree(n)
unpack(entry)
if __name__ == "__main__":
app()

1
.sing/stage/test.py Normal file
View File

@ -0,0 +1 @@
# Lol

5
.sing/system/.keyfile Normal file
View File

@ -0,0 +1,5 @@
--- BEGIN FERNET CRYPTOGRAPHY KEY ---
oHVd1Vltn_kSHazxhcr0Mi3v35EnjzEcncGPDdantVc=
s7IyTiiBD4
--- END FERNET CRYPTOGRAPHY KEY ---

View File

@ -0,0 +1 @@
{"current_branch":"foo","branches":["master","secondary","foo"],"remotes":{"text":"192.168.68.110","upstream":"127.0.0.1"}}

View File

@ -0,0 +1 @@
{"user.name":"Jane Johanna Yosef","user.email":"stupidjane@tutanota.com","repo.name":"singularity"}

BIN
.sing/system/sing.db Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

41
hooks/diff.py Normal file
View File

@ -0,0 +1,41 @@
import typer
import cson
import os
import sys
import shutil
diff = typer.Typer(name="diff")
@diff.command()
def cmp(shallow: bool = typer.Option(True, "-s", "--shallow")):
"""
Compare Active Working Directory and latest commit on the same branch
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
masterb = config["current_branch"]
if shallow:
for i in os.listdir(os.path.join(".sing", "branches", masterb)):
if not os.path.exists(i) and not i in ignore:
print(f"- {i}")
else:
if not i in ignore:
if shutil.disk_usage(
os.path.join(".sing", "branches", masterb, i)
) > shutil.disk_usage(i):
print(f"+++ {i}")
elif shutil.disk_usage(
os.path.join(".sing", "branches", masterb, i)
) < shutil.disk_usage(i):
print(f"--- {i}")
else:
print(f"=== {i}")
for i in os.listdir():
if (
not os.path.exists(os.path.join(".sing", "branches", masterb, i))
and not i in ignore
):
print(f"+ {i}")

249
hooks/remote.py Normal file
View File

@ -0,0 +1,249 @@
import typer
import socket
import cson
import os
import sys
import pickle
from lib.db import Database
from lib.abc import FileWrap, Key
from lib.keyexchange import generate_keys
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
host = s.getsockname()[0]
s.close()
except:
host = "127.0.0.1"
remote = typer.Typer(name="remote")
cstep = 0
CHUNK_SIZE = 1
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]
def cycle():
global cstep
steps = ["/", "-", "\\", "|"]
cstep += 1
if cstep == 4:
cstep = 0
return steps[cstep]
@remote.command()
def add(name: str, url: str):
"""
Add a remote named <name> for the repository at <url>. The command 'sing remote fetch' can then
be used to create and update remote branches (<name>/<branch>)
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
config["remotes"][name] = url
cson.dump(config, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
@remote.command(name="keys")
def kg(
generate: bool = typer.Option(True, "-g", "--generate-new"),
publish: bool = typer.Option(False, "-p", "--publish"),
):
if generate:
generate_keys()
print("REMINDER : THIS KEY WILL BE USED TO ENCRYPT REMOTE PUSHES.")
print("Other Clients will not be able to decrypt the Push unless they")
print("receive the key. Use these commands to share your keys:")
print("\tsing remote keys --publish")
if publish:
if not os.path.exists(os.path.join(".sing", "system", ".keyfile")):
return print(
"Fatal -- no Keys found. Use the '-g' option to create new keys"
)
print("Importing Keys...")
key: Key = Key.kimport(os.path.join(".sing", "system", ".keyfile"))
print(key.randomart)
import getpass
print("Do you want to protect your Keys using a Passphrase?")
print("Recipients will be prompted for their Password before getting the Key")
p = input("[y/N]") or "n"
p = p.lower()
if p in ["y", "yes"]:
passphrase = getpass.getpass("Enter Passphrase : ")
pass_rep = getpass.getpass("Re-Type Passphrase:")
i = 1
while not pass_rep == passphrase and i < 3:
pass_rep = getpass.getpass(
"Wrong Passphrase - Please re-type Passphrase:"
)
i += 1
if i >= 3:
return print("Aborted - 3 Times entered Wrong Password")
else:
passphrase = ""
@remote.command(name="get-url")
def gurl(remote_name):
"""
Retrieves the URLs for a Remote.
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
rmurl = config["remotes"].get(remote_name, "Fatal -- No Remote Found.")
print(rmurl)
@remote.command(name="list")
def lst():
"""
List all Remotes URLs
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
for name, url in config["remotes"].items():
print(f"{name} --> {url}")
def clone(url, init_fn, pull_fn):
"""
Download objects from remote repository
"""
import urllib.parse
parsed = urllib.parse.urlparse(url)
if os.path.exists(parsed.path.split("/")[-1]):
return print(f"Fatal -- File/Directory exists: {parsed.path.split('/')[-1]}")
print("Connecting to remote Server...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((socket.gethostbyname(parsed.hostname), 2991))
s = f"down {parsed.path}"
sock.send(f"{len(s)}".encode())
sock.recv(1024)
sock.send(f"down {parsed.path}".encode())
database = []
print("Initializing Repository...")
os.mkdir(parsed.path.split("/")[-1])
os.chdir(parsed.path.split("/")[-1])
while True:
print(f"remote: Receiving Objects... {cycle()}", end="\r")
sock.settimeout(1)
try:
packet = sock.recv(4096)
if not packet:
break
database.append(packet)
except Exception as e:
break
print("remote: Unpacking Objects...")
database = b"".join(database)
database = pickle.loads(database)
print("remote: Decompressing objects...")
Main_DB = database.get("MainDB")
Branch_Config = database.get("Branches")
Commit_History = database.get("CommitHistory")
init_fn(".", Branch_Config["current_branch"], True)
os.chdir("..")
try:
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
except:
config = {}
config = Branch_Config
cson.dump(config, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
for i in Commit_History:
with open(os.path.join(".sing", "control", i.name), "wb+") as f:
f.write(i.data)
open(os.path.join(".sing", "system", "sing.db"), "wb+").write(Main_DB)
print(f"{url}/HEAD -> local/HEAD")
print("Pulling changes...")
pull_fn()
@remote.command()
def fetch(remote_name):
"""
Download objects from remote repository
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
lconfig = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
rmurl = config["remotes"].get(remote_name)
print("Connecting to remote Server...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((socket.gethostbyname(rmurl), 2991))
s = f"down {lconfig['repo.name']}"
sock.send(f"{len(s)}".encode())
sock.recv(1024)
sock.send(f"down {lconfig['repo.name']}".encode())
database = []
while True:
print(f"remote: Receiving Objects... {cycle()}", end="\r")
sock.settimeout(1)
try:
packet = sock.recv(4096)
if not packet:
break
database.append(packet)
except Exception as e:
break
print("remote: Unpacking Objects...")
database = b"".join(database)
database = pickle.loads(database)
Main_DB = database.get("MainDB")
Branch_Config = database.get("Branches")
Commit_History = database.get("CommitHistory")
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
config["branches"] = Branch_Config["branches"]
cson.dump(config, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
for i in Commit_History:
with open(os.path.join(".sing", "control", i.name), "wb+") as f:
f.write(i.data)
open(os.path.join(".sing", "system", "sing.db"), "wb+").write(Main_DB)
print(f"{remote_name}/HEAD -> local/HEAD")
print("Use 'sing pull' to write into local files")
@remote.command()
def push(remote_name):
"""
Update remote refs along with associated objects
"""
config = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
lconfig = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
rmurl = config["remotes"].get(remote_name)
print("Connecting to remote Server...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((socket.gethostbyname(rmurl), 2991))
s = f"up {lconfig['repo.name']}"
sock.send(f"{len(s)}".encode())
sock.recv(8)
sock.send(s.encode())
print("remote: Building Database...")
db = {
"MainDB": open(os.path.join(".sing", "system", "sing.db"), "rb").read(),
"Branches": config,
"CommitHistory": [
FileWrap(i, open(os.path.join(".sing", "control", i), "rb").read())
for i in os.listdir(os.path.join(".sing", "control"))
],
}
print("remote: Compressing Objects...")
db = pickle.dumps(db)
c = list(chunks(db, CHUNK_SIZE))
for i, chunk in enumerate(c):
print(f"Sending Objects... {i}/{len(c)-1} {cycle()}", end="\r")
sock.send(chunk)
print()
print(f"Origin -> HEAD[{remote_name}]")

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

94
lib/abc.py Normal file
View File

@ -0,0 +1,94 @@
import typing as t
####################
# Directory Wrappers
#
class DirWrap:
def __init__(self, fname, *data: t.List[t.Union["DirWrap", "FileWrap"]]):
self.name = fname
self.contains = {d.name: d for d in data}
self._raw_data = data
def stringify(self, level=0) -> str:
s = ""
for key, value in self.contains.items():
s += (
""
+ ""
+ "" * (4 * level)
+ " "
+ key
+ ("/" if isinstance(value, DirWrap) else "")
+ "\n"
)
if isinstance(value, DirWrap):
s += value.stringify(level + 1)
return s
class FileWrap:
def __init__(self, fname: str, fdata: t.Union[bytes, str]) -> None:
self.name = fname
self.data = fdata.encode() if isinstance(fdata, str) else fdata
#########
# Streams
#
class IStream:
def __init__(self, file) -> None:
self.file = file
def write(self, *data):
self.file.write(*data)
class IOStream:
def __init__(self, fdin: IStream, fdout: "OStream") -> None:
self.fdin = fdin
self.fdout = fdout
def write(self, *data):
self.fdin.write(*data)
def read(self):
return self.fdout.read()
class OStream:
def __init__(self, file) -> None:
self.file = file
def read(self):
return self.file.read()
class Key:
def __init__(self, kpath, key, hash, randomart):
self.kp = kpath
self.key = key
self.hash = hash
self.randomart = randomart
def dump(self):
open(self.kp, "wb+").write(
"--- BEGIN FERNET CRYPTOGRAPHY KEY ---\n".encode()
+ self.key
+ f"\n{self.hash}\n".encode()
+ "\n--- END FERNET CRYPTOGRAPHY KEY ---".encode()
)
@classmethod
def kimport(cls, kp):
k = open(kp, "rb").read().splitlines()
key = k[1]
hash = k[2].decode()
from random_art.randomart import drunkenwalk, draw
randomart = draw(drunkenwalk(key), hash)
return cls(kp, key, hash, randomart)

30
lib/db.py Normal file
View File

@ -0,0 +1,30 @@
import pickle
from .abc import IOStream, IStream, OStream
class Database:
def __init__(self, fn) -> None:
self.fn = fn
try:
with open(fn, "rb+") as stream_out:
self.data = pickle.load(stream_out)
except:
self.data = {}
def write(self, key, entry):
self.data[key] = entry
def update(self):
with open(self.fn, "rb+") as stream_out:
self.data = pickle.load(stream_out)
def get(self, key, default=None):
return self.data.get(key, default)
def commit(self):
with open(self.fn, "wb+") as stream_in:
pickle.dump(self.data, stream_in)
@classmethod
def connect(cls, fname):
return cls(fname)

51
lib/dirstat.py Normal file
View File

@ -0,0 +1,51 @@
from .abc import FileWrap, DirWrap
import os
import functools
if hasattr(functools, "cache"):
cache_fn = functools.cache
else:
cache_fn = functools.lru_cache
@cache_fn
def parse_directory(dirp):
structure = {}
os.chdir(dirp)
for d in os.listdir():
if os.path.isdir(d):
structure[d] = parse_directory(d)
elif os.path.isfile(d):
structure[d] = open(d, "rb").read()
os.chdir("..")
return structure
@cache_fn
def wrap_directory(dirp, level=0):
structure = parse_directory(dirp)
data = []
for k, v in structure.items():
if isinstance(v, dict):
data.append(wrap_directory(k, level + 1))
else:
data.append(FileWrap(k, v))
if level == 0:
os.chdir(os.path.join("..", ".."))
return DirWrap(dirp, *data)
@cache_fn
def unpack(dirw: DirWrap):
import shutil
for k, v in dirw.contains.items():
if isinstance(v, DirWrap):
if os.path.isdir(k):
shutil.rmtree(k)
os.mkdir(v.name)
os.chdir(v.name)
unpack(v)
os.chdir("..")
else:
open(k, "wb+").write(v.data)

20
lib/keyexchange.py Normal file
View File

@ -0,0 +1,20 @@
from cryptography.fernet import Fernet
from random_art.randomart import drunkenwalk, draw
from random import choices
from string import ascii_letters, digits
import os
from .abc import Key
def generate_keys():
key = Fernet.generate_key()
path = os.path.join(".sing", "system", ".keyfile")
hash = "".join(choices(ascii_letters + digits, k=10))
randomart = draw(drunkenwalk(key), hash)
print(f"The Key is {key}")
print("The Key's randomart is")
print(randomart)
key = Key(path, key, hash, randomart)
key.dump()
print(f"Saved Keys in {path}")
return key

426
sing.py Normal file
View File

@ -0,0 +1,426 @@
import typer
import os
import typing as t
import shutil
from pathlib import Path
import cson
import getpass
from lib import abc
import string
from lib.db import Database
from lib.dirstat import wrap_directory, unpack
import socket
from hooks.remote import remote, clone as _clone
from hooks.diff import diff
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
host = s.getsockname()[0]
s.close()
except:
host = "127.0.0.1"
app = typer.Typer(name="sing", help="Singularity - the worst Source Control ever")
app.add_typer(remote, name="remote")
app.add_typer(diff, name="diff")
@app.command()
def clone(url: str = typer.Argument(...)):
_clone(url, init_fn=init, pull_fn=pull)
@app.command()
def init(
dir=typer.Argument("."),
branch: str = typer.Option("master", "-b", "--initial-branch"),
force: bool = typer.Option(False, "-f", "--force", "-r", "--reinit"),
):
"""
Initializes a new Repository, or reinitializes an existing one
"""
initial_cfg = {"current_branch": branch, "branches": [branch], "remotes": {}}
user_cfg = {
"user.name": f"",
"user.email": f"",
"repo.name": os.path.basename(os.path.abspath(dir)),
}
if os.path.exists(os.path.join(dir, ".sing")):
if force:
shutil.rmtree(os.path.join(dir, ".sing"))
print("Reinitializing Singularity Repository...")
else:
return print("Singularity Config Directory already exists - Quitting")
os.mkdir(os.path.join(dir, ".sing"))
os.chdir(os.path.join(dir, ".sing"))
os.mkdir("branches") # Branch Directories
os.mkdir(os.path.join("branches", branch)) # Initial Branch
os.mkdir("stage") # Staged Changes
os.mkdir("system") # Remotes, Branch Config, Local Config, Database
n = open(os.path.join("system", "sing.db"), "wb+")
n.close()
cson.dump(initial_cfg, open(os.path.join("system", "branchcf.cson"), "w+"))
cson.dump(user_cfg, open(os.path.join("system", "localconfig.cson"), "w+"))
os.mkdir("control") # Timeline etc.
os.mkdir("overhead") # Stashed Data
print("Initialized barebones Repository in {0}".format(os.path.abspath(dir)))
@app.command()
def config(
key=typer.Argument(None),
list: bool = typer.Option(False, "-l", "--list"),
set: str = typer.Option(None, "--set"),
):
ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
if key:
if set:
ucfg[key] = set
cson.dump(
ucfg, open(os.path.join(".sing", "system", "localconfig.cson"), "w+")
)
else:
print(ucfg.get(key, f"Not found: {key}"))
if list:
subpart = {}
for k, v in ucfg.items():
root, val = k.split(".")
if root not in subpart:
subpart[root] = []
subpart[root].append((val, v))
for root, values in subpart.items():
print(f"- {root}")
for key, value in values:
print(f"---- {key} -> {value}")
@app.command()
def log():
""""""
for commitfile in os.listdir(os.path.join(".sing", "control")):
print(open(os.path.join(".sing", "control", commitfile)).read())
@app.command()
def stash(files: t.List[Path]):
"""
Stashes Files into Overhead to avoid conflicts. Usually called automatically
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
for file in files:
fp = os.path.abspath(file.name)
bn = os.path.basename(fp)
if fp == os.getcwd():
add(list([Path(i) for i in os.listdir(fp)]))
return
else:
if bn in ignore:
continue
elif os.path.isdir(fp):
if os.path.isdir(os.path.join(".sing", "overhead", bn)):
shutil.rmtree(os.path.join(".sing", "overhead", bn))
shutil.copytree(fp, os.path.join(".sing", "overhead", bn))
elif os.path.isfile(fp):
if os.path.isfile(os.path.join(".sing", "overhead", bn)):
os.remove(os.path.join(".sing", "overhead", bn))
shutil.copyfile(fp, os.path.join(".sing", "overhead", bn))
@app.command()
def add(files: t.List[Path]):
"""
Stage Files or Directories for a commit
"""
ignore = [".sing"]
if os.path.isfile(".signore"):
ignore.extend(open(".signore").readlines())
for file in files:
fp = os.path.abspath(file.name)
bn = os.path.basename(fp)
if fp == os.getcwd():
add(list([Path(i) for i in os.listdir(fp)]))
return
else:
if bn in ignore:
continue
elif os.path.isdir(fp):
if os.path.isdir(os.path.join(".sing", "stage", bn)):
shutil.rmtree(os.path.join(".sing", "stage", bn))
shutil.copytree(fp, os.path.join(".sing", "stage", bn))
elif os.path.isfile(fp):
if os.path.isfile(os.path.join(".sing", "stage", bn)):
os.remove(os.path.join(".sing", "stage", bn))
shutil.copyfile(fp, os.path.join(".sing", "stage", bn))
@app.command()
def rm(files: t.List[str]):
"""
Unstage staged Files
"""
for file in files:
if os.path.exists(os.path.join(".sing", "stage", file)):
if os.path.isdir(os.path.join(".sing", "stage", file)):
shutil.rmtree(os.path.join(".sing", "stage", file))
else:
os.remove(os.path.join(".sing", "stage", file))
@app.command()
def branch(make_new: str = typer.Option(None, "-m", "--new")):
"""
List Branches or make a new one. To switch, use the 'checkout' command.
"""
if not make_new:
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
for branch in cfg["branches"]:
if branch == cfg["current_branch"]:
print(f"* {branch}")
else:
print(branch)
else:
os.mkdir(os.path.join(".sing", "branches", make_new))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
cfg["branches"].append(make_new)
cfg["current_branch"] = make_new
cson.dump(cfg, open(os.path.join(".sing", "system", "branchcf.cson"), "w+"))
@app.command()
def commit(
message: str = typer.Option(None, "-m", "--message"),
amend: bool = typer.Option(False, "-a", "--amend"),
):
"""
Commit the staged Files and write them to the Database
Options:
-m, --message : The Commit Message.
-a. --amend : Overwrite the last commit.
"""
if not message:
open(".commit.tmp", "w+")
typer.edit(filename=".commit.tmp")
message = open(".commit.tmp").read()
os.remove(".commit.tmp")
ucfg = cson.load(open(os.path.join(".sing", "system", "localconfig.cson")))
if ucfg["user.name"] == "" or ucfg["user.email"] == "":
print("*** Please tell me who you are")
print("\nRun\n")
print('\tsing config user.name --set "Your Name" ')
print('\tsing config user.example --set "you@example.com" ')
return
if amend:
try:
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"]))
shutil.copytree(
os.path.join(".sing", "stage"),
os.path.join(".sing", "branches", cfg["current_branch"]),
)
import random
from datetime import datetime
cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32))
commitfile = f"""
commit {cwid} {cfg["current_branch"]}
Author: {ucfg['user.name']} <{ucfg['user.email']}>
Date: {datetime.now()}
{message}
"""
commit_data = wrap_directory(
os.path.join(".sing", "branches", cfg["current_branch"])
)
db.write(cwid, commit_data)
db.commit()
print(os.getcwd())
open(
os.path.join(
".sing",
"control",
f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))-1}.commit",
),
"w+",
).write(commitfile)
print(f"Commit Mode +w - On ")
except Exception as e:
import sys, traceback
print(sys.exc_info())
print(traceback.format_exc())
print(e)
else:
try:
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
shutil.rmtree(os.path.join(".sing", "branches", cfg["current_branch"]))
shutil.copytree(
os.path.join(".sing", "stage"),
os.path.join(".sing", "branches", cfg["current_branch"]),
)
import random
from datetime import datetime
cwid = "".join(random.choices(string.ascii_letters + string.digits, k=32))
commitfile = f"""
commit {cwid} {cfg["current_branch"]}
Author: {ucfg['user.name']} <{ucfg['user.email']}>
Date: {datetime.now()}
{message}
"""
commit_data = wrap_directory(
os.path.join(".sing", "branches", cfg["current_branch"])
)
db.write(cwid, commit_data)
db.commit()
print(os.getcwd())
open(
os.path.join(
".sing",
"control",
f"COMMIT-{len(os.listdir(os.path.join('.sing', 'control')))}.commit",
),
"w+",
).write(commitfile)
print(f"Commit Mode +w - On ")
except Exception as e:
import sys, traceback
print(sys.exc_info())
print(traceback.format_exc())
print(e)
@app.command()
def stat():
"""
Print the entire sing Configuration Tree
"""
dw = wrap_directory(".sing")
print(dw.stringify(), end="")
print("local{HEAD}")
@app.command()
def slog():
"""
List all Commits in the Database
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
for k, v in db.data.items():
print(f"{k} --> {type(v)}")
@app.command()
def pull():
"""
Stash the current Tree and integrate changes downloaded from Remote into active branch
"""
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
stash([Path(".")])
id = len(os.listdir(os.path.join(".sing", "control"))) - 1
commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit")
dname = open(commit_fn).readlines()
for i in dname:
i = i.lstrip()
if i.startswith("commit"):
_, id, branch = i.split()
if branch == cfg["current_branch"]:
break
revert(id, branch=branch)
@app.command(no_args_is_help=True)
def comtree(c_id: str = typer.Argument(...)):
"""
View the Tree Structure of the Commit <C_ID>
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
print(entry.stringify())
print("HEAD/")
@app.command(no_args_is_help=True)
def revert(
c_id: str = typer.Argument(...), branch: str = typer.Option(None, "-b", "--branch")
):
"""
Reverts to Commit <C_ID>
"""
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
ignore = [".sing"]
if os.path.exists(".signore"):
ignore += open(".signore").readlines()
for n in os.listdir():
if n in ignore:
continue
if os.path.isfile(n):
os.remove(n)
else:
shutil.rmtree(n)
unpack(entry)
@app.command(no_args_is_help=True)
def checkout(
branch: str = typer.Argument(...),
force: bool = typer.Option(False, "-f", "--force"),
):
"""
Switch branches or restore working tree files
"""
cfg = cson.load(open(os.path.join(".sing", "system", "branchcf.cson")))
db = Database.connect(os.path.join(".sing", "system", "sing.db"))
id = len(os.listdir(os.path.join(".sing", "control"))) - 1
commit_fn = os.path.join(".sing", "control", f"COMMIT-{id}.commit")
dname = open(commit_fn).readlines()
for i in dname:
i = i.lstrip()
if i.startswith("commit"):
_, c_id, _branch = i.split()
if branch == _branch:
break
entry = db.get(c_id)
if not entry:
return print(f"Fatal -- Cannot find Commit <{c_id}> -- Aborted")
ignore = [".sing"]
if os.path.exists(".signore"):
ignore += open(".signore").readlines()
if not force:
stash([Path(".")])
for n in os.listdir():
if n in ignore:
continue
if os.path.isfile(n):
os.remove(n)
else:
shutil.rmtree(n)
cfg["current_branch"] = branch
cson.dump(cfg, open(os.path.join(".sing", "system", "branchcf.cson")))
unpack(entry)
if __name__ == "__main__":
app()

1
test.py Normal file
View File

@ -0,0 +1 @@
# Lol