diff --git a/demo.py b/demo.py index 77dbab1..96bcf0c 100644 --- a/demo.py +++ b/demo.py @@ -119,107 +119,114 @@ CONFIG["push"] = { with open("config.json", "w") as f: json.dump(CONFIG, f, indent=4) -user_rsa_key = ids._helpers.parse_key(user.encryption_identity.encryption_key) -NORMAL_NONCE = b"\x00" * 15 + b"\x01" - -def decrypt(payload, sender_token, rsa_key: rsa.RSAPrivateKey = user_rsa_key): - """ - iMessage payload format: - 0x00 - ? - 0x01-0x02 - length of payload - 0x03-0xA0 - RSA encrypted payload portion - 0x00-0x0F - AES key - 0x0F-0x?? - AES encrypted payload portion - 0xA1-0xlength of payload+3 - AES encrypted payload portion - 0xlength of payload+3 - length of signature - 0xLEN+4-0xLEN+4+length of signature - signature - """ - from io import BytesIO - - - payload = BytesIO(payload) - tag = payload.read(1) - length = int.from_bytes(payload.read(2), "big") - body = payload.read(length) - body_io = BytesIO(body) - rsa_body = rsa_key.decrypt( - body_io.read(160), - padding.OAEP( - mgf=padding.MGF1(algorithm=hashes.SHA1()), - algorithm=hashes.SHA1(), - label=None, - ), - ) - - cipher = Cipher(algorithms.AES(rsa_body[:16]), modes.CTR(NORMAL_NONCE)) - decrypted = cipher.decryptor().update(rsa_body[16:] + body_io.read()) - - # Try to gzip decompress the payload - try: - decrypted = gzip.decompress(decrypted) - except: - logging.debug("Failed to decompress payload") - pass - - decrypted = plistlib.loads(decrypted) - - signature_len = payload.read(1)[0] - signature = payload.read(signature_len) - #logging.info(f"Signature: {signature}") - #logging.info(f"Decrypted: {decrypted}") - - # Verify the signature - sender = decrypted["p"][-1] - # Lookup the public key for the sender - lookup = user.lookup([sender])[sender] - #logging.debug(f"Lookup: {lookup}") - sender = None - for identity in lookup['identities']: - if identity['push-token'] == sender_token: - sender = identity - break - - if sender is None: - logging.error(f"Failed to find identity for {sender_token}") - - identity_keys = sender['client-data']['public-message-identity-key'] - identity_keys = ids.identity.IDSIdentity.decode(identity_keys) - - sender_ec_key = ids._helpers.parse_key(identity_keys.signing_public_key) - - from cryptography.hazmat.primitives.asymmetric import ec - #logging.debug(f"Verifying signature {signature} with key {sender_ec_key.public_numbers()} and data {body}") - # Verify the signature (will throw an exception if it fails) - sender_ec_key.verify( - signature, - body, - ec.ECDSA(hashes.SHA1()), - ) - - return decrypted - +import imessage +im = imessage.iMessageUser(conn, user) while True: + msg = im.receive() + print(f"Got message {msg['t']}") + +# user_rsa_key = ids._helpers.parse_key(user.encryption_identity.encryption_key) +# NORMAL_NONCE = b"\x00" * 15 + b"\x01" - def check_response(x): - if x[0] != 0x0A: - return False - resp_body = apns._get_field(x[1], 3) - if resp_body is None: - return False - resp_body = plistlib.loads(resp_body) - if "P" not in resp_body: - return False - return True +# def decrypt(payload, sender_token, rsa_key: rsa.RSAPrivateKey = user_rsa_key): +# """ +# iMessage payload format: +# 0x00 - ? +# 0x01-0x02 - length of payload +# 0x03-0xA0 - RSA encrypted payload portion +# 0x00-0x0F - AES key +# 0x0F-0x?? - AES encrypted payload portion +# 0xA1-0xlength of payload+3 - AES encrypted payload portion +# 0xlength of payload+3 - length of signature +# 0xLEN+4-0xLEN+4+length of signature - signature +# """ +# from io import BytesIO - payload = conn.incoming_queue.wait_pop_find(check_response) - resp_body = apns._get_field(payload[1], 3) - id = apns._get_field(payload[1], 4) - conn._send_ack(id) - resp_body = plistlib.loads(resp_body) - # logging.info(f"Got response: {resp_body}") - logging.debug(f"Got message: {resp_body}") - token = resp_body['t'] - payload = resp_body["P"] - payload = decrypt(payload, token) - logging.info(f"Got message: {payload['t']} from {payload['p'][1]}") + +# payload = BytesIO(payload) +# tag = payload.read(1) +# length = int.from_bytes(payload.read(2), "big") +# body = payload.read(length) +# body_io = BytesIO(body) +# rsa_body = rsa_key.decrypt( +# body_io.read(160), +# padding.OAEP( +# mgf=padding.MGF1(algorithm=hashes.SHA1()), +# algorithm=hashes.SHA1(), +# label=None, +# ), +# ) + +# cipher = Cipher(algorithms.AES(rsa_body[:16]), modes.CTR(NORMAL_NONCE)) +# decrypted = cipher.decryptor().update(rsa_body[16:] + body_io.read()) + +# # Try to gzip decompress the payload +# try: +# decrypted = gzip.decompress(decrypted) +# except: +# logging.debug("Failed to decompress payload") +# pass + +# decrypted = plistlib.loads(decrypted) + +# signature_len = payload.read(1)[0] +# signature = payload.read(signature_len) +# #logging.info(f"Signature: {signature}") +# #logging.info(f"Decrypted: {decrypted}") + +# # Verify the signature +# sender = decrypted["p"][-1] +# # Lookup the public key for the sender +# lookup = user.lookup([sender])[sender] +# #logging.debug(f"Lookup: {lookup}") +# sender = None +# for identity in lookup['identities']: +# if identity['push-token'] == sender_token: +# sender = identity +# break + +# if sender is None: +# logging.error(f"Failed to find identity for {sender_token}") + +# identity_keys = sender['client-data']['public-message-identity-key'] +# identity_keys = ids.identity.IDSIdentity.decode(identity_keys) + +# sender_ec_key = ids._helpers.parse_key(identity_keys.signing_public_key) + +# from cryptography.hazmat.primitives.asymmetric import ec +# #logging.debug(f"Verifying signature {signature} with key {sender_ec_key.public_numbers()} and data {body}") +# # Verify the signature (will throw an exception if it fails) +# sender_ec_key.verify( +# signature, +# body, +# ec.ECDSA(hashes.SHA1()), +# ) + +# return decrypted + + +# while True: + +# def check_response(x): +# if x[0] != 0x0A: +# return False +# resp_body = apns._get_field(x[1], 3) +# if resp_body is None: +# return False +# resp_body = plistlib.loads(resp_body) +# if "P" not in resp_body: +# return False +# return True + +# payload = conn.incoming_queue.wait_pop_find(check_response) +# resp_body = apns._get_field(payload[1], 3) +# id = apns._get_field(payload[1], 4) +# conn._send_ack(id) +# resp_body = plistlib.loads(resp_body) +# # logging.info(f"Got response: {resp_body}") +# logging.debug(f"Got message: {resp_body}") +# token = resp_body['t'] +# payload = resp_body["P"] +# payload = decrypt(payload, token) +# logging.info(f"Got message: {payload['t']} from {payload['p'][1]}") diff --git a/imessage.py b/imessage.py index e82c5d7..99e04ad 100644 --- a/imessage.py +++ b/imessage.py @@ -8,30 +8,124 @@ import apns import ids -class iMessageUser: - def __init__(self, apns: apns.APNSConnection, ids: ids.IDSUser, encryption_key: str, signing_key: str): - self.apns = apns - self.ids = ids - self.encryption_key = encryption_key - self.signing_key = signing_key +import plistlib +from io import BytesIO - def _get_raw_messages(self) -> list[dict]: - pass +from cryptography.hazmat.primitives.asymmetric import ec, padding +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +import gzip + +NORMAL_NONCE = b"\x00" * 15 + b"\x01" + +class iMessageUser: + + def __init__(self, connection: apns.APNSConnection, user: ids.IDSUser): + self.connection = connection + self.user = user + + def _get_raw_message(self): + """ + Returns a raw APNs message corresponding to the next conforming notification in the queue + """ + def check_response(x): + if x[0] != 0x0A: + return False + resp_body = apns._get_field(x[1], 3) + if resp_body is None: + return False + resp_body = plistlib.loads(resp_body) + if "P" not in resp_body: + return False + return True + + payload = self.connection.incoming_queue.wait_pop_find(check_response) + id = apns._get_field(payload[1], 4) + self.connection._send_ack(id) + + return payload def _send_raw_message(self, message: dict): pass - def _decrypt_message(self, message: dict) -> dict: - pass - def _encrypt_message(self, message: dict) -> dict: pass def _sign_message(self, message: dict) -> dict: pass - def _verify_message(self, message: dict) -> dict: - pass + def _parse_payload(payload: bytes) -> tuple[bytes, bytes]: + payload = BytesIO(payload) - def get_messages(self) -> list[dict]: - pass + tag = payload.read(1) + body_length = int.from_bytes(payload.read(2), "big") + body = payload.read(body_length) + + signature_len = payload.read(1)[0] + signature = payload.read(signature_len) + + return (body, signature) + + def _decrypt_payload(self, payload: bytes) -> dict: + payload = iMessageUser._parse_payload(payload) + + body = BytesIO(payload[0]) + rsa_body = ids._helpers.parse_key(self.user.encryption_identity.encryption_key).decrypt( + body.read(160), + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA1()), + algorithm=hashes.SHA1(), + label=None, + ), + ) + + cipher = Cipher(algorithms.AES(rsa_body[:16]), modes.CTR(NORMAL_NONCE)) + decrypted = cipher.decryptor().update(rsa_body[16:] + body.read()) + + # Try to gzip decompress the payload + try: + decrypted = gzip.decompress(decrypted) + except: + pass + + return plistlib.loads(decrypted) + + def _verify_payload(self, payload: bytes, sender: str, sender_token: str) -> bool: + # Get the public key for the sender + lookup = self.user.lookup([sender])[sender] + + sender_iden = None + for identity in lookup['identities']: + if identity['push-token'] == sender_token: + sender_iden = identity + break + + identity_keys = sender_iden['client-data']['public-message-identity-key'] + identity_keys = ids.identity.IDSIdentity.decode(identity_keys) + + sender_ec_key = ids._helpers.parse_key(identity_keys.signing_public_key) + + + payload = iMessageUser._parse_payload(payload) + + try: + # Verify the signature (will throw an exception if it fails) + sender_ec_key.verify( + payload[1], + payload[0], + ec.ECDSA(hashes.SHA1()), + ) + return True + except: + return False + + def receive(self) -> dict: + raw = self._get_raw_message() + body = apns._get_field(raw[1], 3) + body = plistlib.loads(body) + payload = body["P"] + decrypted = self._decrypt_payload(payload) + if not self._verify_payload(payload, decrypted["p"][-1], body["t"]): + raise Exception("Failed to verify payload") + return decrypted