refactor demo

This commit is contained in:
JJTech0130 2023-04-11 12:17:53 -04:00
parent cb15a3a6b1
commit daf8868bb5
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6
2 changed files with 85 additions and 72 deletions

View file

@ -1,7 +1,10 @@
import apns
import plistlib
import zlib
from base64 import b64decode, b64encode
from hashlib import sha1
import plistlib, zlib
import apns
import ids
conn1 = apns.APNSConnection()
conn1.connect()
@ -11,4 +14,4 @@ conn1.filter([])
conn1.connect(False)
conn1.filter(["com.apple.madrid"])
# See ids.py for something useful
print(ids.lookup(conn1, ["mailto:jjtech@jjtech.dev"]))

150
ids.py
View file

@ -1,10 +1,14 @@
import requests
import plistlib
from base64 import b64encode, b64decode
from datetime import datetime
import random
from hashlib import sha1
import zlib
from base64 import b64decode, b64encode
from datetime import datetime
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
import apns
USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"
# NOTE: The push token MUST be registered with the account for self-uri!
@ -12,7 +16,6 @@ USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"
PUSH_TOKEN = "5V7AY+ikHr4DiSfq1W2UBa71G3FLGkpUSKTrOLg81yk="
SELF_URI = "mailto:jjtech@jjtech.dev"
TO_LOOKUP = ['mailto:jjtech@jjtech.dev']
# Nonce Format:
# 01000001876bd0a2c0e571093967fce3d7
@ -20,7 +23,12 @@ TO_LOOKUP = ['mailto:jjtech@jjtech.dev']
# 000001876d008cc5 # unix time
# r1r2r3r4r5r6r7r8 # random bytes
def generate_nonce() -> bytes:
return b"\x01" + int(datetime.now().timestamp() * 1000).to_bytes(8, "big") + random.randbytes(8)
return (
b"\x01"
+ int(datetime.now().timestamp() * 1000).to_bytes(8, "big")
+ random.randbytes(8)
)
def load_keys() -> tuple[str, str]:
# Load the private key and certificate from files
@ -31,23 +39,35 @@ def load_keys() -> tuple[str, str]:
return ids_key, ids_cert
def _create_payload(bag_key: str, query_string: str, push_token: str, payload: bytes) -> tuple[str, bytes]:
def _create_payload(
bag_key: str, query_string: str, push_token: str, payload: bytes
) -> tuple[str, bytes]:
# Generate the nonce
nonce = generate_nonce()
push_token = b64decode(push_token)
return nonce + len(bag_key).to_bytes(4) + bag_key.encode() + len(query_string).to_bytes(4) + query_string.encode() + len(payload).to_bytes(4) + payload + len(push_token).to_bytes(4) + push_token, nonce
return (
nonce
+ len(bag_key).to_bytes(4)
+ bag_key.encode()
+ len(query_string).to_bytes(4)
+ query_string.encode()
+ len(payload).to_bytes(4)
+ payload
+ len(push_token).to_bytes(4)
+ push_token,
nonce,
)
def sign_payload(private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes) -> tuple[str, bytes]:
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import utils
def sign_payload(
private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes
) -> tuple[str, bytes]:
# Load the private key
key = serialization.load_pem_private_key(private_key.encode(), password=None, backend=default_backend())
key = serialization.load_pem_private_key(
private_key.encode(), password=None, backend=default_backend()
)
payload, nonce = _create_payload(bag_key, query_string, push_token, payload)
sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1())
@ -57,70 +77,60 @@ def sign_payload(private_key: str, bag_key: str, query_string: str, push_token:
return sig, nonce
body = {'uris': TO_LOOKUP}
body = plistlib.dumps(body)
BAG_KEYS = {
"id-query": "https://query.ess.apple.com/WebObjects/QueryService.woa/wa/query"
}
global_key, global_cert = load_keys()
def _send_request(conn: apns.APNSConnection, type: str, body: bytes) -> bytes:
body = zlib.compress(body, wbits=16 + zlib.MAX_WBITS)
key, cert = load_keys()
signature, nonce = sign_payload(key, 'id-query', '', PUSH_TOKEN, body)
# Sign the request
signature, nonce = sign_payload(global_key, type, "", PUSH_TOKEN, body)
print(signature)
headers = {
'x-id-cert': cert.replace("-----BEGIN CERTIFICATE-----", "").replace("-----END CERTIFICATE-----", "").replace("\n", ""),
'x-id-nonce': b64encode(nonce).decode(),
'x-id-sig': signature,
'x-push-token': PUSH_TOKEN,
'x-id-self-uri': SELF_URI,
'User-Agent': USER_AGENT,
'x-protocol-version': '1630',
"x-id-cert": global_cert.replace("-----BEGIN CERTIFICATE-----", "")
.replace("-----END CERTIFICATE-----", "")
.replace("\n", ""),
"x-id-nonce": b64encode(nonce).decode(),
"x-id-sig": signature,
"x-push-token": PUSH_TOKEN,
"x-id-self-uri": SELF_URI,
"User-Agent": USER_AGENT,
"x-protocol-version": "1630",
}
# We have to send it over APNs
import apns
conn1 = apns.APNSConnection()
conn1.connect()
conn1.keep_alive()
conn1.set_state(0x01)
conn1.filter([])
conn1.connect(False)
conn1.filter(["com.apple.madrid"])
print(conn1.token)
to_send = {'cT': 'application/x-apple-plist',
'U': b'\x16%D\xd5\xcd:D1\xa1\xa7z6\xa9\xe2\xbc\x8f', # Just random bytes?
'c': 96,
'ua': USER_AGENT,
'u': 'https://query.ess.apple.com/WebObjects/QueryService.woa/wa/query',
'h': headers,
'v': 2, # breaks lookup
'b': body
req = {
"cT": "application/x-apple-plist",
"U": b"\x16%D\xd5\xcd:D1\xa1\xa7z6\xa9\xe2\xbc\x8f", # Just random bytes?
"c": 96,
"ua": USER_AGENT,
"u": BAG_KEYS[type],
"h": headers,
"v": 2,
"b": body,
}
conn1.send_message("com.apple.madrid", plistlib.dumps(to_send, fmt=plistlib.FMT_BINARY))
conn.send_message("com.apple.madrid", plistlib.dumps(req, fmt=plistlib.FMT_BINARY))
resp = conn.wait_for_packet(0x0A)
response = conn1.wait_for_packet(0x0a)
resp2 = apns._get_field(resp[1], 3)
response = apns._get_field(response[1], 3)
if resp2 is None:
print(f"Got invalid response: {resp}")
response = plistlib.loads(response)
return resp2
print(f"Status code: {response['hs']}")
body = response['b']
body = zlib.decompress(body, 16 + zlib.MAX_WBITS)
body = plistlib.loads(body)
# Recurse over the entire body, replacing all bytes with base64 encoded strings
def recurse(obj):
if isinstance(obj, bytes):
return b64encode(obj).decode()
elif isinstance(obj, dict):
return {k: recurse(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [recurse(v) for v in obj]
return obj
body = recurse(body)
import json
print(json.dumps(body, indent=4))
def lookup(conn: apns.APNSConnection, query: list[str]) -> any:
query = {"uris": query}
resp = _send_request(conn, "id-query", plistlib.dumps(query))
resp = plistlib.loads(resp)
resp = zlib.decompress(resp["b"], 16 + zlib.MAX_WBITS)
resp = plistlib.loads(resp)
return resp