import gzip
import json
import logging
import plistlib
import threading
import time
import zlib
from base64 import b64decode, b64encode
from getpass import getpass
from io import BytesIO

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from rich.logging import RichHandler

import apns
import ids

logging.basicConfig(
    level=logging.NOTSET,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler()],
)

# Set sane log levels
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("jelly").setLevel(logging.INFO)
logging.getLogger("nac").setLevel(logging.INFO)
logging.getLogger("apns").setLevel(logging.INFO)
logging.getLogger("albert").setLevel(logging.INFO)
logging.getLogger("ids").setLevel(logging.DEBUG)
logging.getLogger("bags").setLevel(logging.DEBUG)

# Try and load config.json; a missing file simply means "first run".
try:
    with open("config.json", "r") as f:
        CONFIG = json.load(f)
except FileNotFoundError:
    CONFIG = {}

conn = apns.APNSConnection(
    CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert")
)


def safe_b64decode(s):
    """Base64-decode *s*, returning None when it is missing or malformed.

    ``None`` (no stored value) raises ``TypeError`` inside ``b64decode`` and
    invalid input raises ``binascii.Error`` / ``ValueError``; both are mapped
    to ``None`` so the caller can pass the result straight through.
    """
    try:
        return b64decode(s)
    except (TypeError, ValueError):
        # binascii.Error is a ValueError subclass, so it is covered here.
        return None


conn.connect(token=safe_b64decode(CONFIG.get("push", {}).get("token")))
conn.set_state(1)
conn.filter(["com.apple.madrid"])

user = ids.IDSUser(conn)

# Reuse the stored auth keypair when there is one, otherwise log in
# interactively.
if CONFIG.get("auth", {}).get("cert") is not None:
    auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"])
    user_id = CONFIG["auth"]["user_id"]
    handles = CONFIG["auth"]["handles"]
    user.restore_authentication(auth_keypair, user_id, handles)
else:
    username = input("Username: ")
    password = getpass("Password: ")

    user.authenticate(username, password)

user.encryption_identity = ids.identity.IDSIdentity(
    encryption_key=CONFIG.get("encryption", {}).get("rsa_key"),
    signing_key=CONFIG.get("encryption", {}).get("ec_key"),
)

if (
    CONFIG.get("id", {}).get("cert") is not None
    and user.encryption_identity is not None
):
    id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"])
    user.restore_identity(id_keypair)
else:
    logging.info("Registering new identity...")
    # Imported lazily: only needed when a new registration is performed.
    import emulated.nac

    vd = emulated.nac.generate_validation_data()
    vd = b64encode(vd).decode()

    user.register(vd)

logging.info("Waiting for incoming messages...")


# Create a thread to send keepalive messages
def keepalive():
    """Call ``conn.keep_alive()`` every five seconds, forever."""
    while True:
        time.sleep(5)
        conn.keep_alive()


# daemon=True so the thread does not keep the process alive on exit.
threading.Thread(target=keepalive, daemon=True).start()

# Write config.json so the next run can restore this session.
CONFIG["encryption"] = {
    "rsa_key": user.encryption_identity.encryption_key,
    "ec_key": user.encryption_identity.signing_key,
}
CONFIG["id"] = {
    "key": user._id_keypair.key,
    "cert": user._id_keypair.cert,
}
CONFIG["auth"] = {
    "key": user._auth_keypair.key,
    "cert": user._auth_keypair.cert,
    "user_id": user.user_id,
    "handles": user.handles,
}
CONFIG["push"] = {
    "token": b64encode(user.push_connection.token).decode(),
    "key": user.push_connection.private_key,
    "cert": user.push_connection.cert,
}

with open("config.json", "w") as f:
    json.dump(CONFIG, f, indent=4)

user_rsa_key = ids._helpers.parse_key(user.encryption_identity.encryption_key)

# Fixed initial counter block used for the AES-CTR portion of a payload.
NORMAL_NONCE = b"\x00" * 15 + b"\x01"


