"""Helpers for IDS: request signing, lookups over APNs, auth and registration."""

import gzip
import plistlib
import random
import secrets
import uuid
from base64 import b64decode, b64encode
from collections import namedtuple
from datetime import datetime
from typing import Any, Callable, Optional

import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509.oid import NameOID

import apns
import bags
import gsa

USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"

# key: private key PEM (str), cert: certificate PEM (str)
KeyPair = namedtuple("KeyPair", ["key", "cert"])


def _pem_body(pem: str, label: str) -> str:
    """Return *pem* with the BEGIN/END armor lines for *label* and newlines removed."""
    return (
        pem.replace(f"-----BEGIN {label}-----", "")
        .replace(f"-----END {label}-----", "")
        .replace("\n", "")
    )


def _length_prefixed(field: bytes) -> bytes:
    """Return *field* preceded by its byte length as a 4-byte big-endian integer."""
    return len(field).to_bytes(4, "big") + field


# Nonce Format:
# 01000001876bd0a2c0e571093967fce3d7
#  01                # version
#  000001876d008cc5  # unix time (milliseconds, 8 bytes big-endian)
#  r1r2r3r4r5r6r7r8  # random bytes
def generate_nonce() -> bytes:
    """Return a fresh 17-byte nonce: version byte, ms timestamp, 8 random bytes."""
    timestamp_ms = int(datetime.now().timestamp() * 1000)
    # The nonce is part of a signed authentication payload, so draw the random
    # part from the OS CSPRNG rather than the predictable `random` generator.
    return b"\x01" + timestamp_ms.to_bytes(8, "big") + secrets.token_bytes(8)


def _create_payload(
    bag_key: str,
    query_string: str,
    push_token: str,
    payload: bytes,
    nonce: Optional[bytes] = None,
) -> tuple[bytes, bytes]:
    """Build the byte string that gets signed for a request.

    The result is the nonce followed by four length-prefixed fields: the bag
    key, the query string, the payload and the base64-decoded push token.

    Args:
        bag_key: Bag key naming the endpoint (e.g. "id-query").
        query_string: Query string of the request (may be empty).
        push_token: Base64-encoded push token.
        payload: Request body bytes.
        nonce: Nonce to use; a new one is generated when omitted.

    Returns:
        ``(bytes_to_sign, nonce)``.
    """
    if nonce is None:
        nonce = generate_nonce()

    # Encode before measuring so each length prefix counts bytes, not
    # characters; the two differ for non-ASCII text.
    fields = (
        bag_key.encode(),
        query_string.encode(),
        payload,
        b64decode(push_token),
    )
    return nonce + b"".join(_length_prefixed(field) for field in fields), nonce


def sign_payload(
    private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes
) -> tuple[str, bytes]:
    """Sign a request with *private_key* and return ``(signature, nonce)``.

    Args:
        private_key: Unencrypted private key in PEM form.
        bag_key: Bag key naming the endpoint.
        query_string: Query string of the request (may be empty).
        push_token: Base64-encoded push token.
        payload: Request body bytes.

    Returns:
        The base64-encoded signature (as ``str``) and the raw nonce that was
        embedded in the signed data.
    """
    key = serialization.load_pem_private_key(
        private_key.encode(), password=None, backend=default_backend()
    )

    to_sign, nonce = _create_payload(bag_key, query_string, push_token, payload)

    # PKCS#1 v1.5 over SHA-1, then a fixed two-byte b"\x01\x01" prefix.
    # NOTE(review): the prefix looks like a signature format/version marker - confirm.
    raw_signature = key.sign(to_sign, padding.PKCS1v15(), hashes.SHA1())
    signature = b64encode(b"\x01\x01" + raw_signature).decode()

    return signature, nonce


