From 565dfb32ecd79dd71a0fc0badb889e7246e27f28 Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Mon, 31 Jul 2023 13:03:45 -0400 Subject: [PATCH] clean up the imessage class a tad --- imessage.py | 383 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 220 insertions(+), 163 deletions(-) diff --git a/imessage.py b/imessage.py index b728d27..1efd3bb 100644 --- a/imessage.py +++ b/imessage.py @@ -1,98 +1,147 @@ - # LOW LEVEL imessage function, decryption etc # Don't handle APNS etc, accept it already setup ## HAVE ANOTHER FILE TO SETUP EVERYTHING AUTOMATICALLY, etc # JSON parsing of keys, don't pass around strs?? +import gzip +import logging +import plistlib +import random +import uuid +from dataclasses import dataclass, field +from hashlib import sha1, sha256 +from io import BytesIO + +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec, padding +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + import apns import ids -import plistlib -from io import BytesIO - -from cryptography.hazmat.primitives.asymmetric import ec, padding -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - -import gzip -import uuid -import random -import time - -from hashlib import sha1, sha256 -import logging logger = logging.getLogger("imessage") -NORMAL_NONCE = b"\x00" * 15 + b"\x01" +NORMAL_NONCE = b"\x00" * 15 + b"\x01" # This is always used as the AES nonce + class BalloonBody: + """Represents the special parts of message extensions etc.""" + def __init__(self, type: str, data: bytes): self.type = type self.data = data # TODO : Register handlers based on type id + +@dataclass class iMessage: + """Represents an iMessage""" + text: str = "" + """Plain text of message, always required, may be an empty string""" xml: str | None = None - participants: list[str] = [] + """XML portion of message, may be None""" + participants: list[str] = field(default_factory=list) + """List of participants in the message, including the sender""" sender: str | None = None - id: str | None = None - group_id: str | None = None + """Sender of the message""" + _id: uuid.UUID | None = None + """ID of the message, will be randomly generated if not provided""" + group_id: uuid.UUID | None = None + """Group ID of the message, will be randomly generated if not provided""" body: BalloonBody | None = None + """BalloonBody, may be None""" _compressed: bool = True + """Internal property representing whether the message should be compressed""" _raw: dict | None = None + """Internal property representing the original raw message, may be None""" - def from_raw(message: dict) -> 'iMessage': - self = iMessage() + def sanity_check(self): + """Corrects any missing fields""" + if self._id is None: + self._id = uuid.uuid4() - self._raw = message + if self.group_id is None: + self.group_id = uuid.uuid4() - self.text = message.get('t') - self.xml = message.get('x') - self.participants = message.get('p', []) - if self.participants != []: - self.sender = self.participants[-1] - else: - self.sender = None + if self.sender is None: + if len(self.participants) > 1: + self.sender = self.participants[-1] + else: + logger.warning( + "Message has no sender, and only one participant, sanity check failed" + ) + return False - self.id = message.get('r') - self.group_id = message.get('gid') + if self.sender not in self.participants: + self.participants.append(self.sender) - if 'bid' in message: - # This is a message extension body - self.body = BalloonBody(message['bid'], message['b']) + if self.xml != None: + self._compressed = False # XML is never compressed for some reason - if 'compressed' in message: # This is a hack, not a real field - self._compressed = message['compressed'] + return True - return self + def from_raw(message: bytes) -> "iMessage": + """Create an `iMessage` from raw message bytes""" + compressed = False + try: + message = gzip.decompress(message) + compressed = True + except: + pass + + message = plistlib.loads(message) + + return iMessage( + text=message.get("t", ""), + xml=message.get("x"), + participants=message.get("p", []), + sender=message.get("p", [])[-1] if message.get("p", []) != [] else None, + _id=uuid.UUID(message.get("r")), + group_id=uuid.UUID(message.get("gid")), + body=BalloonBody(message["bid"], message["b"]) + if "bid" in message + else None, + _compressed=compressed, + _raw=message, + ) + + def to_raw(self) -> bytes: + """Convert an `iMessage` to raw message bytes""" + if not self.sanity_check(): + raise ValueError("Message failed sanity check") - def to_raw(self) -> dict: d = { "t": self.text, "x": self.xml, "p": self.participants, - "r": self.id, - "gid": self.group_id, - "compressed": self._compressed, + "r": str(self._id).upper(), + "gid": str(self.group_id).upper(), "pv": 0, - "gv": '8', - "v": '1' + "gv": "8", + "v": "1", } + # Remove keys that are None - return {k: v for k, v in d.items() if v is not None} - - def __str__(self): - if self._raw is not None: - return str(self._raw) - else: - return f"iMessage({self.text} from {self.sender})" + d = {k: v for k, v in d.items() if v is not None} + + # Serialize as a plist + d = plistlib.dumps(d, fmt=plistlib.FMT_BINARY) + + # Compression + if self._compressed: + d = gzip.compress(d, mtime=0) + + return d + class iMessageUser: + """Represents a logged in and connected iMessage user. + This abstraction should probably be reworked into IDS some time...""" def __init__(self, connection: apns.APNSConnection, user: ids.IDSUser): self.connection = connection @@ -103,6 +152,7 @@ class iMessageUser: Returns a raw APNs message corresponding to the next conforming notification in the queue Returns None if no conforming notification is found """ + def check_response(x): if x[0] != 0x0A: return False @@ -110,14 +160,14 @@ class iMessageUser: return False resp_body = apns._get_field(x[1], 3) if resp_body is None: - #logger.debug("Rejecting madrid message with no body") + # logger.debug("Rejecting madrid message with no body") return False resp_body = plistlib.loads(resp_body) if "P" not in resp_body: - #logger.debug(f"Rejecting madrid message with no payload : {resp_body}") + # logger.debug(f"Rejecting madrid message with no payload : {resp_body}") return False return True - + payload = self.connection.incoming_queue.pop_find(check_response) if payload is None: return None @@ -132,14 +182,20 @@ class iMessageUser: print("TAG", tag) body_length = int.from_bytes(payload.read(2), "big") body = payload.read(body_length) - + signature_len = payload.read(1)[0] signature = payload.read(signature_len) return (body, signature) - + def _construct_payload(body: bytes, signature: bytes) -> bytes: - payload = b"\x02" + len(body).to_bytes(2, "big") + body + len(signature).to_bytes(1, "big") + signature + payload = ( + b"\x02" + + len(body).to_bytes(2, "big") + + body + + len(signature).to_bytes(1, "big") + + signature + ) return payload def _hash_identity(id: bytes) -> bytes: @@ -147,42 +203,50 @@ class iMessageUser: # TODO: Combine this with serialization code in ids.identity output = BytesIO() - output.write(b'\x00\x41\x04') - output.write(ids._helpers.parse_key(iden.signing_public_key).public_numbers().x.to_bytes(32, "big")) - output.write(ids._helpers.parse_key(iden.signing_public_key).public_numbers().y.to_bytes(32, "big")) + output.write(b"\x00\x41\x04") + output.write( + ids._helpers.parse_key(iden.signing_public_key) + .public_numbers() + .x.to_bytes(32, "big") + ) + output.write( + ids._helpers.parse_key(iden.signing_public_key) + .public_numbers() + .y.to_bytes(32, "big") + ) - output.write(b'\x00\xAC') - output.write(b'\x30\x81\xA9') - output.write(b'\x02\x81\xA1') - output.write(ids._helpers.parse_key(iden.encryption_public_key).public_numbers().n.to_bytes(161, "big")) - output.write(b'\x02\x03\x01\x00\x01') + output.write(b"\x00\xAC") + output.write(b"\x30\x81\xA9") + output.write(b"\x02\x81\xA1") + output.write( + ids._helpers.parse_key(iden.encryption_public_key) + .public_numbers() + .n.to_bytes(161, "big") + ) + output.write(b"\x02\x03\x01\x00\x01") return sha256(output.getvalue()).digest() - - def _encrypt_sign_payload(self, key: ids.identity.IDSIdentity, message: dict) -> bytes: - # Dump the message plist - compressed = message.get('compressed', False) - # Remove the compressed flag from the message - #if 'compressed' in message: - # del message['compressed'] - m2 = message.copy() - if 'compressed' in m2: - del m2['compressed'] - message = plistlib.dumps(m2, fmt=plistlib.FMT_BINARY) - - # Compress the message - if compressed: - message = gzip.compress(message, mtime=0) + def _encrypt_sign_payload( + self, key: ids.identity.IDSIdentity, message: bytes + ) -> bytes: # Generate a random AES key random_seed = random.randbytes(11) # Create the HMAC import hmac - hm = hmac.new(random_seed, message + b"\x02" + iMessageUser._hash_identity(self.user.encryption_identity.encode()) + iMessageUser._hash_identity(key.encode()), sha256).digest() + + hm = hmac.new( + random_seed, + message + + b"\x02" + + iMessageUser._hash_identity(self.user.encryption_identity.encode()) + + iMessageUser._hash_identity(key.encode()), + sha256, + ).digest() aes_key = random_seed + hm[:5] - #print(len(aes_key)) + # print(len(aes_key)) # Encrypt the message with the AES key cipher = Cipher(algorithms.AES(aes_key), modes.CTR(NORMAL_NONCE)) @@ -191,26 +255,30 @@ class iMessageUser: # Encrypt the AES key with the public key of the recipient recipient_key = ids._helpers.parse_key(key.encryption_public_key) rsa_body = recipient_key.encrypt( - aes_key + encrypted[:100], - padding.OAEP( - mgf=padding.MGF1(algorithm=hashes.SHA1()), - algorithm=hashes.SHA1(), - label=None, - ), + aes_key + encrypted[:100], + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA1()), + algorithm=hashes.SHA1(), + label=None, + ), ) # Construct the payload body = rsa_body + encrypted[100:] - sig = ids._helpers.parse_key(self.user.encryption_identity.signing_key).sign(body, ec.ECDSA(hashes.SHA1())) + sig = ids._helpers.parse_key(self.user.encryption_identity.signing_key).sign( + body, ec.ECDSA(hashes.SHA1()) + ) payload = iMessageUser._construct_payload(body, sig) return payload - + def _decrypt_payload(self, payload: bytes) -> dict: payload = iMessageUser._parse_payload(payload) body = BytesIO(payload[0]) - rsa_body = ids._helpers.parse_key(self.user.encryption_identity.encryption_key).decrypt( + rsa_body = ids._helpers.parse_key( + self.user.encryption_identity.encryption_key + ).decrypt( body.read(160), padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), @@ -222,35 +290,19 @@ class iMessageUser: cipher = Cipher(algorithms.AES(rsa_body[:16]), modes.CTR(NORMAL_NONCE)) decrypted = cipher.decryptor().update(rsa_body[16:] + body.read()) - # Try to gzip decompress the payload - compressed = False - try: - decrypted = gzip.decompress(decrypted) - compressed = True - except: - pass - - pl = plistlib.loads(decrypted) - pl['compressed'] = compressed # This is a hack so that messages can be re-encrypted with the same compression - - return pl + return decrypted def _verify_payload(self, payload: bytes, sender: str, sender_token: str) -> bool: # Get the public key for the sender - lookup = self.user.lookup([sender])[sender] + self._cache_keys([sender]) - sender_iden = None - for identity in lookup['identities']: - if identity['push-token'] == sender_token: - sender_iden = identity - break - - identity_keys = sender_iden['client-data']['public-message-identity-key'] - identity_keys = ids.identity.IDSIdentity.decode(identity_keys) + if not sender_token in self.KEY_CACHE: + logger.warning("Unable to find the public key of the sender, cannot verify") + return False + identity_keys = ids.identity.IDSIdentity.decode(self.KEY_CACHE[sender_token][0]) sender_ec_key = ids._helpers.parse_key(identity_keys.signing_public_key) - payload = iMessageUser._parse_payload(payload) try: @@ -275,87 +327,92 @@ class iMessageUser: body = plistlib.loads(body) print(f"Got body message {body}") payload = body["P"] + + if not self._verify_payload(payload, body['sP'], body["t"]): + raise Exception("Failed to verify payload") + decrypted = self._decrypt_payload(payload) - if "p" in decrypted: - if not self._verify_payload(payload, decrypted["p"][-1], body["t"]): - raise Exception("Failed to verify payload") - else: - logger.warning("Unable to verify, couldn't determine sender! Dropping message! (TODO work out a way to verify these anyway)") - return self.receive() # Call again to get the next message + return iMessage.from_raw(decrypted) - - KEY_CACHE: dict[bytes, tuple[bytes, bytes]] = {} # Mapping of push token : (public key, session token) - USER_CACHE: dict[str, list[bytes]] = {} # Mapping of handle : [push tokens] + + KEY_CACHE: dict[bytes, tuple[bytes, bytes]] = {} + """Mapping of push token : (public key, session token)""" + USER_CACHE: dict[str, list[bytes]] = {} + """Mapping of handle : [push tokens]""" + def _cache_keys(self, participants: list[str]): + # Check to see if we have cached the keys for all of the participants + if all([p in self.USER_CACHE for p in participants]): + return + # Look up the public keys for the participants, and cache a token : public key mapping lookup = self.user.lookup(participants) for key, participant in lookup.items(): if not key in self.USER_CACHE: self.USER_CACHE[key] = [] - - for identity in participant['identities']: - if not 'client-data' in identity: + + for identity in participant["identities"]: + if not "client-data" in identity: continue - if not 'public-message-identity-key' in identity['client-data']: + if not "public-message-identity-key" in identity["client-data"]: continue - if not 'push-token' in identity: + if not "push-token" in identity: continue - if not 'session-token' in identity: + if not "session-token" in identity: continue - self.USER_CACHE[key].append(identity['push-token']) + self.USER_CACHE[key].append(identity["push-token"]) - #print(identity) + # print(identity) + + self.KEY_CACHE[identity["push-token"]] = ( + identity["client-data"]["public-message-identity-key"], + identity["session-token"], + ) - self.KEY_CACHE[identity['push-token']] = (identity['client-data']['public-message-identity-key'], identity['session-token']) - def send(self, message: iMessage): # Set the sender, if it isn't already if message.sender is None: - message.sender = self.user.handles[0] # TODO : Which handle to use? - if message.sender not in message.participants: - message.participants.append(message.sender) + message.sender = self.user.handles[0] # TODO : Which handle to use? + message.sanity_check() # Sanity check MUST be called before caching keys, so that the sender is added to the list of participants self._cache_keys(message.participants) - # Set the group id, if it isn't already - if message.group_id is None: - message.group_id = str(uuid.uuid4()).upper() # TODO: Keep track of group ids? - - message_id = uuid.uuid4() - if message.id is None: - message.id = str(message_id).upper() - # Turn the message into a raw message raw = message.to_raw() import base64 + bundled_payloads = [] for participant in message.participants: for push_token in self.USER_CACHE[participant]: - identity_keys = ids.identity.IDSIdentity.decode(self.KEY_CACHE[push_token][0]) + identity_keys = ids.identity.IDSIdentity.decode( + self.KEY_CACHE[push_token][0] + ) payload = self._encrypt_sign_payload(identity_keys, raw) - bundled_payloads.append({ - 'tP': participant, - 'D': not participant == message.sender, # TODO: Should this be false sometimes? For self messages? - 'sT': self.KEY_CACHE[push_token][1], - 'P': payload, - 't': push_token - }) - + bundled_payloads.append( + { + "tP": participant, + "D": not participant + == message.sender, # TODO: Should this be false sometimes? For self messages? + "sT": self.KEY_CACHE[push_token][1], + "P": payload, + "t": push_token, + } + ) + msg_id = random.randbytes(4) body = { - 'fcn': 1, - 'c': 100, - 'E': 'pair', - 'ua': '[macOS,13.4.1,22F82,MacBookPro18,3]', - 'v': 8, - 'i': int.from_bytes(msg_id, 'big'), - 'U': message_id.bytes, - 'dtl': bundled_payloads, - 'sP': message.sender, - #'e': time.time_ns(), + "fcn": 1, + "c": 100, + "E": "pair", + "ua": "[macOS,13.4.1,22F82,MacBookPro18,3]", + "v": 8, + "i": int.from_bytes(msg_id, "big"), + "U": message._id.bytes, + "dtl": bundled_payloads, + "sP": message.sender, } body = plistlib.dumps(body, fmt=plistlib.FMT_BINARY) @@ -371,7 +428,7 @@ class iMessageUser: if resp_body is None: return False resp_body = plistlib.loads(resp_body) - if 'c' not in resp_body or resp_body['c'] != 255: + if "c" not in resp_body or resp_body["c"] != 255: return False return True @@ -386,4 +443,4 @@ class iMessageUser: resp_body = apns._get_field(payload[1], 3) resp_body = plistlib.loads(resp_body) logger.error(resp_body) - num_recv += 1 \ No newline at end of file + num_recv += 1