From daf8868bb5c10fba6b3001901349a6f2e0317d7f Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 11 Apr 2023 12:17:53 -0400 Subject: [PATCH] refactor demo --- demo.py | 9 ++-- ids.py | 148 ++++++++++++++++++++++++++++++-------------------------- 2 files changed, 85 insertions(+), 72 deletions(-) diff --git a/demo.py b/demo.py index a12d12e..3237072 100644 --- a/demo.py +++ b/demo.py @@ -1,7 +1,10 @@ -import apns +import plistlib +import zlib from base64 import b64decode, b64encode from hashlib import sha1 -import plistlib, zlib + +import apns +import ids conn1 = apns.APNSConnection() conn1.connect() @@ -11,4 +14,4 @@ conn1.filter([]) conn1.connect(False) conn1.filter(["com.apple.madrid"]) -# See ids.py for something useful \ No newline at end of file +print(ids.lookup(conn1, ["mailto:jjtech@jjtech.dev"])) diff --git a/ids.py b/ids.py index a6fe45e..5de670c 100644 --- a/ids.py +++ b/ids.py @@ -1,10 +1,14 @@ -import requests import plistlib -from base64 import b64encode, b64decode -from datetime import datetime import random -from hashlib import sha1 import zlib +from base64 import b64decode, b64encode +from datetime import datetime + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding + +import apns USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]" # NOTE: The push token MUST be registered with the account for self-uri! @@ -12,7 +16,6 @@ USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]" PUSH_TOKEN = "5V7AY+ikHr4DiSfq1W2UBa71G3FLGkpUSKTrOLg81yk=" SELF_URI = "mailto:jjtech@jjtech.dev" -TO_LOOKUP = ['mailto:jjtech@jjtech.dev'] # Nonce Format: # 01000001876bd0a2c0e571093967fce3d7 @@ -20,7 +23,12 @@ TO_LOOKUP = ['mailto:jjtech@jjtech.dev'] # 000001876d008cc5 # unix time # r1r2r3r4r5r6r7r8 # random bytes def generate_nonce() -> bytes: - return b"\x01" + int(datetime.now().timestamp() * 1000).to_bytes(8, "big") + random.randbytes(8) + return ( + b"\x01" + + int(datetime.now().timestamp() * 1000).to_bytes(8, "big") + + random.randbytes(8) + ) + def load_keys() -> tuple[str, str]: # Load the private key and certificate from files @@ -31,23 +39,35 @@ def load_keys() -> tuple[str, str]: return ids_key, ids_cert -def _create_payload(bag_key: str, query_string: str, push_token: str, payload: bytes) -> tuple[str, bytes]: + +def _create_payload( + bag_key: str, query_string: str, push_token: str, payload: bytes +) -> tuple[str, bytes]: # Generate the nonce nonce = generate_nonce() push_token = b64decode(push_token) - return nonce + len(bag_key).to_bytes(4) + bag_key.encode() + len(query_string).to_bytes(4) + query_string.encode() + len(payload).to_bytes(4) + payload + len(push_token).to_bytes(4) + push_token, nonce + return ( + nonce + + len(bag_key).to_bytes(4) + + bag_key.encode() + + len(query_string).to_bytes(4) + + query_string.encode() + + len(payload).to_bytes(4) + + payload + + len(push_token).to_bytes(4) + + push_token, + nonce, + ) -def sign_payload(private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes) -> tuple[str, bytes]: - from cryptography.hazmat.primitives import serialization - from cryptography.hazmat.primitives.asymmetric import padding - from cryptography.hazmat.primitives import hashes - from cryptography.hazmat.backends import default_backend - from cryptography.hazmat.primitives.asymmetric import utils - +def sign_payload( + private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes +) -> tuple[str, bytes]: # Load the private key - key = serialization.load_pem_private_key(private_key.encode(), password=None, backend=default_backend()) + key = serialization.load_pem_private_key( + private_key.encode(), password=None, backend=default_backend() + ) payload, nonce = _create_payload(bag_key, query_string, push_token, payload) sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) @@ -57,70 +77,60 @@ def sign_payload(private_key: str, bag_key: str, query_string: str, push_token: return sig, nonce -body = {'uris': TO_LOOKUP} -body = plistlib.dumps(body) -body = zlib.compress(body, wbits=16 + zlib.MAX_WBITS) -key, cert = load_keys() -signature, nonce = sign_payload(key, 'id-query', '', PUSH_TOKEN, body) - -headers = { - 'x-id-cert': cert.replace("-----BEGIN CERTIFICATE-----", "").replace("-----END CERTIFICATE-----", "").replace("\n", ""), - 'x-id-nonce': b64encode(nonce).decode(), - 'x-id-sig': signature, - 'x-push-token': PUSH_TOKEN, - 'x-id-self-uri': SELF_URI, - 'User-Agent': USER_AGENT, - 'x-protocol-version': '1630', +BAG_KEYS = { + "id-query": "https://query.ess.apple.com/WebObjects/QueryService.woa/wa/query" } -# We have to send it over APNs -import apns +global_key, global_cert = load_keys() -conn1 = apns.APNSConnection() -conn1.connect() -conn1.keep_alive() -conn1.set_state(0x01) -conn1.filter([]) -conn1.connect(False) -conn1.filter(["com.apple.madrid"]) -print(conn1.token) -to_send = {'cT': 'application/x-apple-plist', - 'U': b'\x16%D\xd5\xcd:D1\xa1\xa7z6\xa9\xe2\xbc\x8f', # Just random bytes? - 'c': 96, - 'ua': USER_AGENT, - 'u': 'https://query.ess.apple.com/WebObjects/QueryService.woa/wa/query', - 'h': headers, - 'v': 2, # breaks lookup - 'b': body -} +def _send_request(conn: apns.APNSConnection, type: str, body: bytes) -> bytes: + body = zlib.compress(body, wbits=16 + zlib.MAX_WBITS) -conn1.send_message("com.apple.madrid", plistlib.dumps(to_send, fmt=plistlib.FMT_BINARY)) + # Sign the request + signature, nonce = sign_payload(global_key, type, "", PUSH_TOKEN, body) -response = conn1.wait_for_packet(0x0a) + print(signature) -response = apns._get_field(response[1], 3) + headers = { + "x-id-cert": global_cert.replace("-----BEGIN CERTIFICATE-----", "") + .replace("-----END CERTIFICATE-----", "") + .replace("\n", ""), + "x-id-nonce": b64encode(nonce).decode(), + "x-id-sig": signature, + "x-push-token": PUSH_TOKEN, + "x-id-self-uri": SELF_URI, + "User-Agent": USER_AGENT, + "x-protocol-version": "1630", + } -response = plistlib.loads(response) + req = { + "cT": "application/x-apple-plist", + "U": b"\x16%D\xd5\xcd:D1\xa1\xa7z6\xa9\xe2\xbc\x8f", # Just random bytes? + "c": 96, + "ua": USER_AGENT, + "u": BAG_KEYS[type], + "h": headers, + "v": 2, + "b": body, + } -print(f"Status code: {response['hs']}") + conn.send_message("com.apple.madrid", plistlib.dumps(req, fmt=plistlib.FMT_BINARY)) + resp = conn.wait_for_packet(0x0A) -body = response['b'] -body = zlib.decompress(body, 16 + zlib.MAX_WBITS) -body = plistlib.loads(body) + resp2 = apns._get_field(resp[1], 3) -# Recurse over the entire body, replacing all bytes with base64 encoded strings -def recurse(obj): - if isinstance(obj, bytes): - return b64encode(obj).decode() - elif isinstance(obj, dict): - return {k: recurse(v) for k, v in obj.items()} - elif isinstance(obj, list): - return [recurse(v) for v in obj] - return obj + if resp2 is None: + print(f"Got invalid response: {resp}") -body = recurse(body) + return resp2 -import json -print(json.dumps(body, indent=4)) \ No newline at end of file + +def lookup(conn: apns.APNSConnection, query: list[str]) -> any: + query = {"uris": query} + resp = _send_request(conn, "id-query", plistlib.dumps(query)) + resp = plistlib.loads(resp) + resp = zlib.decompress(resp["b"], 16 + zlib.MAX_WBITS) + resp = plistlib.loads(resp) + return resp