def _send_request(
    conn: apns.APNSConnection,
    bag_key: str,
    topic: str,
    body: bytes,
    keypair: KeyPair,
    username: str,
) -> bytes:
    """Send a signed request over the APNs connection and return the reply body.

    Args:
        conn: APNs connection; its ``token`` is used as the push token.
        bag_key: Key looked up in ``bags.ids_bag()`` to get the target URL.
        topic: Topic the message is sent on.
        body: Uncompressed request body; it is gzip-compressed before signing.
        keypair: Private key / certificate PEM pair used to sign the request.
        username: Used to build the ``x-id-self-uri`` header (``mailto:`` URI).

    Returns:
        Field 3 of the 0x0A response packet.

    Raises:
        Exception: If the response packet has no field 3.
    """
    # mtime=0 keeps the gzip output free of the current time.
    body = gzip.compress(body, mtime=0)

    push_token = b64encode(conn.token).decode()

    # Sign the request
    signature, nonce = sign_payload(keypair.key, bag_key, "", push_token, body)

    headers = {
        "x-id-cert": _pem_body(keypair.cert, "CERTIFICATE"),
        "x-id-nonce": b64encode(nonce).decode(),
        "x-id-sig": signature,
        "x-push-token": push_token,
        "x-id-self-uri": "mailto:" + username,
        "User-Agent": USER_AGENT,
        "x-protocol-version": "1630",
    }

    req = {
        "cT": "application/x-apple-plist",
        "U": b"\x16%C\xd5\xcd:D1\xa1\xa7z6\xa9\xe2\xbc\x8f",  # Just random bytes?
        "c": 96,
        "ua": USER_AGENT,
        "u": bags.ids_bag()[bag_key],
        "h": headers,
        "v": 2,
        "b": body,
    }

    conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY))
    resp = conn.wait_for_packet(0x0A)

    resp_body = apns._get_field(resp[1], 3)
    if resp_body is None:
        raise Exception(f"Got invalid response: {resp}")

    return resp_body


def lookup(
    conn: apns.APNSConnection,
    self: str,
    keypair: KeyPair,
    topic: str,
    query: list[str],
) -> Any:
    """Perform an IDS lookup.

    Args:
        conn: An active APNs connection. Must be connected and have a push
            token; it will be filtered to the IDS topic.
        self: The user's email address.
        keypair: A KeyPair containing the user's private key and certificate.
        topic: The IDS topic to query.
        query: A list of URIs to query.

    Returns:
        The parsed plist found in the gzip-compressed ``"b"`` entry of the
        response.
    """
    conn.filter([topic])

    request_body = plistlib.dumps({"uris": query})
    raw_response = _send_request(conn, "id-query", topic, request_body, keypair, self)

    envelope = plistlib.loads(raw_response)
    return plistlib.loads(gzip.decompress(envelope["b"]))


def _auth_token_request(username: str, password: str) -> Any:
    """POST a loginDelegates request for the IDS delegate; return the parsed plist.

    Args:
        username: Apple ID.
        password: Password, password + 2FA code, or PET, as supplied by the caller.
    """
    # Turn the PET into an auth token
    data = {
        "apple-id": username,
        "client-id": str(uuid.uuid4()),
        "delegates": {"com.apple.private.ids": {"protocol-version": "4"}},
        "password": password,
    }
    data = plistlib.dumps(data)

    # NOTE(review): verify=False disables TLS certificate verification while
    # credentials are being sent. Presumably deliberate - confirm, and prefer
    # passing a CA bundle via `verify=` if one is available.
    r = requests.post(
        "https://setup.icloud.com/setup/prefpane/loginDelegates",
        auth=(username, password),
        data=data,
        verify=False,
    )
    return plistlib.loads(r.content)


def _get_auth_token(
    username: str,
    password: str,
    use_gsa: bool = False,
    factor_gen: Optional[Callable[[], str]] = None,
) -> tuple[str, str]:
    """Get an IDS auth token for the given username and password.

    Args:
        username: Apple ID.
        password: Account password.
        use_gsa: If True, GSA authentication is used, which requires anisette.
            If False, an old-style 2FA code appended to the password is used.
        factor_gen: If not None, called to get the 2FA code; otherwise the
            code is prompted for on stdin. Only used when ``use_gsa`` is False.

    Returns:
        ``(realm user id, auth token)``.

    Raises:
        Exception: If the response carries a "description" entry.
    """
    if use_gsa:
        g = gsa.authenticate(username, password, gsa.Anisette())
        pet = g["t"]["com.apple.gs.idms.pet"]["token"]
    else:
        # Make the request without the 2FA code to make the prompt appear
        _auth_token_request(username, password)
        # Now make the request with the 2FA code
        if factor_gen is None:
            pet = password + input("Enter 2FA code: ")
        else:
            pet = password + factor_gen()

    r = _auth_token_request(username, pet)
    if "description" in r:
        raise Exception(f"Error: {r['description']}")

    service_data = r["delegates"]["com.apple.private.ids"]["service-data"]
    realm_user_id = service_data["realm-user-id"]
    auth_token = service_data["auth-token"]
    return realm_user_id, auth_token


