# pypush-plus-plus/pypush/__main__.py
# 2023-11-19 14:56:23 -07:00
#
# 200 lines
# 7.1 KiB
# Python

import json
import logging
from base64 import b64decode, b64encode
from subprocess import PIPE, Popen
from rich.logging import RichHandler
import apns
import ids
import imessage
import trio
# Send every record through rich. The root logger is left at NOTSET, so the
# per-logger levels below are what actually decide how chatty each one is.
logging.basicConfig(
    level=logging.NOTSET,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler()],
)

# Set sane log levels (applied in the order listed)
for _logger_name, _logger_level in (
    ("urllib3", logging.WARNING),
    ("py.warnings", logging.ERROR),  # Ignore warnings from urllib3
    ("asyncio", logging.WARNING),
    ("jelly", logging.INFO),
    ("nac", logging.INFO),
    ("apns", logging.DEBUG),
    ("albert", logging.INFO),
    ("ids", logging.DEBUG),
    ("bags", logging.INFO),
    ("imessage", logging.DEBUG),
):
    logging.getLogger(_logger_name).setLevel(_logger_level)

# Route warnings.warn() output into the "py.warnings" logger configured above.
logging.captureWarnings(True)
# Ask git which commit this checkout is on. The result is compared with the
# hash stored in config.json to decide whether a re-registration is needed.
try:
    process = Popen(["git", "rev-parse", "HEAD"], stdout=PIPE)  # type: ignore
except OSError as exc:
    # Popen raises OSError (FileNotFoundError, PermissionError, ...) when git
    # cannot be executed at all. Fall back to an empty hash instead of
    # crashing at import time.
    logging.warning("Could not run git to determine the current commit: %s", exc)
    commit_hash, err, exit_code = "", None, None
else:
    # communicate() drains stdout and waits for git to exit, so the return
    # code is already set afterwards and no extra wait() is needed.
    (commit_hash, err) = process.communicate()
    exit_code = process.returncode
    commit_hash = commit_hash.decode().strip()
    if exit_code != 0:
        logging.warning(
            "git rev-parse HEAD exited with status %s; commit hash may be unreliable",
            exit_code,
        )
# Load config.json if it exists; a missing file simply means a fresh, empty
# configuration.
try:
    f = open("config.json", "r")
except FileNotFoundError:
    CONFIG = {}
else:
    with f:
        CONFIG = json.load(f)

# A config written by a different commit is not trusted: drop the stored "id"
# entry so that registration happens again, and remember the new hash.
if commit_hash != CONFIG.get("commit_hash"):
    logging.warning("pypush commit is different, forcing re-registration...")
    CONFIG["commit_hash"] = commit_hash
    CONFIG.pop("id", None)
def safe_b64decode(s):
    """Decode base64 input, returning ``None`` instead of raising on bad data.

    Args:
        s: Base64 text or bytes-like data, as accepted by ``b64decode``.

    Returns:
        The decoded bytes, or ``None`` when ``s`` cannot be decoded: wrong
        type, non-ASCII text, or malformed base64 such as incorrect padding.
    """
    try:
        return b64decode(s)
    except (ValueError, TypeError):
        # ValueError covers binascii.Error (malformed base64) and non-ASCII
        # strings; TypeError covers inputs that are neither str nor
        # bytes-like (e.g. None). Anything else, notably KeyboardInterrupt
        # and SystemExit, is deliberately allowed to propagate.
        return None
async def main():
    """Run the interactive client.

    Builds push credentials from ``CONFIG["push"]`` (empty values when the
    section is absent), opens an ``apns.APNSConnection``, sets up an
    ``ids.IDSUser`` from ``CONFIG``, writes the resulting keys, certificates
    and push token back to ``config.json``, and finally runs ``input_task``
    and ``output_task`` side by side in a trio nursery.
    """
    push_cfg = CONFIG.get("push", {})
    stored_token = push_cfg.get("token")
    # The token is kept base64-encoded in config.json; no token -> empty bytes.
    token = b"" if stored_token is None else b64decode(stored_token)

    push_creds = apns.PushCredentials(
        push_cfg.get("key", ""),
        push_cfg.get("cert", ""),
        token,
    )

    async with apns.APNSConnection.start(push_creds) as conn:
        await conn.set_state(1)
        # NOTE(review): presumably the iMessage push topic -- confirm in apns.
        await conn.filter(["com.apple.madrid"])

        user = ids.IDSUser(conn)
        user.auth_and_set_encryption_from_config(CONFIG)

        # Write config.json
        identity = user.encryption_identity
        CONFIG["encryption"] = {
            "rsa_key": identity.encryption_key,
            "ec_key": identity.signing_key,
        }

        id_keypair = user._id_keypair
        CONFIG["id"] = {
            "key": id_keypair.key,
            "cert": id_keypair.cert,
        }

        auth_keypair = user._auth_keypair
        CONFIG["auth"] = {
            "key": auth_keypair.key,
            "cert": auth_keypair.cert,
            "user_id": user.user_id,
            "handles": user.handles,
        }

        credentials = user.push_connection.credentials
        CONFIG["push"] = {
            "token": b64encode(credentials.token).decode(),
            "key": credentials.private_key,
            "cert": credentials.cert,
        }

        with open("config.json", "w") as f:
            json.dump(CONFIG, f, indent=4)

        im = imessage.iMessageUser(conn, user)

        # Run the prompt loop and the receive loop concurrently.
        async with trio.open_nursery() as nursery:
            for task in (input_task, output_task):
                nursery.start_soon(task, im)
