add commit hash check (and fmt)

This commit is contained in:
JJTech0130 2023-08-15 09:05:08 -04:00
parent 21fdfc97a9
commit 8a09a6eecd
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6

161
demo.py
View file

@ -5,6 +5,7 @@ import threading
import time import time
from base64 import b64decode, b64encode from base64 import b64decode, b64encode
from getpass import getpass from getpass import getpass
from subprocess import PIPE, Popen
from rich.logging import RichHandler from rich.logging import RichHandler
@ -18,7 +19,7 @@ logging.basicConfig(
# Set sane log levels # Set sane log levels
logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("py.warnings").setLevel(logging.ERROR) # Ignore warnings from urllib3 logging.getLogger("py.warnings").setLevel(logging.ERROR) # Ignore warnings from urllib3
logging.getLogger("asyncio").setLevel(logging.WARNING) logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("jelly").setLevel(logging.INFO) logging.getLogger("jelly").setLevel(logging.INFO)
logging.getLogger("nac").setLevel(logging.INFO) logging.getLogger("nac").setLevel(logging.INFO)
@ -30,6 +31,11 @@ logging.getLogger("imessage").setLevel(logging.INFO)
logging.captureWarnings(True) logging.captureWarnings(True)
process = Popen(["git", "rev-parse", "HEAD"], stdout=PIPE)
(commit_hash, err) = process.communicate()
exit_code = process.wait()
commit_hash = commit_hash.decode().strip()
# Try and load config.json # Try and load config.json
try: try:
with open("config.json", "r") as f: with open("config.json", "r") as f:
@ -37,6 +43,13 @@ try:
except FileNotFoundError: except FileNotFoundError:
CONFIG = {} CONFIG = {}
# Re-register if the commit hash has changed
if CONFIG.get("commit_hash") != commit_hash:
logging.warning("pypush commit is different, forcing re-registration...")
CONFIG["commit_hash"] = commit_hash
if "id" in CONFIG:
del CONFIG["id"]
conn = apns.APNSConnection( conn = apns.APNSConnection(
CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert") CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert")
) )
@ -116,44 +129,48 @@ im = imessage.iMessageUser(conn, user)
INPUT_QUEUE = apns.IncomingQueue() INPUT_QUEUE = apns.IncomingQueue()
def input_thread(): def input_thread():
from prompt_toolkit import prompt from prompt_toolkit import prompt
while True:
while True:
try: try:
msg = prompt('>> ') msg = prompt(">> ")
except: except:
msg = 'quit' msg = "quit"
INPUT_QUEUE.append(msg) INPUT_QUEUE.append(msg)
threading.Thread(target=input_thread, daemon=True).start() threading.Thread(target=input_thread, daemon=True).start()
print("Type 'help' for help") print("Type 'help' for help")
def fixup_handle(handle): def fixup_handle(handle):
if handle.startswith('tel:+'): if handle.startswith("tel:+"):
return handle return handle
elif handle.startswith('mailto:'): elif handle.startswith("mailto:"):
return handle return handle
elif handle.startswith('tel:'): elif handle.startswith("tel:"):
return 'tel:+' + handle[4:] return "tel:+" + handle[4:]
elif handle.startswith('+'): elif handle.startswith("+"):
return 'tel:' + handle return "tel:" + handle
# If the handle starts with a number # If the handle starts with a number
elif handle[0].isdigit(): elif handle[0].isdigit():
# If the handle is 10 digits, assume it's a US number # If the handle is 10 digits, assume it's a US number
if len(handle) == 10: if len(handle) == 10:
return 'tel:+1' + handle return "tel:+1" + handle
# If the handle is 11 digits, assume it's a US number with country code # If the handle is 11 digits, assume it's a US number with country code
elif len(handle) == 11: elif len(handle) == 11:
return 'tel:+' + handle return "tel:+" + handle
else: # Assume it's an email else: # Assume it's an email
return 'mailto:' + handle return "mailto:" + handle
current_participants = [] current_participants = []
current_effect = None current_effect = None
while True: while True:
im.activate_sms() # We must call this always since SMS could be turned off and on again, and it might have been on before this. im.activate_sms() # We must call this always since SMS could be turned off and on again, and it might have been on before this.
msg = im.receive() msg = im.receive()
if msg is not None: if msg is not None:
# print(f'[{msg.sender}] {msg.text}') # print(f'[{msg.sender}] {msg.text}')
@ -170,79 +187,95 @@ while True:
# print(f"({len(attachments)} attachment{'s have' if len(attachments) != 1 else ' has'} been downloaded and put " # print(f"({len(attachments)} attachment{'s have' if len(attachments) != 1 else ' has'} been downloaded and put "
# f"in {attachments_path})") # f"in {attachments_path})")
if len(INPUT_QUEUE) > 0: if len(INPUT_QUEUE) > 0:
msg = INPUT_QUEUE.pop() msg = INPUT_QUEUE.pop()
if msg == '': continue if msg == "":
if msg == 'help' or msg == 'h': continue
print('help (h): show this message') if msg == "help" or msg == "h":
print('quit (q): quit') print("help (h): show this message")
#print('send (s) [recipient] [message]: send a message') print("quit (q): quit")
print('filter (f) [recipient]: set the current chat') # print('send (s) [recipient] [message]: send a message')
print('effect (e): adds an iMessage effect to the next sent message') print("filter (f) [recipient]: set the current chat")
print('note: recipient must start with tel: or mailto: and include the country code') print("effect (e): adds an iMessage effect to the next sent message")
print('handle <handle>: set the current handle (for sending messages)') print(
print('\\: escape commands (will be removed from message)') "note: recipient must start with tel: or mailto: and include the country code"
elif msg == 'quit' or msg == 'q': )
print("handle <handle>: set the current handle (for sending messages)")
print("\\: escape commands (will be removed from message)")
elif msg == "quit" or msg == "q":
break break
elif msg == 'effect' or msg == 'e' or msg.startswith("effect ") or msg.startswith("e "): elif (
msg == "effect"
or msg == "e"
or msg.startswith("effect ")
or msg.startswith("e ")
):
msg = msg.split(" ") msg = msg.split(" ")
if len(msg) < 2 or msg[1] == "": if len(msg) < 2 or msg[1] == "":
print("effect [effect namespace]") print("effect [effect namespace]")
else: else:
print(f"next message will be sent with [{msg[1]}]") print(f"next message will be sent with [{msg[1]}]")
current_effect = msg[1] current_effect = msg[1]
elif msg == 'filter' or msg == 'f' or msg.startswith('filter ') or msg.startswith('f '): elif (
msg == "filter"
or msg == "f"
or msg.startswith("filter ")
or msg.startswith("f ")
):
# Set the curernt chat # Set the curernt chat
msg = msg.split(' ') msg = msg.split(" ")
if len(msg) < 2 or msg[1] == '': if len(msg) < 2 or msg[1] == "":
print('filter [recipients]') print("filter [recipients]")
else: else:
print(f'Filtering to {[fixup_handle(h) for h in msg[1:]]}') print(f"Filtering to {[fixup_handle(h) for h in msg[1:]]}")
current_participants = [fixup_handle(h) for h in msg[1:]] current_participants = [fixup_handle(h) for h in msg[1:]]
im._cache_keys(current_participants, "com.apple.madrid") # Just to make things faster, and to make it error on invalid addresses im._cache_keys(
elif msg == 'handle' or msg.startswith('handle '): current_participants, "com.apple.madrid"
msg = msg.split(' ') ) # Just to make things faster, and to make it error on invalid addresses
if len(msg) < 2 or msg[1] == '': elif msg == "handle" or msg.startswith("handle "):
print('handle [handle]') msg = msg.split(" ")
print('Available handles:') if len(msg) < 2 or msg[1] == "":
print("handle [handle]")
print("Available handles:")
for h in user.handles: for h in user.handles:
if h == user.current_handle: if h == user.current_handle:
print(f'\t{h} (current)') print(f"\t{h} (current)")
else: else:
print(f'\t{h}') print(f"\t{h}")
else: else:
h = msg[1] h = msg[1]
h = fixup_handle(h) h = fixup_handle(h)
if h in user.handles: if h in user.handles:
print(f'Using {h} as handle') print(f"Using {h} as handle")
user.current_handle = h user.current_handle = h
else: else:
print(f'Handle {h} not found') print(f"Handle {h} not found")
elif current_participants != []: elif current_participants != []:
if msg.startswith('\\'): if msg.startswith("\\"):
msg = msg[1:] msg = msg[1:]
im.send(imessage.OldiMessage( im.send(
text=msg, imessage.OldiMessage(
participants=current_participants, text=msg,
sender=user.current_handle, participants=current_participants,
effect=current_effect sender=user.current_handle,
)) effect=current_effect,
)
)
current_effect = None current_effect = None
else: else:
print('No chat selected, use help for help') print("No chat selected, use help for help")
time.sleep(0.1) time.sleep(0.1)
# elif msg.startswith('send') or msg.startswith('s'): # elif msg.startswith('send') or msg.startswith('s'):
# msg = msg.split(' ') # msg = msg.split(' ')
# if len(msg) < 3: # if len(msg) < 3:
# print('send [recipient] [message]') # print('send [recipient] [message]')
# else: # else:
# im.send(imessage.iMessage( # im.send(imessage.iMessage(
# text=' '.join(msg[2:]), # text=' '.join(msg[2:]),
# participants=[msg[1], user.handles[0]], # participants=[msg[1], user.handles[0]],
# #sender=user.handles[0] # #sender=user.handles[0]
# )) # ))