def _generate_csr(private_key: rsa.RSAPrivateKey) -> str:
    """Return a CSR signed by *private_key* as base64 text without PEM armor.

    The subject common name is 20 random bytes rendered as hex.
    """
    subject = x509.Name(
        [
            x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()),
        ]
    )
    csr = (
        x509.CertificateSigningRequestBuilder()
        .subject_name(subject)
        .sign(private_key, hashes.SHA256())
    )

    csr_pem = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8")
    return _pem_body(csr_pem, "CERTIFICATE REQUEST")


def _get_auth_cert(user_id, token) -> tuple[str, str]:
    """Get an IDS auth cert for the given user id and auth token.

    A new 2048-bit RSA key is generated and its CSR is submitted.

    Returns:
        ``(private key PEM, certificate PEM)``.

    Raises:
        Exception: If the response "status" is not 0.
    """
    private_key = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=default_backend()
    )
    body = {
        "authentication-data": {"auth-token": token},
        # The CSR is sent as raw DER bytes, hence the base64 decode.
        "csr": b64decode(_generate_csr(private_key)),
        "realm-user-id": user_id,
    }
    body = plistlib.dumps(body)

    # NOTE(review): verify=False disables TLS certificate verification -
    # presumably deliberate; confirm.
    r = requests.post(
        "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS",
        data=body,
        headers={"x-protocol-version": "1630"},
        verify=False,
    )
    r = plistlib.loads(r.content)
    if r["status"] != 0:
        raise Exception(f"Failed to get auth cert: {r}")

    cert = x509.load_der_x509_certificate(r["cert"])
    key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    cert_pem = cert.public_bytes(serialization.Encoding.PEM)
    return key_pem.decode("utf-8").strip(), cert_pem.decode("utf-8").strip()


def _register_request(
    push_token, info, auth_cert, auth_key, push_cert, push_key, validation_data
):
    """POST an IDS registration for the ``com.apple.madrid`` service.

    Args:
        push_token: Base64-encoded push token.
        info: Mapping with ``"uri"`` and ``"user_id"`` entries.
        auth_cert: Auth certificate PEM.
        auth_key: Auth private key PEM.
        push_cert: Push certificate PEM.
        push_key: Push private key PEM.
        validation_data: Base64-encoded validation data.

    Returns:
        The parsed plist response.

    Raises:
        Exception: If the response status is 6004 (validation data expired).
    """
    body = {
        "hardware-version": "MacBookPro18,3",
        "language": "en-US",
        "os-version": "macOS,13.2.1,22D68",
        "software-version": "22D68",
        "services": [
            {
                "capabilities": [{"flags": 1, "name": "Messenger", "version": 1}],
                "service": "com.apple.madrid",
                "users": [
                    {
                        "uris": [{"uri": info["uri"]}],
                        "user-id": info["user_id"],
                    }
                ],
            }
        ],
        "validation-data": b64decode(validation_data),
    }

    body = plistlib.dumps(body)
    body = gzip.compress(body, mtime=0)

    # The same compressed body is signed twice: with the push key and the auth key.
    push_sig, push_nonce = sign_payload(push_key, "id-register", "", push_token, body)
    auth_sig, auth_nonce = sign_payload(auth_key, "id-register", "", push_token, body)

    auth_nonce_b64 = b64encode(auth_nonce).decode()
    headers = {
        "x-protocol-version": "1640",
        "content-type": "application/x-apple-plist",
        "content-encoding": "gzip",
        "x-auth-sig-0": auth_sig,
        "x-auth-cert-0": _pem_body(auth_cert, "CERTIFICATE"),
        "x-auth-user-id-0": info["user_id"],
        "x-auth-nonce-0": auth_nonce_b64,
        "x-pr-nonce": auth_nonce_b64,
        "x-push-token": push_token,
        "x-push-sig": push_sig,
        "x-push-cert": _pem_body(push_cert, "CERTIFICATE"),
        "x-push-nonce": b64encode(push_nonce).decode(),
    }

    # NOTE(review): verify=False disables TLS certificate verification -
    # presumably deliberate; confirm.
    r = requests.post(
        "https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register",
        headers=headers,
        data=body,
        verify=False,
    )
    r = plistlib.loads(r.content)

    # Read the status defensively: a response without "status" must not crash
    # the diagnostic print below.
    status = r.get("status")
    print(f"Response code: {status}")
    if status == 6004:
        raise Exception("Validation data expired!")
    return r