pypush-plus-plus/demo.py

324 lines
13 KiB
Python
Raw Normal View History

2023-05-02 12:39:11 +00:00
import json
2023-07-26 11:50:48 +00:00
import logging
2023-07-31 23:38:28 +00:00
import os
2023-07-26 11:50:48 +00:00
import threading
import time
from base64 import b64decode, b64encode
2023-05-09 21:03:27 +00:00
from getpass import getpass
from cryptography import x509
2023-05-02 12:39:11 +00:00
2023-07-23 22:55:13 +00:00
from rich.logging import RichHandler
2023-07-26 11:50:48 +00:00
import apns
import ids
2023-07-28 15:53:13 +00:00
import imessage
2023-07-26 11:50:48 +00:00
2023-08-19 15:27:44 +00:00
import trio
2023-08-27 13:27:24 +00:00
import argparse
2023-08-19 15:27:44 +00:00
from exceptions import *
2023-07-23 22:55:13 +00:00
logging.basicConfig(
2023-07-26 22:49:41 +00:00
level=logging.NOTSET, format="%(message)s", datefmt="[%X]", handlers=[RichHandler()]
2023-07-23 22:55:13 +00:00
)
# Set sane log levels
logging.getLogger("urllib3").setLevel(logging.WARNING)
2023-08-15 13:05:08 +00:00
logging.getLogger("py.warnings").setLevel(logging.ERROR) # Ignore warnings from urllib3
2023-07-31 17:25:19 +00:00
logging.getLogger("asyncio").setLevel(logging.WARNING)
2023-07-23 22:55:13 +00:00
logging.getLogger("jelly").setLevel(logging.INFO)
logging.getLogger("nac").setLevel(logging.INFO)
2023-08-18 02:05:18 +00:00
logging.getLogger("apns").setLevel(logging.INFO)
2023-07-24 13:18:21 +00:00
logging.getLogger("albert").setLevel(logging.INFO)
logging.getLogger("ids").setLevel(logging.DEBUG)
2023-07-31 17:08:57 +00:00
logging.getLogger("bags").setLevel(logging.INFO)
2023-08-18 02:05:18 +00:00
logging.getLogger("imessage").setLevel(logging.INFO)
2023-05-03 00:53:18 +00:00
logging.captureWarnings(True)
2023-05-09 21:03:27 +00:00
# Try and load config.json
try:
with open("config.json", "r") as f:
CONFIG = json.load(f)
except FileNotFoundError:
CONFIG = {}
2023-05-02 12:39:11 +00:00
2023-05-09 22:48:44 +00:00
def safe_b64decode(s):
try:
return b64decode(s)
except:
return None
2023-08-26 02:26:37 +00:00
def safe_config():
with open("config.json", "w") as f:
json.dump(CONFIG, f, indent=4)
2023-07-26 11:50:48 +00:00
def get_not_valid_after_timestamp(cert_data):
try:
cert = x509.load_pem_x509_certificate(cert_data.encode('utf-8'))
return cert.not_valid_after
except Exception as e:
return None # Return None in case of an error
2023-08-27 13:27:24 +00:00
async def main(args: argparse.Namespace):
2023-08-26 02:26:37 +00:00
# Load any existing push credentials
token = CONFIG.get("push", {}).get("token")
2023-08-26 02:26:37 +00:00
token = b64decode(token) if token is not None else b""
2023-08-18 00:23:56 +00:00
push_creds = apns.PushCredentials(
CONFIG.get("push", {}).get("key", ""), CONFIG.get("push", {}).get("cert", ""), token)
2023-08-28 12:01:41 +00:00
def register(conn, users):
import emulated.nac
vd = emulated.nac.generate_validation_data()
vd = b64encode(vd).decode()
users = ids.register(conn, users, vd, args.client_data or args.reg_notify)
2023-08-28 12:01:41 +00:00
return users
2023-08-18 00:23:56 +00:00
async with apns.APNSConnection.start(push_creds) as conn:
2023-08-26 02:26:37 +00:00
# Save the push credentials to the config
CONFIG["push"] = {
"token": b64encode(conn.credentials.token).decode(),
"key": conn.credentials.private_key,
"cert": conn.credentials.cert,
}
safe_config()
# Activate the connection
await conn.set_state(1)
await conn.filter(["com.apple.madrid"])
2023-08-18 00:23:56 +00:00
2023-08-26 02:26:37 +00:00
# If the user wants a phone number, we need to register it WITH an Apple ID, then register the Apple ID again
# otherwise we encounter issues for some reason
users = []
if "id" in CONFIG:
logging.debug("Restoring old-style identity...")
users.append(ids.IDSAppleUser(conn, CONFIG["auth"]["user_id"], ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"]),
ids.identity.IDSIdentity(CONFIG["encryption"]["ec_key"], CONFIG["encryption"]["rsa_key"]), CONFIG["id"]["cert"],
CONFIG["auth"]["handles"]))
if "users" in CONFIG:
logging.debug("Restoring new-style identity...")
for user in CONFIG["users"]:
users.append(ids.IDSUser(conn, user["id"], ids._helpers.KeyPair(user["auth_key"], user["auth_cert"]),
ids.identity.IDSIdentity(user["signing_key"], user["encryption_key"]), user["id_cert"],
user["handles"]))
2023-08-24 11:31:11 +00:00
else:
2023-08-26 02:26:37 +00:00
print("Would you like to register a phone number? (y/n)")
if input("> ").lower() == "y":
2023-09-02 23:09:41 +00:00
import sms_registration
if args.phone is None:
raise GatewayConnectionError("You did not supply an IP address.")
2023-09-02 23:09:41 +00:00
2023-08-26 02:26:37 +00:00
if "phone" in CONFIG:
phone_sig = b64decode(CONFIG["phone"].get("sig"))
phone_number = CONFIG["phone"].get("number")
2023-09-02 23:09:41 +00:00
elif args.pdu is not None:
phone_number, phone_sig = sms_registration.parse_pdu(args.pdu, None)
2023-08-26 02:26:37 +00:00
else:
import sms_registration
phone_number, phone_sig = sms_registration.register(push_token=conn.credentials.token,
no_parse=args.trigger_pdu, gateway=args.gateway,
phone_ip=args.phone)
2023-08-26 02:26:37 +00:00
CONFIG["phone"] = {
"number": phone_number,
"sig": b64encode(phone_sig).decode(),
}
safe_config()
users.append(ids.IDSPhoneUser.authenticate(conn, phone_number, phone_sig))
print("Would you like sign in to your Apple ID (recommended)? (y/n)")
if input("> ").lower() == "y":
username = input("Username: ")
password = input("Password: ")
users.append(ids.IDSAppleUser.authenticate(conn, username, password))
2023-08-24 11:31:11 +00:00
2023-08-28 12:01:41 +00:00
users = register(conn, users)
2023-08-24 11:31:11 +00:00
2023-08-26 02:26:37 +00:00
CONFIG["users"] = []
for user in users:
CONFIG["users"].append({
"id": user.user_id,
"auth_key": user.auth_keypair.key,
"auth_cert": user.auth_keypair.cert,
"encryption_key": user.encryption_identity.encryption_key if user.encryption_identity is not None else None,
"signing_key": user.encryption_identity.signing_key if user.encryption_identity is not None else None,
"id_cert": user.id_cert,
"handles": user.handles,
})
safe_config()
2023-08-24 11:31:11 +00:00
2023-08-27 13:27:24 +00:00
if args.reregister:
print("Re-registering...")
register(conn, users)
2023-08-27 13:27:24 +00:00
CONFIG["users"] = []
for user in users:
CONFIG["users"].append({
"id": user.user_id,
"auth_key": user.auth_keypair.key,
"auth_cert": user.auth_keypair.cert,
"encryption_key": user.encryption_identity.encryption_key if user.encryption_identity is not None else None,
"signing_key": user.encryption_identity.signing_key if user.encryption_identity is not None else None,
"id_cert": user.id_cert,
"handles": user.handles,
})
if "P:" in str(user.user_id):
expiration = get_not_valid_after_timestamp(user.id_cert)
expiration = str(expiration) + " UTC"
print(f"Number registration is valid until {expiration}. (YYYY/MM/DD)")
else:
email_user = user
for n in range(len(user.handles)):
if "mailto:" in str(user.handles[n]):
email_addr = user.handles[n]
safe_config()
if args.reg_notify:
im = imessage.iMessageUser(conn, email_user)
im.current_handle = email_addr
await im.send(imessage.iMessage.create(im, "Number registration is valid until " + expiration, [email_addr]))
print("Done!")
2023-08-27 13:27:24 +00:00
2023-08-28 12:13:20 +00:00
if args.alive:
logging.getLogger("apns").setLevel(logging.DEBUG)
while True:
await trio.sleep(20)
2023-08-24 11:31:11 +00:00
2023-10-13 22:24:39 +00:00
# im = imessage.iMessageUser(conn, users[0])
# await im.send(imessage.iMessage.create(im, "Hello world!", ["tel:+16106632676", "mailto:testu3@icloud.com"]))
2023-10-08 17:47:53 +00:00
2023-10-13 22:24:39 +00:00
# while True:
# print(await im.receive())
2023-10-08 17:47:53 +00:00
2023-08-26 02:26:37 +00:00
# user = ids.IDSUser(conn)
# if CONFIG.get("auth", {}).get("cert") is not None:
# auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"])
# user_id = CONFIG["auth"]["user_id"]
# handles = CONFIG["auth"]["handles"]
# user.restore_authentication(auth_keypair, user_id, handles)
# else:
# username = input("Username: ")
# password = getpass("Password: ")
# user.authenticate(username, password)
# import sms_registration
# phone_sig = safe_b64decode(CONFIG.get("phone", {}).get("sig"))
# phone_number = CONFIG.get("phone", {}).get("number")
# if phone_sig is None or phone_number is None:
# print("Registering phone number...")
# phone_number, phone_sig = sms_registration.register(user.push_connection.credentials.token)
# CONFIG["phone"] = {
# "number": phone_number,
# "sig": b64encode(phone_sig).decode(),
# }
# if CONFIG.get("phone", {}).get("auth_key") is not None and CONFIG.get("phone", {}).get("auth_cert") is not None:
# phone_auth_keypair = ids._helpers.KeyPair(CONFIG["phone"]["auth_key"], CONFIG["phone"]["auth_cert"])
# else:
# phone_auth_keypair = ids.profile.get_phone_cert(phone_number, user.push_connection.credentials.token, [phone_sig])
# CONFIG["phone"]["auth_key"] = phone_auth_keypair.key
# CONFIG["phone"]["auth_cert"] = phone_auth_keypair.cert
# user.encryption_identity = ids.identity.IDSIdentity(
# encryption_key=CONFIG.get("encryption", {}).get("rsa_key"),
# signing_key=CONFIG.get("encryption", {}).get("ec_key"),
# )
# #user._auth_keypair = phone_auth_keypair
# user.handles = [f"tel:{phone_number}"]
# print(user.user_id)
# # user.user_id = f"P:{phone_number}"
# if (
# CONFIG.get("id", {}).get("cert") is not None
# and user.encryption_identity is not None
# ):
# id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"])
# user.restore_identity(id_keypair)
# else:
# logging.info("Registering new identity...")
# import emulated.nac
# vd = emulated.nac.generate_validation_data()
# vd = b64encode(vd).decode()
# ids.register
# user.register(vd, [("P:" + phone_number, phone_auth_keypair)])
# #user.register(vd)
# print("Handles: ", user.handles)
# # Write config.json
# CONFIG["encryption"] = {
# "rsa_key": user.encryption_identity.encryption_key,
# "ec_key": user.encryption_identity.signing_key,
# }
# CONFIG["id"] = {
# "key": user._id_keypair.key,
# "cert": user._id_keypair.cert,
# }
# CONFIG["auth"] = {
# "key": user._auth_keypair.key,
# "cert": user._auth_keypair.cert,
# "user_id": user.user_id,
# "handles": user.handles,
# }
# CONFIG["push"] = {
# "token": b64encode(user.push_connection.credentials.token).decode(),
# "key": user.push_connection.credentials.private_key,
# "cert": user.push_connection.credentials.cert,
# }
# with open("config.json", "w") as f:
# json.dump(CONFIG, f, indent=4)
# im = imessage.iMessageUser(conn, user)
# Send a message to myself
2023-08-26 02:26:37 +00:00
# async with trio.open_nursery() as nursery:
# nursery.start_soon(input_task, im)
# nursery.start_soon(output_task, im)
2023-08-19 15:27:44 +00:00
async def input_task(im: imessage.iMessageUser):
while True:
cmd = await trio.to_thread.run_sync(input, "> ", cancellable=True)
if cmd != "":
2023-08-25 21:41:55 +00:00
await im.send(imessage.iMessage.create(im, cmd, ["tel:+16106632676"]))
2023-08-19 15:27:44 +00:00
async def output_task(im: imessage.iMessageUser):
while True:
msg = await im.receive()
print(str(msg))
if __name__ == "__main__":
2023-08-27 13:27:24 +00:00
parser = argparse.ArgumentParser()
parser.add_argument("--reregister", action="store_true", help="Force re-registration")
parser.add_argument("--reg-notify", action="store_true", help="Get iMessage after each registration")
2023-08-28 12:13:20 +00:00
parser.add_argument("--alive", action="store_true", help="Keep the connection alive")
2023-08-28 12:01:41 +00:00
parser.add_argument("--client-data", action="store_true", help="Publish client data (only necessary for actually sending/receiving messages)")
2023-09-02 23:09:41 +00:00
parser.add_argument("--trigger-pdu", action="store_true", help="Trigger a REG-REQ")
# String arg to override pdu
parser.add_argument("--pdu", type=str, help="Override the PDU REG-RESP")
parser.add_argument("--phone", type=str, help="Override the phone IP")
parser.add_argument("--gateway", type=str, help="Override the gateway phone number")
2023-08-27 13:27:24 +00:00
args = parser.parse_args()
2023-09-02 23:20:46 +00:00
2023-09-02 23:22:44 +00:00
if args.pdu is not None and not args.pdu.startswith("REG-RESP"):
raise InvalidResponseError("Received invalid REG-RESP PDU from Gateway Client.")
2023-09-02 23:20:46 +00:00
2023-10-13 22:24:39 +00:00
trio.run(main, args)