pypush-plus-plus/demo.py
2023-08-15 16:28:02 -04:00

272 lines
8.4 KiB
Python

import json
import logging
import os
import threading
import time
from base64 import b64decode, b64encode
from getpass import getpass
from subprocess import PIPE, Popen
from rich.logging import RichHandler
import apns
import ids
import imessage
logging.basicConfig(
level=logging.NOTSET, format="%(message)s", datefmt="[%X]", handlers=[RichHandler()]
)
# Set sane log levels
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("py.warnings").setLevel(logging.ERROR) # Ignore warnings from urllib3
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("jelly").setLevel(logging.INFO)
logging.getLogger("nac").setLevel(logging.INFO)
logging.getLogger("apns").setLevel(logging.INFO)
logging.getLogger("albert").setLevel(logging.INFO)
logging.getLogger("ids").setLevel(logging.DEBUG)
logging.getLogger("bags").setLevel(logging.INFO)
logging.getLogger("imessage").setLevel(logging.INFO)
logging.captureWarnings(True)
process = Popen(["git", "rev-parse", "HEAD"], stdout=PIPE)
(commit_hash, err) = process.communicate()
exit_code = process.wait()
commit_hash = commit_hash.decode().strip()
# Try and load config.json
try:
with open("config.json", "r") as f:
CONFIG = json.load(f)
except FileNotFoundError:
CONFIG = {}
# Re-register if the commit hash has changed
if CONFIG.get("commit_hash") != commit_hash:
logging.warning("pypush commit is different, forcing re-registration...")
CONFIG["commit_hash"] = commit_hash
if "id" in CONFIG:
del CONFIG["id"]
conn = apns.APNSConnection(
CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert")
)
def safe_b64decode(s):
try:
return b64decode(s)
except:
return None
conn.connect(token=safe_b64decode(CONFIG.get("push", {}).get("token")))
conn.set_state(1)
conn.filter(["com.apple.madrid", "com.apple.private.alloy.sms"])
user = ids.IDSUser(conn)
if CONFIG.get("auth", {}).get("cert") is not None:
auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"])
user_id = CONFIG["auth"]["user_id"]
handles = CONFIG["auth"]["handles"]
user.restore_authentication(auth_keypair, user_id, handles)
else:
username = input("Username: ")
password = getpass("Password: ")
user.authenticate(username, password)
user.encryption_identity = ids.identity.IDSIdentity(
encryption_key=CONFIG.get("encryption", {}).get("rsa_key"),
signing_key=CONFIG.get("encryption", {}).get("ec_key"),
)
if (
CONFIG.get("id", {}).get("cert") is not None
and user.encryption_identity is not None
):
id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"])
user.restore_identity(id_keypair)
else:
logging.info("Registering new identity...")
import emulated.nac
vd = emulated.nac.generate_validation_data()
vd = b64encode(vd).decode()
user.register(vd)
logging.info("Waiting for incoming messages...")
# Write config.json
CONFIG["encryption"] = {
"rsa_key": user.encryption_identity.encryption_key,
"ec_key": user.encryption_identity.signing_key,
}
CONFIG["id"] = {
"key": user._id_keypair.key,
"cert": user._id_keypair.cert,
}
CONFIG["auth"] = {
"key": user._auth_keypair.key,
"cert": user._auth_keypair.cert,
"user_id": user.user_id,
"handles": user.handles,
}
CONFIG["push"] = {
"token": b64encode(user.push_connection.token).decode(),
"key": user.push_connection.private_key,
"cert": user.push_connection.cert,
}
with open("config.json", "w") as f:
json.dump(CONFIG, f, indent=4)
im = imessage.iMessageUser(conn, user)
INPUT_QUEUE = apns.IncomingQueue()
def input_thread():
from prompt_toolkit import prompt
while True:
try:
msg = prompt(">> ")
except:
msg = "quit"
INPUT_QUEUE.append(msg)
threading.Thread(target=input_thread, daemon=True).start()
print("Type 'help' for help")
def fixup_handle(handle):
if handle.startswith("tel:+"):
return handle
elif handle.startswith("mailto:"):
return handle
elif handle.startswith("tel:"):
return "tel:+" + handle[4:]
elif handle.startswith("+"):
return "tel:" + handle
# If the handle starts with a number
elif handle[0].isdigit():
# If the handle is 10 digits, assume it's a US number
if len(handle) == 10:
return "tel:+1" + handle
# If the handle is 11 digits, assume it's a US number with country code
elif len(handle) == 11:
return "tel:+" + handle
else: # Assume it's an email
return "mailto:" + handle
current_participants = []
current_effect = None
while True:
im.activate_sms() # We must call this always since SMS could be turned off and on again, and it might have been on before this.
msg = im.receive()
if msg is not None:
# print(f'[{msg.sender}] {msg.text}')
print(str(msg))
# attachments = msg.attachments()
# if len(attachments) > 0:
# attachments_path = f"attachments/{msg.id}/"
# os.makedirs(attachments_path, exist_ok=True)
# for attachment in attachments:
# with open(attachments_path + attachment.name, "wb") as attachment_file:
# attachment_file.write(attachment.versions[0].data())
# print(f"({len(attachments)} attachment{'s have' if len(attachments) != 1 else ' has'} been downloaded and put "
# f"in {attachments_path})")
if len(INPUT_QUEUE) > 0:
msg = INPUT_QUEUE.pop()
if msg == "":
continue
if msg == "help" or msg == "h":
print("help (h): show this message")
print("quit (q): quit")
# print('send (s) [recipient] [message]: send a message')
print("filter (f) [recipient]: set the current chat")
print("effect (e): adds an iMessage effect to the next sent message")
print(
"note: recipient must start with tel: or mailto: and include the country code"
)
print("handle <handle>: set the current handle (for sending messages)")
print("\\: escape commands (will be removed from message)")
elif msg == "quit" or msg == "q":
break
elif (
msg == "effect"
or msg == "e"
or msg.startswith("effect ")
or msg.startswith("e ")
):
msg = msg.split(" ")
if len(msg) < 2 or msg[1] == "":
print("effect [effect namespace]")
else:
print(f"next message will be sent with [{msg[1]}]")
current_effect = msg[1]
elif (
msg == "filter"
or msg == "f"
or msg.startswith("filter ")
or msg.startswith("f ")
):
# Set the curernt chat
msg = msg.split(" ")
if len(msg) < 2 or msg[1] == "":
print("filter [recipients]")
else:
print(f"Filtering to {[fixup_handle(h) for h in msg[1:]]}")
current_participants = [fixup_handle(h) for h in msg[1:]]
im._cache_keys(
current_participants, "com.apple.madrid"
) # Just to make things faster, and to make it error on invalid addresses
elif msg == "handle" or msg.startswith("handle "):
msg = msg.split(" ")
if len(msg) < 2 or msg[1] == "":
print("handle [handle]")
print("Available handles:")
for h in user.handles:
if h == user.current_handle:
print(f"\t{h} (current)")
else:
print(f"\t{h}")
else:
h = msg[1]
h = fixup_handle(h)
if h in user.handles:
print(f"Using {h} as handle")
user.current_handle = h
else:
print(f"Handle {h} not found")
elif current_participants != []:
if msg.startswith("\\"):
msg = msg[1:]
imsg = imessage.iMessage.create(
im,
msg,
current_participants,
)
imsg.effect = current_effect
im.send(imsg)
current_effect = None
else:
print("No chat selected, use help for help")
time.sleep(0.1)