clean up the imessage class a tad

This commit is contained in:
JJTech0130 2023-07-31 13:03:45 -04:00
parent 5b92621a90
commit 565dfb32ec
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6

View file

@ -1,98 +1,147 @@
# LOW LEVEL imessage function, decryption etc # LOW LEVEL imessage function, decryption etc
# Don't handle APNS etc, accept it already setup # Don't handle APNS etc, accept it already setup
## HAVE ANOTHER FILE TO SETUP EVERYTHING AUTOMATICALLY, etc ## HAVE ANOTHER FILE TO SETUP EVERYTHING AUTOMATICALLY, etc
# JSON parsing of keys, don't pass around strs?? # JSON parsing of keys, don't pass around strs??
import gzip
import logging
import plistlib
import random
import uuid
from dataclasses import dataclass, field
from hashlib import sha1, sha256
from io import BytesIO
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import apns import apns
import ids import ids
import plistlib
from io import BytesIO
from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import gzip
import uuid
import random
import time
from hashlib import sha1, sha256
import logging
logger = logging.getLogger("imessage") logger = logging.getLogger("imessage")
NORMAL_NONCE = b"\x00" * 15 + b"\x01" NORMAL_NONCE = b"\x00" * 15 + b"\x01" # This is always used as the AES nonce
class BalloonBody: class BalloonBody:
"""Represents the special parts of message extensions etc."""
def __init__(self, type: str, data: bytes): def __init__(self, type: str, data: bytes):
self.type = type self.type = type
self.data = data self.data = data
# TODO : Register handlers based on type id # TODO : Register handlers based on type id
@dataclass
class iMessage: class iMessage:
"""Represents an iMessage"""
text: str = "" text: str = ""
"""Plain text of message, always required, may be an empty string"""
xml: str | None = None xml: str | None = None
participants: list[str] = [] """XML portion of message, may be None"""
participants: list[str] = field(default_factory=list)
"""List of participants in the message, including the sender"""
sender: str | None = None sender: str | None = None
id: str | None = None """Sender of the message"""
group_id: str | None = None _id: uuid.UUID | None = None
"""ID of the message, will be randomly generated if not provided"""
group_id: uuid.UUID | None = None
"""Group ID of the message, will be randomly generated if not provided"""
body: BalloonBody | None = None body: BalloonBody | None = None
"""BalloonBody, may be None"""
_compressed: bool = True _compressed: bool = True
"""Internal property representing whether the message should be compressed"""
_raw: dict | None = None _raw: dict | None = None
"""Internal property representing the original raw message, may be None"""
def from_raw(message: dict) -> 'iMessage': def sanity_check(self):
self = iMessage() """Corrects any missing fields"""
if self._id is None:
self._id = uuid.uuid4()
self._raw = message if self.group_id is None:
self.group_id = uuid.uuid4()
self.text = message.get('t') if self.sender is None:
self.xml = message.get('x') if len(self.participants) > 1:
self.participants = message.get('p', [])
if self.participants != []:
self.sender = self.participants[-1] self.sender = self.participants[-1]
else: else:
self.sender = None logger.warning(
"Message has no sender, and only one participant, sanity check failed"
)
return False
self.id = message.get('r') if self.sender not in self.participants:
self.group_id = message.get('gid') self.participants.append(self.sender)
if 'bid' in message: if self.xml != None:
# This is a message extension body self._compressed = False # XML is never compressed for some reason
self.body = BalloonBody(message['bid'], message['b'])
if 'compressed' in message: # This is a hack, not a real field return True
self._compressed = message['compressed']
return self def from_raw(message: bytes) -> "iMessage":
"""Create an `iMessage` from raw message bytes"""
compressed = False
try:
message = gzip.decompress(message)
compressed = True
except:
pass
message = plistlib.loads(message)
return iMessage(
text=message.get("t", ""),
xml=message.get("x"),
participants=message.get("p", []),
sender=message.get("p", [])[-1] if message.get("p", []) != [] else None,
_id=uuid.UUID(message.get("r")),
group_id=uuid.UUID(message.get("gid")),
body=BalloonBody(message["bid"], message["b"])
if "bid" in message
else None,
_compressed=compressed,
_raw=message,
)
def to_raw(self) -> bytes:
"""Convert an `iMessage` to raw message bytes"""
if not self.sanity_check():
raise ValueError("Message failed sanity check")
def to_raw(self) -> dict:
d = { d = {
"t": self.text, "t": self.text,
"x": self.xml, "x": self.xml,
"p": self.participants, "p": self.participants,
"r": self.id, "r": str(self._id).upper(),
"gid": self.group_id, "gid": str(self.group_id).upper(),
"compressed": self._compressed,
"pv": 0, "pv": 0,
"gv": '8', "gv": "8",
"v": '1' "v": "1",
} }
# Remove keys that are None
return {k: v for k, v in d.items() if v is not None}
def __str__(self): # Remove keys that are None
if self._raw is not None: d = {k: v for k, v in d.items() if v is not None}
return str(self._raw)
else: # Serialize as a plist
return f"iMessage({self.text} from {self.sender})" d = plistlib.dumps(d, fmt=plistlib.FMT_BINARY)
# Compression
if self._compressed:
d = gzip.compress(d, mtime=0)
return d
class iMessageUser: class iMessageUser:
"""Represents a logged in and connected iMessage user.
This abstraction should probably be reworked into IDS some time..."""
def __init__(self, connection: apns.APNSConnection, user: ids.IDSUser): def __init__(self, connection: apns.APNSConnection, user: ids.IDSUser):
self.connection = connection self.connection = connection
@ -103,6 +152,7 @@ class iMessageUser:
Returns a raw APNs message corresponding to the next conforming notification in the queue Returns a raw APNs message corresponding to the next conforming notification in the queue
Returns None if no conforming notification is found Returns None if no conforming notification is found
""" """
def check_response(x): def check_response(x):
if x[0] != 0x0A: if x[0] != 0x0A:
return False return False
@ -139,7 +189,13 @@ class iMessageUser:
return (body, signature) return (body, signature)
def _construct_payload(body: bytes, signature: bytes) -> bytes: def _construct_payload(body: bytes, signature: bytes) -> bytes:
payload = b"\x02" + len(body).to_bytes(2, "big") + body + len(signature).to_bytes(1, "big") + signature payload = (
b"\x02"
+ len(body).to_bytes(2, "big")
+ body
+ len(signature).to_bytes(1, "big")
+ signature
)
return payload return payload
def _hash_identity(id: bytes) -> bytes: def _hash_identity(id: bytes) -> bytes:
@ -147,38 +203,46 @@ class iMessageUser:
# TODO: Combine this with serialization code in ids.identity # TODO: Combine this with serialization code in ids.identity
output = BytesIO() output = BytesIO()
output.write(b'\x00\x41\x04') output.write(b"\x00\x41\x04")
output.write(ids._helpers.parse_key(iden.signing_public_key).public_numbers().x.to_bytes(32, "big")) output.write(
output.write(ids._helpers.parse_key(iden.signing_public_key).public_numbers().y.to_bytes(32, "big")) ids._helpers.parse_key(iden.signing_public_key)
.public_numbers()
.x.to_bytes(32, "big")
)
output.write(
ids._helpers.parse_key(iden.signing_public_key)
.public_numbers()
.y.to_bytes(32, "big")
)
output.write(b'\x00\xAC') output.write(b"\x00\xAC")
output.write(b'\x30\x81\xA9') output.write(b"\x30\x81\xA9")
output.write(b'\x02\x81\xA1') output.write(b"\x02\x81\xA1")
output.write(ids._helpers.parse_key(iden.encryption_public_key).public_numbers().n.to_bytes(161, "big")) output.write(
output.write(b'\x02\x03\x01\x00\x01') ids._helpers.parse_key(iden.encryption_public_key)
.public_numbers()
.n.to_bytes(161, "big")
)
output.write(b"\x02\x03\x01\x00\x01")
return sha256(output.getvalue()).digest() return sha256(output.getvalue()).digest()
def _encrypt_sign_payload(self, key: ids.identity.IDSIdentity, message: dict) -> bytes: def _encrypt_sign_payload(
# Dump the message plist self, key: ids.identity.IDSIdentity, message: bytes
compressed = message.get('compressed', False) ) -> bytes:
# Remove the compressed flag from the message
#if 'compressed' in message:
# del message['compressed']
m2 = message.copy()
if 'compressed' in m2:
del m2['compressed']
message = plistlib.dumps(m2, fmt=plistlib.FMT_BINARY)
# Compress the message
if compressed:
message = gzip.compress(message, mtime=0)
# Generate a random AES key # Generate a random AES key
random_seed = random.randbytes(11) random_seed = random.randbytes(11)
# Create the HMAC # Create the HMAC
import hmac import hmac
hm = hmac.new(random_seed, message + b"\x02" + iMessageUser._hash_identity(self.user.encryption_identity.encode()) + iMessageUser._hash_identity(key.encode()), sha256).digest()
hm = hmac.new(
random_seed,
message
+ b"\x02"
+ iMessageUser._hash_identity(self.user.encryption_identity.encode())
+ iMessageUser._hash_identity(key.encode()),
sha256,
).digest()
aes_key = random_seed + hm[:5] aes_key = random_seed + hm[:5]
@ -201,7 +265,9 @@ class iMessageUser:
# Construct the payload # Construct the payload
body = rsa_body + encrypted[100:] body = rsa_body + encrypted[100:]
sig = ids._helpers.parse_key(self.user.encryption_identity.signing_key).sign(body, ec.ECDSA(hashes.SHA1())) sig = ids._helpers.parse_key(self.user.encryption_identity.signing_key).sign(
body, ec.ECDSA(hashes.SHA1())
)
payload = iMessageUser._construct_payload(body, sig) payload = iMessageUser._construct_payload(body, sig)
return payload return payload
@ -210,7 +276,9 @@ class iMessageUser:
payload = iMessageUser._parse_payload(payload) payload = iMessageUser._parse_payload(payload)
body = BytesIO(payload[0]) body = BytesIO(payload[0])
rsa_body = ids._helpers.parse_key(self.user.encryption_identity.encryption_key).decrypt( rsa_body = ids._helpers.parse_key(
self.user.encryption_identity.encryption_key
).decrypt(
body.read(160), body.read(160),
padding.OAEP( padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()), mgf=padding.MGF1(algorithm=hashes.SHA1()),
@ -222,35 +290,19 @@ class iMessageUser:
cipher = Cipher(algorithms.AES(rsa_body[:16]), modes.CTR(NORMAL_NONCE)) cipher = Cipher(algorithms.AES(rsa_body[:16]), modes.CTR(NORMAL_NONCE))
decrypted = cipher.decryptor().update(rsa_body[16:] + body.read()) decrypted = cipher.decryptor().update(rsa_body[16:] + body.read())
# Try to gzip decompress the payload return decrypted
compressed = False
try:
decrypted = gzip.decompress(decrypted)
compressed = True
except:
pass
pl = plistlib.loads(decrypted)
pl['compressed'] = compressed # This is a hack so that messages can be re-encrypted with the same compression
return pl
def _verify_payload(self, payload: bytes, sender: str, sender_token: str) -> bool: def _verify_payload(self, payload: bytes, sender: str, sender_token: str) -> bool:
# Get the public key for the sender # Get the public key for the sender
lookup = self.user.lookup([sender])[sender] self._cache_keys([sender])
sender_iden = None if not sender_token in self.KEY_CACHE:
for identity in lookup['identities']: logger.warning("Unable to find the public key of the sender, cannot verify")
if identity['push-token'] == sender_token: return False
sender_iden = identity
break
identity_keys = sender_iden['client-data']['public-message-identity-key']
identity_keys = ids.identity.IDSIdentity.decode(identity_keys)
identity_keys = ids.identity.IDSIdentity.decode(self.KEY_CACHE[sender_token][0])
sender_ec_key = ids._helpers.parse_key(identity_keys.signing_public_key) sender_ec_key = ids._helpers.parse_key(identity_keys.signing_public_key)
payload = iMessageUser._parse_payload(payload) payload = iMessageUser._parse_payload(payload)
try: try:
@ -275,18 +327,24 @@ class iMessageUser:
body = plistlib.loads(body) body = plistlib.loads(body)
print(f"Got body message {body}") print(f"Got body message {body}")
payload = body["P"] payload = body["P"]
decrypted = self._decrypt_payload(payload)
if "p" in decrypted: if not self._verify_payload(payload, body['sP'], body["t"]):
if not self._verify_payload(payload, decrypted["p"][-1], body["t"]):
raise Exception("Failed to verify payload") raise Exception("Failed to verify payload")
else:
logger.warning("Unable to verify, couldn't determine sender! Dropping message! (TODO work out a way to verify these anyway)") decrypted = self._decrypt_payload(payload)
return self.receive() # Call again to get the next message
return iMessage.from_raw(decrypted) return iMessage.from_raw(decrypted)
KEY_CACHE: dict[bytes, tuple[bytes, bytes]] = {} # Mapping of push token : (public key, session token) KEY_CACHE: dict[bytes, tuple[bytes, bytes]] = {}
USER_CACHE: dict[str, list[bytes]] = {} # Mapping of handle : [push tokens] """Mapping of push token : (public key, session token)"""
USER_CACHE: dict[str, list[bytes]] = {}
"""Mapping of handle : [push tokens]"""
def _cache_keys(self, participants: list[str]): def _cache_keys(self, participants: list[str]):
# Check to see if we have cached the keys for all of the participants
if all([p in self.USER_CACHE for p in participants]):
return
# Look up the public keys for the participants, and cache a token : public key mapping # Look up the public keys for the participants, and cache a token : public key mapping
lookup = self.user.lookup(participants) lookup = self.user.lookup(participants)
@ -294,68 +352,67 @@ class iMessageUser:
if not key in self.USER_CACHE: if not key in self.USER_CACHE:
self.USER_CACHE[key] = [] self.USER_CACHE[key] = []
for identity in participant['identities']: for identity in participant["identities"]:
if not 'client-data' in identity: if not "client-data" in identity:
continue continue
if not 'public-message-identity-key' in identity['client-data']: if not "public-message-identity-key" in identity["client-data"]:
continue continue
if not 'push-token' in identity: if not "push-token" in identity:
continue continue
if not 'session-token' in identity: if not "session-token" in identity:
continue continue
self.USER_CACHE[key].append(identity['push-token']) self.USER_CACHE[key].append(identity["push-token"])
# print(identity) # print(identity)
self.KEY_CACHE[identity['push-token']] = (identity['client-data']['public-message-identity-key'], identity['session-token']) self.KEY_CACHE[identity["push-token"]] = (
identity["client-data"]["public-message-identity-key"],
identity["session-token"],
)
def send(self, message: iMessage): def send(self, message: iMessage):
# Set the sender, if it isn't already # Set the sender, if it isn't already
if message.sender is None: if message.sender is None:
message.sender = self.user.handles[0] # TODO : Which handle to use? message.sender = self.user.handles[0] # TODO : Which handle to use?
if message.sender not in message.participants:
message.participants.append(message.sender)
message.sanity_check() # Sanity check MUST be called before caching keys, so that the sender is added to the list of participants
self._cache_keys(message.participants) self._cache_keys(message.participants)
# Set the group id, if it isn't already
if message.group_id is None:
message.group_id = str(uuid.uuid4()).upper() # TODO: Keep track of group ids?
message_id = uuid.uuid4()
if message.id is None:
message.id = str(message_id).upper()
# Turn the message into a raw message # Turn the message into a raw message
raw = message.to_raw() raw = message.to_raw()
import base64 import base64
bundled_payloads = [] bundled_payloads = []
for participant in message.participants: for participant in message.participants:
for push_token in self.USER_CACHE[participant]: for push_token in self.USER_CACHE[participant]:
identity_keys = ids.identity.IDSIdentity.decode(self.KEY_CACHE[push_token][0]) identity_keys = ids.identity.IDSIdentity.decode(
self.KEY_CACHE[push_token][0]
)
payload = self._encrypt_sign_payload(identity_keys, raw) payload = self._encrypt_sign_payload(identity_keys, raw)
bundled_payloads.append({ bundled_payloads.append(
'tP': participant, {
'D': not participant == message.sender, # TODO: Should this be false sometimes? For self messages? "tP": participant,
'sT': self.KEY_CACHE[push_token][1], "D": not participant
'P': payload, == message.sender, # TODO: Should this be false sometimes? For self messages?
't': push_token "sT": self.KEY_CACHE[push_token][1],
}) "P": payload,
"t": push_token,
}
)
msg_id = random.randbytes(4) msg_id = random.randbytes(4)
body = { body = {
'fcn': 1, "fcn": 1,
'c': 100, "c": 100,
'E': 'pair', "E": "pair",
'ua': '[macOS,13.4.1,22F82,MacBookPro18,3]', "ua": "[macOS,13.4.1,22F82,MacBookPro18,3]",
'v': 8, "v": 8,
'i': int.from_bytes(msg_id, 'big'), "i": int.from_bytes(msg_id, "big"),
'U': message_id.bytes, "U": message._id.bytes,
'dtl': bundled_payloads, "dtl": bundled_payloads,
'sP': message.sender, "sP": message.sender,
#'e': time.time_ns(),
} }
body = plistlib.dumps(body, fmt=plistlib.FMT_BINARY) body = plistlib.dumps(body, fmt=plistlib.FMT_BINARY)
@ -371,7 +428,7 @@ class iMessageUser:
if resp_body is None: if resp_body is None:
return False return False
resp_body = plistlib.loads(resp_body) resp_body = plistlib.loads(resp_body)
if 'c' not in resp_body or resp_body['c'] != 255: if "c" not in resp_body or resp_body["c"] != 255:
return False return False
return True return True