From 4c345a0377f3825379129433d7c0349bcdf5571c Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Fri, 25 Aug 2023 22:26:37 -0400 Subject: [PATCH] refactoring and cleanup --- demo.py | 252 +++++++++++++++++++++++++++++++++--------------- ids/__init__.py | 205 +++++++++++++++++++++++---------------- ids/_helpers.py | 22 +++++ ids/identity.py | 117 ++++++---------------- imessage.py | 19 ++-- 5 files changed, 361 insertions(+), 254 deletions(-) diff --git a/demo.py b/demo.py index d9b8091..1c31a2e 100644 --- a/demo.py +++ b/demo.py @@ -59,112 +59,208 @@ def safe_b64decode(s): return b64decode(s) except: return None + +def safe_config(): + with open("config.json", "w") as f: + json.dump(CONFIG, f, indent=4) async def main(): + # Load any existing push credentials token = CONFIG.get("push", {}).get("token") - if token is not None: - token = b64decode(token) - else: - token = b"" + token = b64decode(token) if token is not None else b"" push_creds = apns.PushCredentials( CONFIG.get("push", {}).get("key", ""), CONFIG.get("push", {}).get("cert", ""), token) async with apns.APNSConnection.start(push_creds) as conn: + # Save the push credentials to the config + CONFIG["push"] = { + "token": b64encode(conn.credentials.token).decode(), + "key": conn.credentials.private_key, + "cert": conn.credentials.cert, + } + safe_config() + + # Activate the connection await conn.set_state(1) await conn.filter(["com.apple.madrid"]) - user = ids.IDSUser(conn) + # If the user wants a phone number, we need to register it WITH an Apple ID, then register the Apple ID again + # otherwise we encounter issues for some reason + + users = [] + if "id" in CONFIG: + logging.debug("Restoring old-style identity...") - if CONFIG.get("auth", {}).get("cert") is not None: - auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"]) - user_id = CONFIG["auth"]["user_id"] - handles = CONFIG["auth"]["handles"] - user.restore_authentication(auth_keypair, user_id, handles) + users.append(ids.IDSAppleUser(conn, CONFIG["auth"]["user_id"], ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"]), + ids.identity.IDSIdentity(CONFIG["encryption"]["ec_key"], CONFIG["encryption"]["rsa_key"]), CONFIG["id"]["cert"], + CONFIG["auth"]["handles"])) + if "users" in CONFIG: + logging.debug("Restoring new-style identity...") + for user in CONFIG["users"]: + users.append(ids.IDSUser(conn, user["id"], ids._helpers.KeyPair(user["auth_key"], user["auth_cert"]), + ids.identity.IDSIdentity(user["signing_key"], user["encryption_key"]), user["id_cert"], + user["handles"])) + else: - username = input("Username: ") - password = getpass("Password: ") + print("Would you like to register a phone number? (y/n)") + if input("> ").lower() == "y": + if "phone" in CONFIG: + phone_sig = b64decode(CONFIG["phone"].get("sig")) + phone_number = CONFIG["phone"].get("number") + else: + import sms_registration + phone_number, phone_sig = sms_registration.register(conn.credentials.token) + CONFIG["phone"] = { + "number": phone_number, + "sig": b64encode(phone_sig).decode(), + } + safe_config() - user.authenticate(username, password) + users.append(ids.IDSPhoneUser.authenticate(conn, phone_number, phone_sig)) - import sms_registration - phone_sig = safe_b64decode(CONFIG.get("phone", {}).get("sig")) - phone_number = CONFIG.get("phone", {}).get("number") + print("Would you like sign in to your Apple ID (recommended)? (y/n)") + if input("> ").lower() == "y": + username = input("Username: ") + password = input("Password: ") - if phone_sig is None or phone_number is None: - print("Registering phone number...") - phone_number, phone_sig = sms_registration.register(user.push_connection.credentials.token) - CONFIG["phone"] = { - "number": phone_number, - "sig": b64encode(phone_sig).decode(), - } - if CONFIG.get("phone", {}).get("auth_key") is not None and CONFIG.get("phone", {}).get("auth_cert") is not None: - phone_auth_keypair = ids._helpers.KeyPair(CONFIG["phone"]["auth_key"], CONFIG["phone"]["auth_cert"]) - else: - phone_auth_keypair = ids.profile.get_phone_cert(phone_number, user.push_connection.credentials.token, [phone_sig]) - CONFIG["phone"]["auth_key"] = phone_auth_keypair.key - CONFIG["phone"]["auth_cert"] = phone_auth_keypair.cert + users.append(ids.IDSAppleUser.authenticate(conn, username, password)) - - user.encryption_identity = ids.identity.IDSIdentity( - encryption_key=CONFIG.get("encryption", {}).get("rsa_key"), - signing_key=CONFIG.get("encryption", {}).get("ec_key"), - ) - - #user._auth_keypair = phone_auth_keypair - user.handles = [f"tel:{phone_number}"] - print(user.user_id) - # user.user_id = f"P:{phone_number}" - - - if ( - CONFIG.get("id", {}).get("cert") is not None - and user.encryption_identity is not None - ): - id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"]) - user.restore_identity(id_keypair) - else: - logging.info("Registering new identity...") import emulated.nac vd = emulated.nac.generate_validation_data() vd = b64encode(vd).decode() - user.register(vd, [("P:" + phone_number, phone_auth_keypair)]) - #user.register(vd) + users = ids.register(conn, users, vd) - print("Handles: ", user.handles) + CONFIG["users"] = [] + for user in users: + CONFIG["users"].append({ + "id": user.user_id, + "auth_key": user.auth_keypair.key, + "auth_cert": user.auth_keypair.cert, + "encryption_key": user.encryption_identity.encryption_key if user.encryption_identity is not None else None, + "signing_key": user.encryption_identity.signing_key if user.encryption_identity is not None else None, + "id_cert": user.id_cert, + "handles": user.handles, + }) + safe_config() - # Write config.json - CONFIG["encryption"] = { - "rsa_key": user.encryption_identity.encryption_key, - "ec_key": user.encryption_identity.signing_key, - } - CONFIG["id"] = { - "key": user._id_keypair.key, - "cert": user._id_keypair.cert, - } - CONFIG["auth"] = { - "key": user._auth_keypair.key, - "cert": user._auth_keypair.cert, - "user_id": user.user_id, - "handles": user.handles, - } - CONFIG["push"] = { - "token": b64encode(user.push_connection.credentials.token).decode(), - "key": user.push_connection.credentials.private_key, - "cert": user.push_connection.credentials.cert, - } + # You CANNOT turn around and re-register like this: + # It will BREAK the tie between phone number and Apple ID - with open("config.json", "w") as f: - json.dump(CONFIG, f, indent=4) + # import emulated.nac - im = imessage.iMessageUser(conn, user) + # vd = emulated.nac.generate_validation_data() + # vd = b64encode(vd).decode() + + # users = ids.register(conn, [users[1]], vd) + + print(f"Done? {users}") + + + + + + + + + + + + + # user = ids.IDSUser(conn) + + # if CONFIG.get("auth", {}).get("cert") is not None: + # auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"]) + # user_id = CONFIG["auth"]["user_id"] + # handles = CONFIG["auth"]["handles"] + # user.restore_authentication(auth_keypair, user_id, handles) + # else: + # username = input("Username: ") + # password = getpass("Password: ") + + # user.authenticate(username, password) + + # import sms_registration + # phone_sig = safe_b64decode(CONFIG.get("phone", {}).get("sig")) + # phone_number = CONFIG.get("phone", {}).get("number") + + # if phone_sig is None or phone_number is None: + # print("Registering phone number...") + # phone_number, phone_sig = sms_registration.register(user.push_connection.credentials.token) + # CONFIG["phone"] = { + # "number": phone_number, + # "sig": b64encode(phone_sig).decode(), + # } + # if CONFIG.get("phone", {}).get("auth_key") is not None and CONFIG.get("phone", {}).get("auth_cert") is not None: + # phone_auth_keypair = ids._helpers.KeyPair(CONFIG["phone"]["auth_key"], CONFIG["phone"]["auth_cert"]) + # else: + # phone_auth_keypair = ids.profile.get_phone_cert(phone_number, user.push_connection.credentials.token, [phone_sig]) + # CONFIG["phone"]["auth_key"] = phone_auth_keypair.key + # CONFIG["phone"]["auth_cert"] = phone_auth_keypair.cert + + + # user.encryption_identity = ids.identity.IDSIdentity( + # encryption_key=CONFIG.get("encryption", {}).get("rsa_key"), + # signing_key=CONFIG.get("encryption", {}).get("ec_key"), + # ) + + # #user._auth_keypair = phone_auth_keypair + # user.handles = [f"tel:{phone_number}"] + # print(user.user_id) + # # user.user_id = f"P:{phone_number}" + + + # if ( + # CONFIG.get("id", {}).get("cert") is not None + # and user.encryption_identity is not None + # ): + # id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"]) + # user.restore_identity(id_keypair) + # else: + # logging.info("Registering new identity...") + # import emulated.nac + + # vd = emulated.nac.generate_validation_data() + # vd = b64encode(vd).decode() + + # ids.register + # user.register(vd, [("P:" + phone_number, phone_auth_keypair)]) + # #user.register(vd) + + # print("Handles: ", user.handles) + + # # Write config.json + # CONFIG["encryption"] = { + # "rsa_key": user.encryption_identity.encryption_key, + # "ec_key": user.encryption_identity.signing_key, + # } + # CONFIG["id"] = { + # "key": user._id_keypair.key, + # "cert": user._id_keypair.cert, + # } + # CONFIG["auth"] = { + # "key": user._auth_keypair.key, + # "cert": user._auth_keypair.cert, + # "user_id": user.user_id, + # "handles": user.handles, + # } + # CONFIG["push"] = { + # "token": b64encode(user.push_connection.credentials.token).decode(), + # "key": user.push_connection.credentials.private_key, + # "cert": user.push_connection.credentials.cert, + # } + + # with open("config.json", "w") as f: + # json.dump(CONFIG, f, indent=4) + + # im = imessage.iMessageUser(conn, user) # Send a message to myself - async with trio.open_nursery() as nursery: - nursery.start_soon(input_task, im) - nursery.start_soon(output_task, im) + # async with trio.open_nursery() as nursery: + # nursery.start_soon(input_task, im) + # nursery.start_soon(output_task, im) async def input_task(im: imessage.iMessageUser): while True: diff --git a/ids/__init__.py b/ids/__init__.py index c8fc540..c433257 100644 --- a/ids/__init__.py +++ b/ids/__init__.py @@ -5,99 +5,142 @@ import apns from . import _helpers, identity, profile, query from typing import Callable, Any + +import dataclasses +import apns +from . import profile, _helpers +from base64 import b64encode +from typing import Callable + +@dataclasses.dataclass class IDSUser: - # Sets self.user_id and self._auth_token - def _authenticate_for_token( - self, username: str, password: str, factor_callback: Callable | None = None - ): - self.user_id, self._auth_token = profile.get_auth_token( - username, password, factor_callback - ) + push_connection: apns.APNSConnection - # Sets self._auth_keypair using self.user_id and self._auth_token - def _authenticate_for_cert(self): - self._auth_keypair = profile.get_auth_cert(self.user_id, self._auth_token) + user_id: str - # Factor callback will be called if a 2FA code is necessary - def __init__( - self, - push_connection: apns.APNSConnection, - ): - self.push_connection = push_connection - self._push_keypair = _helpers.KeyPair( - self.push_connection.credentials.private_key, self.push_connection.credentials.cert - ) + auth_keypair: _helpers.KeyPair + """ + Long-lived authentication keypair + """ - self.ec_key = self.rsa_key = None + encryption_identity: identity.IDSIdentity | None = None - def __str__(self): - return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.credentials.token).decode()})" + id_cert: bytes | None = None + """ + Short-lived identity certificate, + same private key as auth_keypair + """ - # Authenticates with a username and password, to create a brand new authentication keypair - def authenticate( - self, username: str, password: str, factor_callback: Callable | None = None - ): - self._authenticate_for_token(username, password, factor_callback) - self._authenticate_for_cert() - self.handles = profile.get_handles( - b64encode(self.push_connection.credentials.token), - self.user_id, - self._auth_keypair, - self._push_keypair, - ) - self.current_handle = self.handles[0] + handles: list[str] = dataclasses.field(default_factory=list) + """ + List of usable handles. Not equivalent to the current result of possible_handles, as the user + may have added or removed handles since registration, which we can't use. + """ - - # Uses an existing authentication keypair - def restore_authentication( - self, auth_keypair: _helpers.KeyPair, user_id: str, handles: list - ): - self._auth_keypair = auth_keypair - self.user_id = user_id - self.handles = handles - self.current_handle = self.handles[0] - - # This is a separate call so that the user can make sure the first part succeeds before asking for validation data - def register(self, validation_data: str, additional_keys: list[tuple[str, _helpers.KeyPair]] = [], additional_handles: list[str] = []): + def possible_handles(self) -> list[str]: """ - self.ec_key, self.rsa_key will be set to a randomly gnenerated EC and RSA keypair - if they are not already set + Returns a list of possible handles for this user. """ - if self.encryption_identity is None: - self.encryption_identity = identity.IDSIdentity() - - auth_keys = additional_keys - auth_keys.extend([(self.user_id, self._auth_keypair)]) - #auth_keys.extend(additional_keys) - - handles_request = self.handles - - handles_request.extend(additional_handles) - - - cert = identity.register( - b64encode(self.push_connection.credentials.token), - self.handles, - self.user_id, - auth_keys, - self._push_keypair, - self.encryption_identity, - validation_data, - ) - self._id_keypair = _helpers.KeyPair(self._auth_keypair.key, cert) - - # Refresh handles - self.handles = profile.get_handles( + return profile.get_handles( b64encode(self.push_connection.credentials.token), self.user_id, - self._auth_keypair, - self._push_keypair, + self.auth_keypair, + _helpers.KeyPair(self.push_connection.credentials.private_key, self.push_connection.credentials.cert), ) + + async def lookup(self, handle: str, uris: list[str], topic: str = "com.apple.madrid") -> Any: + if handle not in self.handles: + raise Exception("Handle not registered to user") + return await query.lookup(self.push_connection, handle, _helpers.KeyPair(self.auth_keypair.key, self.id_cert), uris, topic) - def restore_identity(self, id_keypair: _helpers.KeyPair): - self._id_keypair = id_keypair +@dataclasses.dataclass +class IDSAppleUser(IDSUser): + """ + An IDSUser that is authenticated with an Apple ID + """ - async def lookup(self, uris: list[str], topic: str = "com.apple.madrid") -> Any: - return await query.lookup(self.push_connection, self.current_handle, self._id_keypair, uris, topic) - + @staticmethod + def authenticate(push_connection: apns.APNSConnection, username: str, password: str, factor_callback: Callable | None = None) -> IDSUser: + user_id, auth_token = profile.get_auth_token(username, password, factor_callback) + auth_keypair = profile.get_auth_cert(user_id, auth_token) + + return IDSAppleUser(push_connection, user_id, auth_keypair) + +@dataclasses.dataclass +class IDSPhoneUser(IDSUser): + """ + An IDSUser that is authenticated with a phone number + """ + + @staticmethod + def authenticate(push_connection: apns.APNSConnection, phone_number: str, phone_sig: bytes) -> IDSUser: + auth_keypair = profile.get_phone_cert(phone_number, push_connection.credentials.token, [phone_sig]) + + return IDSPhoneUser(push_connection, "P:" + phone_number, auth_keypair) + +DEFAULT_CLIENT_DATA = { + 'is-c2k-equipment': True, + 'optionally-receive-typing-indicators': True, + 'public-message-identity-version':2, + 'show-peer-errors': True, + 'supports-ack-v1': True, + 'supports-activity-sharing-v1': True, + 'supports-audio-messaging-v2': True, + "supports-autoloopvideo-v1": True, + 'supports-be-v1': True, + 'supports-ca-v1': True, + 'supports-fsm-v1': True, + 'supports-fsm-v2': True, + 'supports-fsm-v3': True, + 'supports-ii-v1': True, + 'supports-impact-v1': True, + 'supports-inline-attachments': True, + 'supports-keep-receipts': True, + "supports-location-sharing": True, + 'supports-media-v2': True, + 'supports-photos-extension-v1': True, + 'supports-st-v1': True, + 'supports-update-attachments-v1': True, +} + +import uuid + +def register(push_connection: apns.APNSConnection, users: list[IDSUser], validation_data: str): + signing_users = [(user.user_id, user.auth_keypair) for user in users] + + # Create new encryption identity for each user + for user in users: + if user.encryption_identity is None: + user.encryption_identity = identity.IDSIdentity() + + # Construct user payloads + user_payloads = [] + for user in users: + user.handles = user.possible_handles() + if user.encryption_identity is not None: + special_data = DEFAULT_CLIENT_DATA.copy() + special_data["public-message-identity-key"] = user.encryption_identity.encode() + else: + special_data = DEFAULT_CLIENT_DATA + user_payloads.append({ + "client-data": special_data, + "tag": "SIM" if isinstance(user, IDSPhoneUser) else None, + "uris": [{"uri": handle} for handle in user.handles], + "user-id": user.user_id, + }) + + _helpers.recursive_del_none(user_payloads) + + certs = identity.register( + push_connection, + signing_users, + user_payloads, + validation_data, + uuid.uuid4() + ) + + for user in users: + user.id_cert = certs[user.user_id] + + return users \ No newline at end of file diff --git a/ids/_helpers.py b/ids/_helpers.py index 7a905b6..5bb8f6e 100644 --- a/ids/_helpers.py +++ b/ids/_helpers.py @@ -3,9 +3,31 @@ from collections import namedtuple USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]" PROTOCOL_VERSION = "1640" + # KeyPair is a named tuple that holds a key and a certificate in PEM form KeyPair = namedtuple("KeyPair", ["key", "cert"]) +import apns + +def get_key_pair(creds: apns.PushCredentials): + return KeyPair(creds.private_key, creds.cert) + +def recursive_del_none(d: dict | list): + if isinstance(d, dict): + for k, v in list(d.items()): + if v is None: + del d[k] + else: + recursive_del_none(v) + elif isinstance(d, list): + for i, v in enumerate(d): + if v is None: + del d[i] + else: + recursive_del_none(v) + +#apns.PushCredentials.key_pair = get_key_pair # tydpe: ignore # Monkey patching + def dearmour(armoured: str) -> str: import re diff --git a/ids/identity.py b/ids/identity.py index 15daf99..bfd73ca 100644 --- a/ids/identity.py +++ b/ids/identity.py @@ -88,21 +88,26 @@ class IDSIdentity: output.write(raw_rsa.getvalue()) return output.getvalue() - + + + +import apns +from . import _helpers +import uuid +from base64 import b64encode def register( - push_token, handles, user_id, auth_keys: list[tuple[str, KeyPair]], push_key: KeyPair, identity: IDSIdentity, validation_data + push_connection: apns.APNSConnection, signing_users: list[tuple[str, _helpers.KeyPair]], user_payloads: list[dict], validation_data, device_id: uuid.UUID ): - logger.debug(f"Registering IDS identity for {handles}") - uris = [{"uri": handle} for handle in handles] - import uuid body = { + # TODO: Abstract this out "device-name": "pypush", "hardware-version": "MacBookPro18,3", "language": "en-US", "os-version": "macOS,13.2.1,22D68", "software-version": "22D68", + "private-device-data": { - "u": uuid.uuid4().hex.upper(), + "u": str(device_id), }, "services": [ { @@ -112,93 +117,25 @@ def register( "com.apple.private.alloy.gelato", "com.apple.private.alloy.biz", "com.apple.private.alloy.gamecenter.imessage"], - "users": [ - { - "client-data": { - 'is-c2k-equipment': True, - 'optionally-receive-typing-indicators': True, - 'public-message-identity-key': identity.encode(), - 'public-message-identity-version':2, - 'show-peer-errors': True, - 'supports-ack-v1': True, - 'supports-activity-sharing-v1': True, - 'supports-audio-messaging-v2': True, - "supports-autoloopvideo-v1": True, - 'supports-be-v1': True, - 'supports-ca-v1': True, - 'supports-fsm-v1': True, - 'supports-fsm-v2': True, - 'supports-fsm-v3': True, - 'supports-ii-v1': True, - 'supports-impact-v1': True, - 'supports-inline-attachments': True, - 'supports-keep-receipts': True, - "supports-location-sharing": True, - 'supports-media-v2': True, - 'supports-photos-extension-v1': True, - 'supports-st-v1': True, - 'supports-update-attachments-v1': True, - }, - "tag": "SIM", - "uris": uris, - "user-id": auth_keys[0][0], - - }, - { - "client-data": { - 'is-c2k-equipment': True, - 'optionally-receive-typing-indicators': True, - 'public-message-identity-key': identity.encode(), - 'public-message-identity-version':2, - 'show-peer-errors': True, - 'supports-ack-v1': True, - 'supports-activity-sharing-v1': True, - 'supports-audio-messaging-v2': True, - "supports-autoloopvideo-v1": True, - 'supports-be-v1': True, - 'supports-ca-v1': True, - 'supports-fsm-v1': True, - 'supports-fsm-v2': True, - 'supports-fsm-v3': True, - 'supports-ii-v1': True, - 'supports-impact-v1': True, - 'supports-inline-attachments': True, - 'supports-keep-receipts': True, - "supports-location-sharing": True, - 'supports-media-v2': True, - 'supports-photos-extension-v1': True, - 'supports-st-v1': True, - 'supports-update-attachments-v1': True, - }, - "uris": [{ - "uri": "tel:+16106632676" - }], - "user-id": user_id, - - }, - # { - # "uris": uris, - # "user-id": auth_keys[1][0] - # } - ], + "users": user_payloads, } ], "validation-data": b64decode(validation_data), } - - logger.debug(body) + + logger.debug(f"Sending IDS registration request: {body}") body = plistlib.dumps(body) + # Construct headers headers = { "x-protocol-version": PROTOCOL_VERSION, - #"x-auth-user-id-0": user_id, } - for i, (user_id, keypair) in enumerate(auth_keys): + for i, (user_id, keypair) in enumerate(signing_users): headers[f"x-auth-user-id-{i}"] = user_id - add_auth_signature(headers, body, "id-register", keypair, push_key, push_token, i) + add_auth_signature(headers, body, "id-register", keypair, _helpers.get_key_pair(push_connection.credentials), b64encode(push_connection.credentials.token).decode(), i) - print(headers) + logger.debug(f"Headers: {headers}") r = requests.post( "https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register", @@ -207,8 +144,9 @@ def register( verify=False, ) r = plistlib.loads(r.content) - #print(f'Response code: {r["status"]}') - logger.debug(f"Recieved response to IDS registration: {r}") + + logger.debug(f"Received response to IDS registration: {r}") + if "status" in r and r["status"] == 6004: raise Exception("Validation data expired!") # TODO: Do validation of nested statuses @@ -218,7 +156,14 @@ def register( raise Exception(f"No services in response: {r}") if not "users" in r["services"][0]: raise Exception(f"No users in response: {r}") - if not "cert" in r["services"][0]["users"][0]: - raise Exception(f"No cert in response: {r}") + + output = {} + for user in r["services"][0]["users"]: + if not "cert" in user: + raise Exception(f"No cert in response: {r}") + for uri in user["uris"]: + if uri["status"] != 0: + raise Exception(f"Failed to register URI {uri['uri']}: {r}") + output[user["user-id"]] = armour_cert(user["cert"]) - return armour_cert(r["services"][0]["users"][0]["cert"]) + return output \ No newline at end of file diff --git a/imessage.py b/imessage.py index 02c146f..c67b049 100644 --- a/imessage.py +++ b/imessage.py @@ -267,7 +267,7 @@ class iMessage(Message): def create(user: "iMessageUser", text: str, participants: list[str]) -> "iMessage": """Creates a basic outgoing `iMessage` from the given text and participants""" - sender = user.user.current_handle + sender = user.current_handle if sender not in participants: participants += [sender] @@ -356,6 +356,7 @@ class iMessageUser: def __init__(self, connection: apns.APNSConnection, user: ids.IDSUser): self.connection = connection self.user = user + self.current_handle = user.handles[0] @staticmethod def _parse_payload(p: bytes) -> tuple[bytes, bytes]: @@ -421,7 +422,7 @@ class iMessageUser: random_seed, message + b"\x02" - + iMessageUser._hash_identity(self.user.encryption_identity.encode()) + + iMessageUser._hash_identity(self.user.encryption_identity.encode()) # type: ignore + iMessageUser._hash_identity(key.encode()), sha256, ).digest() @@ -528,8 +529,8 @@ class iMessageUser: async def _cache_keys(self, participants: list[str], topic: str): # Clear the cache if the handle has changed - if self.KEY_CACHE_HANDLE != self.user.current_handle: - self.KEY_CACHE_HANDLE = self.user.current_handle + if self.KEY_CACHE_HANDLE != self.current_handle: + self.KEY_CACHE_HANDLE = self.current_handle self.KEY_CACHE = {} self.USER_CACHE = {} @@ -539,7 +540,7 @@ class iMessageUser: # TODO: This doesn't work since it doesn't check if they are cached for all topics # Look up the public keys for the participants, and cache a token : public key mapping - lookup = await self.user.lookup(participants, topic=topic) + lookup = await self.user.lookup(self.current_handle, participants, topic=topic) logger.debug(f"Lookup response : {lookup}") for key, participant in lookup.items(): @@ -594,7 +595,7 @@ class iMessageUser: p = { "tP": participant, - "D": not participant == self.user.current_handle, + "D": not participant == self.current_handle, "sT": self.KEY_CACHE[push_token][topic][1], "t": push_token, } @@ -618,7 +619,7 @@ class iMessageUser: "i": int.from_bytes(message_id, "big"), "U": id.bytes, "dtl": dtl, - "sP": self.user.current_handle, + "sP": self.current_handle, } body.update(extra) @@ -671,7 +672,7 @@ class iMessageUser: await self._send_raw( 147, - [self.user.current_handle], + [self.current_handle], "com.apple.private.alloy.sms", extra={ "nr": 1 @@ -686,7 +687,7 @@ class iMessageUser: else: raise Exception("Unknown message type") - send_to = message.participants if isinstance(message, iMessage) else [self.user.current_handle] + send_to = message.participants if isinstance(message, iMessage) else [self.current_handle] await self._cache_keys(send_to, topic)