def decrypt(payload, sender_token, rsa_key: rsa.RSAPrivateKey = user_rsa_key):
    """Decrypt an incoming message payload and verify its signature.

    Payload layout, as parsed below:

    * byte 0 - tag byte (read and ignored)
    * bytes 1-2 - big-endian length ``N`` of the body
    * next ``N`` bytes - the body:

      * first 160 bytes: RSA-OAEP (SHA-1 / MGF1-SHA-1) ciphertext, which
        decrypts to a 16-byte AES key followed by the start of the
        AES-CTR ciphertext
      * remaining bytes: the rest of the AES-CTR ciphertext

    * next byte - length ``S`` of the signature
    * next ``S`` bytes - ECDSA (SHA-1) signature over the body

    Args:
        payload: Raw payload bytes (the ``"P"`` field of the push message).
        sender_token: Push token of the sending device; used to pick the
            matching identity from the sender's lookup result.
        rsa_key: Private key used for the RSA portion. Defaults to this
            user's encryption key.

    Returns:
        The decrypted message, parsed from its plist encoding.

    Raises:
        LookupError: No identity in the lookup result has ``sender_token``
            as its push token.
        cryptography.exceptions.InvalidSignature: The signature does not
            verify against the sender's public signing key.
    """
    stream = BytesIO(payload)
    stream.read(1)  # tag byte; not used
    length = int.from_bytes(stream.read(2), "big")
    body = stream.read(length)

    body_io = BytesIO(body)
    rsa_body = rsa_key.decrypt(
        body_io.read(160),
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA1()),
            algorithm=hashes.SHA1(),
            label=None,
        ),
    )

    # The RSA plaintext carries the AES key plus the first ciphertext bytes;
    # the remainder of the body continues the same AES-CTR stream.
    cipher = Cipher(algorithms.AES(rsa_body[:16]), modes.CTR(NORMAL_NONCE))
    decrypted = cipher.decryptor().update(rsa_body[16:] + body_io.read())

    # Try to gzip decompress the payload; fall back to the raw bytes when it
    # is not (valid) gzip data.
    try:
        decrypted = gzip.decompress(decrypted)
    except (OSError, EOFError, zlib.error):
        # gzip.BadGzipFile is an OSError subclass.
        logging.debug("Failed to decompress payload")

    decrypted = plistlib.loads(decrypted)

    signature_len = stream.read(1)[0]
    signature = stream.read(signature_len)

    # The sender's handle is the last participant listed in the message.
    sender_handle = decrypted["p"][-1]

    # Lookup the public key for the sender
    lookup = user.lookup([sender_handle])[sender_handle]

    sender = None
    for identity in lookup["identities"]:
        if identity["push-token"] == sender_token:
            sender = identity
            break

    if sender is None:
        # Without a matching identity there is no key to verify against;
        # fail explicitly instead of subscripting None below.
        logging.error(f"Failed to find identity for {sender_token}")
        raise LookupError(f"Failed to find identity for {sender_token}")

    identity_keys = sender["client-data"]["public-message-identity-key"]
    identity_keys = ids.identity.IDSIdentity.decode(identity_keys)

    sender_ec_key = ids._helpers.parse_key(identity_keys.signing_public_key)

    # Verify the signature (will throw an exception if it fails)
    sender_ec_key.verify(
        signature,
        body,
        ec.ECDSA(hashes.SHA1()),
    )

    return decrypted


def check_response(x):
    """Return True when *x* is a 0x0A packet whose field 3 is a plist with "P".

    Used as the predicate for ``conn.incoming_queue.wait_pop_find``.
    """
    if x[0] != 0x0A:
        return False
    resp_body = apns._get_field(x[1], 3)
    if resp_body is None:
        return False
    resp_body = plistlib.loads(resp_body)
    if "P" not in resp_body:
        return False
    return True


while True:
    packet = conn.incoming_queue.wait_pop_find(check_response)
    resp_body = apns._get_field(packet[1], 3)
    message_id = apns._get_field(packet[1], 4)
    conn._send_ack(message_id)
    resp_body = plistlib.loads(resp_body)
    logging.debug(f"Got message: {resp_body}")

    token = resp_body["t"]
    message = decrypt(resp_body["P"], token)
    logging.info(f"Got message: {message['t']} from {message['p'][1]}")