diff --git a/demo.py b/demo.py index aa73cf5..4238d27 100644 --- a/demo.py +++ b/demo.py @@ -69,7 +69,11 @@ else: user.ec_key = CONFIG.get("encryption", {}).get("ec_key") user.rsa_key = CONFIG.get("encryption", {}).get("rsa_key") -if CONFIG.get("id", {}).get("cert") is not None and user.ec_key is not None and user.rsa_key is not None: +if ( + CONFIG.get("id", {}).get("cert") is not None + and user.ec_key is not None + and user.rsa_key is not None +): id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"]) user.restore_identity(id_keypair) else: @@ -120,15 +124,30 @@ with open("config.json", "w") as f: user_rsa_key = load_pem_private_key(user.rsa_key.encode(), password=None) -def decrypt(payload): - # print(payload[1:3]) - length = int.from_bytes(payload[1:3], "big") - # print("Length", length) - payload = payload[3 : length + 3] - # print("Decrypting payload", payload) +NORMAL_NONCE = b"\x00" * 15 + b"\x01" - decrypted1 = user_rsa_key.decrypt( - payload[:160], +def decrypt(payload, sender_token, rsa_key: rsa.RSAPrivateKey = user_rsa_key): + """ + iMessage payload format: + 0x00 - ? + 0x01-0x02 - length of payload + 0x03-0xA0 - RSA encrypted payload portion + 0x00-0x0F - AES key + 0x0F-0x?? - AES encrypted payload portion + 0xA1-0xlength of payload+3 - AES encrypted payload portion + 0xlength of payload+3 - length of signature + 0xLEN+4-0xLEN+4+length of signature - signature + """ + from io import BytesIO + + + payload = BytesIO(payload) + tag = payload.read(1) + length = int.from_bytes(payload.read(2), "big") + body = payload.read(length) + body_io = BytesIO(body) + rsa_body = rsa_key.decrypt( + body_io.read(160), padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), @@ -136,14 +155,52 @@ def decrypt(payload): ), ) - cipher = Cipher(algorithms.AES(decrypted1[:16]), modes.CTR(b"\x00" * 15 + b"\x01")) - decryptor = cipher.decryptor() - pt = decryptor.update(decrypted1[16:] + payload[160:]) - # print(pt) - pt = gzip.decompress(pt) - payload = plistlib.loads(pt) - # logging.debug(f"Got payload: {payload}") - return payload + cipher = Cipher(algorithms.AES(rsa_body[:16]), modes.CTR(NORMAL_NONCE)) + decrypted = cipher.decryptor().update(rsa_body[16:] + body_io.read()) + + # Try to gzip decompress the payload + try: + decrypted = gzip.decompress(decrypted) + except: + logging.debug("Failed to decompress payload") + pass + + decrypted = plistlib.loads(decrypted) + + signature_len = payload.read(1)[0] + signature = payload.read(signature_len) + logging.info(f"Signature: {signature}") + logging.info(f"Decrypted: {decrypted}") + + # Verify the signature + sender = decrypted["p"][-1] + # Lookup the public key for the sender + lookup = user.lookup([sender])[sender] + logging.debug(f"Lookup: {lookup}") + sender = None + for identity in lookup['identities']: + if identity['push-token'] == sender_token: + sender = identity + break + + if sender is None: + logging.error(f"Failed to find identity for {sender_token}") + + identity_keys = sender['client-data']['public-message-identity-key'] + identity_keys = IdentityKeys.decode(identity_keys) + + sender_ec_key = identity_keys.ecdsa_key + + from cryptography.hazmat.primitives.asymmetric import ec + #logging.debug(f"Verifying signature {signature} with key {sender_ec_key.public_numbers()} and data {body}") + # Verify the signature (will throw an exception if it fails) + sender_ec_key.verify( + signature, + body, + ec.ECDSA(hashes.SHA1()), + ) + + return decrypted while True: @@ -165,6 +222,8 @@ while True: conn._send_ack(id) resp_body = plistlib.loads(resp_body) # logging.info(f"Got response: {resp_body}") + logging.debug(f"Got response: {resp_body}") + token = resp_body['t'] payload = resp_body["P"] - payload = decrypt(payload) + payload = decrypt(payload, token) logging.info(f"Got message: {payload['t']} from {payload['p'][1]}")