async def input_task(im: imessage.iMessageUser):
    """Read lines from the ``>> `` prompt forever and act on them.

    A line is either a command (``help``, ``quit``, ``effect``, ``filter``,
    ``handle``, ``typing``, ``typingoff``) or message text, which is sent to
    the participants chosen with ``filter``. Commands may be abbreviated to
    their first letter; when two commands share a first letter the one checked
    first wins, so ``h`` is help (not handle) and ``t`` is typing (not
    typingoff). A leading backslash escapes a command so that it is sent as
    message text with the backslash removed.

    Args:
        im: The iMessage user used to send messages and typing indicators and
            whose ``user`` supplies the available handles.

    The loop never returns normally; ``quit`` leaves it by calling ``exit(0)``.
    """
    current_effect: str | None = None
    current_participants: list[str] = []

    def is_cmd(cmd_str: str, name: str) -> bool:
        """Tell whether ``cmd_str`` invokes ``name`` (full name or first letter, with or without arguments)."""
        short = name[0]
        return cmd_str in (name, short) or cmd_str.startswith((f"{name} ", f"{short} "))

    def get_parameters(cmd: str, err_msg: str) -> list[str] | None:
        """Return the arguments after the command word, or print ``err_msg`` and return None when there are none."""
        # Splitting on a single space yields empty strings for doubled or
        # trailing spaces ("filter a "). Drop them: an empty argument would
        # otherwise be passed on as a bogus handle.
        params = [part for part in cmd.split(" ")[1:] if part]
        if not params:
            print(err_msg)
            return None
        return params

    def fixup_handle(handle: str) -> str:
        """Normalise a user-typed handle to ``tel:+...`` or ``mailto:...`` form."""
        if handle.startswith(("tel:+", "mailto:")):
            return handle
        if handle.startswith("tel:"):
            return "tel:+" + handle[4:]
        if handle.startswith("+"):
            return "tel:" + handle
        # handle[:1] instead of handle[0] so an empty string cannot raise
        # IndexError.
        if handle[:1].isdigit():
            # if the handle is 10 digits, assume it's a US number
            if len(handle) == 10:
                return "tel:+1" + handle
            # otherwise just assume it's a full phone number
            return "tel:+" + handle
        # assume it's an email
        return "mailto:" + handle

    while True:
        # input() blocks, so it runs in a worker thread to keep the trio
        # event loop (and output_task) responsive.
        cmd = await trio.to_thread.run_sync(input, ">> ", cancellable=True)
        if cmd == "":
            continue

        if is_cmd(cmd, "help"):
            print("help (h): show this message")
            print("quit (q): quit")
            print("filter (f) [recipient]: set the current chat")
            print("note: recipient must start with tel: or mailto: and include the country code")
            print("effect (e): adds an iMessage effect to the next sent message")
            print("handle [handle]: set the current handle (for sending messages with)")
            print("\\: escape commands (will be removed from message)")
            print("all other commands will be treated as message text to be sent")
        elif is_cmd(cmd, "quit"):
            exit(0)
        elif is_cmd(cmd, "effect"):
            if (effect := get_parameters(cmd, "effect [effect namespace]")) is not None:
                print(f"next message will be sent with [{effect[0]}]")
                current_effect = effect[0]
        elif is_cmd(cmd, "filter"):
            # set the current chat
            if (participants := get_parameters(cmd, "filter [recipients]")) is not None:
                fixed_participants: list[str] = [fixup_handle(p) for p in participants]
                print(f"Filtering to {fixed_participants}")
                current_participants = fixed_participants
        elif is_cmd(cmd, "handle"):
            handles: list[str] = im.user.handles
            av_handles: str = "\n".join(
                [f"\t{h}{' (current)' if h == im.user.current_handle else ''}" for h in handles]
            )
            err_str: str = f"handle [handle]\nAvailable handles:\n{av_handles}"
            if (input_handles := get_parameters(cmd, err_str)) is not None:
                handle = fixup_handle(input_handles[0])
                if handle in handles:
                    print(f"Using {handle} as handle")
                    im.user.current_handle = handle
                else:
                    print(f"Handle {handle} not found")
        elif is_cmd(cmd, "typing"):
            if current_participants:
                await im.typing(current_participants)
            else:
                print("No chat selected")
        elif is_cmd(cmd, "typingoff"):
            if current_participants:
                await im.typing(current_participants, False)
            else:
                print("No chat selected")
        elif current_participants:
            # Anything that is not a command is message text; a leading
            # backslash only serves to escape a command word and is dropped.
            if cmd.startswith("\\"):
                cmd = cmd[1:]
            await im.send(imessage.iMessage.create(im, cmd, current_participants, current_effect))
            # An effect applies to a single message only.
            current_effect = None
        else:
            print("No chat selected")
async def output_task(im: imessage.iMessageUser):
    """Print each message received on ``im`` as it arrives; never returns."""
    while True:
        print(str(await im.receive()))
# Start the trio event loop with main() only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    trio.run(main)