yay refactoring that works

This commit is contained in:
JJTech0130 2023-05-09 19:29:17 -04:00
parent 7acc1da5b8
commit 7aa42d2b14
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6
5 changed files with 19 additions and 674 deletions

View file

@ -68,5 +68,5 @@ class IDSUser:
self._id_keypair = id_keypair self._id_keypair = id_keypair
def lookup(self, uris: list[str], topic: str = "com.apple.madrid") -> any: def lookup(self, uris: list[str], topic: str = "com.apple.madrid") -> any:
return query.lookup(self.push_connection, self.handles[0], self._id_keypair, topic, uris) return query.lookup(self.push_connection, self.handles[0], self._id_keypair, uris, topic)

View file

@ -10,30 +10,27 @@ from ._helpers import USER_AGENT, KeyPair
from . import signing from . import signing
def _send_request( def lookup(
conn: apns.APNSConnection, conn: apns.APNSConnection,
bag_key: str,
topic: str,
body: bytes,
keypair: KeyPair,
self_uri: str, self_uri: str,
id_keypair: KeyPair,
query: list[str],
topic,
) -> bytes: ) -> bytes:
BAG_KEY = "id-query"
conn.filter([topic])
body = {"uris": query}
body = plistlib.dumps(body)
body = gzip.compress(body, mtime=0) body = gzip.compress(body, mtime=0)
push_token = b64encode(conn.token).decode() push_token = b64encode(conn.token).decode()
# Sign the request
# signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body)
headers = { headers = {
"x-id-self-uri": self_uri, "x-id-self-uri": self_uri,
"User-Agent": USER_AGENT,
"x-protocol-version": "1630", "x-protocol-version": "1630",
} }
print(headers) signing.add_id_signature(headers, body, BAG_KEY, id_keypair, push_token, None)
signing.add_id_signature(headers, body, bag_key, keypair, push_token)
# print(headers)
msg_id = random.randbytes(16) msg_id = random.randbytes(16)
@ -41,16 +38,14 @@ def _send_request(
"cT": "application/x-apple-plist", "cT": "application/x-apple-plist",
"U": msg_id, "U": msg_id,
"c": 96, "c": 96,
"ua": USER_AGENT, "u": bags.ids_bag()[BAG_KEY],
"u": bags.ids_bag()[bag_key],
"h": headers, "h": headers,
"v": 2, "v": 2,
"b": body, "b": body,
} }
print(req) print(req)
conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY)) conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY))
# resp = conn.wait_for_packet(0x0A)
def check_response(x): def check_response(x):
if x[0] != 0x0A: if x[0] != 0x0A:
@ -62,33 +57,9 @@ def _send_request(
return resp_body["U"] == msg_id return resp_body["U"] == msg_id
# Lambda to check if the response is the one we want # Lambda to check if the response is the one we want
# conn.incoming_queue.find(check_response)
payload = conn.incoming_queue.wait_pop_find(check_response) payload = conn.incoming_queue.wait_pop_find(check_response)
# conn._send_ack(apns._get_field(payload[1], 4))
resp = apns._get_field(payload[1], 3) resp = apns._get_field(payload[1], 3)
return plistlib.loads(resp) resp = plistlib.loads(resp)
# Performs an IDS lookup
# conn: an active APNs connection. must be connected and have a push token. will be filtered to the IDS topic
# self: the user's email address
# keypair: a KeyPair object containing the user's private key and certificate
# topic: the IDS topic to query
# query: a list of URIs to query
def lookup_n(
conn: apns.APNSConnection, self_uri: str, id_keypair: KeyPair, topic: str, query: list[str]
) -> any:
conn.filter([topic])
query = {"uris": query}
resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), id_keypair, self_uri)
# resp = plistlib.loads(resp)
# print(resp)
resp = gzip.decompress(resp["b"]) resp = gzip.decompress(resp["b"])
resp = plistlib.loads(resp) resp = plistlib.loads(resp)
return resp return resp
def lookup(conn: apns.APNSConnection, self_uri: str, id_keypair: KeyPair, topic: str, query: list[str]) -> any:
import oldids
#id_keypair = KeyPair("-----BEGIN RSA PRIVATE KEY-----\nMIIEpQIBAAKCAQEA6fNjLPobeiQEbeDzYResvK2oC9+MsGyog36jo1o7pm8AeIth\nSzZ7caM8ThM/37i9DGJyDsnl6yqg1SxhyW4Fm8Evq2Mm6eYh6YwzRvppQoFqXNQO\nrEgjpQAW+D31V5OvHRprwX6qDVRprNF8gtaGYYjTbQudzYwpzpCIwbUu+1IqfojF\ngzTR/bxfdTbJnlaqWbOjFF7WrSZrP18nGaVkbM3rBS7egRZH1WlG14gO31YNbNg3\ndzOz9hQJHehHfHrSyZam3h6nda8tA7LJzVpTGCo7PJMC4IVyvQf7S2N3BlMJ4cen\nemzaDIOW9b/FCuvENkY2LPuDIT1hQs3pOoWHSQIDAQABAoIBABPAmCLDwh6jnFkf\nmUTdEBlFCy9ncjQyFF83yb6gv3ELpa1HzVDhmnYLe2u3Hdk4eoOpaypa+wXKLVaa\nPu5YEvKl0q3EexRb+QiELQ8k1M7H6PBJ+iwrEhFcCtRuPMDmZ+5L3QWy+U4TTrHH\n5RyR2row6HLoPGxOlXgKhXVfZAZVbgsSG8dbbuoP+U9cCrSU5TH2yIa64Gm7XrvF\n0aEo+J6nMAzw4jUUYY/y8gCU89p5utNDpxXZNva8CO0GpkooZ0nDAOnUjytNpWow\nEXkta9xKBwPQ9FXk4tK0005U6s9lFbKm4HdeypX/teSmhaS3QshENL/zmMysOEpN\nxaIRPMECgYEA8O+h5POMutVufwnonzAq0yWxSDtx4Sj9kUNNflLV0T941TMKIZwp\nQmpBDgbt3ffATjRAdKwHEncHXWhPIf3oA0UgqZFdUEEboIXlNd+6unegGHfrrT/S\n5sOQgN9kyZ/z1IvRVxA9qj3shSFFw4p0gOShObc2NGCmJI7IXc6PumECgYEA+JPz\nCl0l0RCk+lL59YUOe9irhqwHeWo26vsPbnWn8mjN6RB6ZF3NeRFU8KaMf9Zb0eO7\nGnSku97AEgL/UkP1F9imrRI1Ci3jT/vGHyFpR0g8KfhAwZuBZBPavaZ52nW5tiDz\nILzxHJfg8xHXKPGl3T5r7ZzuIxmDPY7bFk6xBekCgYEAwviIQCg+l+qjcjZognmO\nDjQQVG2WaCitmWGnUjRiRuRgOdcFudEPKmmln15IGzmj6yUpi8CyMGUWFqaUcuNv\nX0YPemjh5FHrs2jm5UPZbY/khCh3FUnytz9GrqMYgnjn7fX/P78qx5s4zTrxo51l\nTfC172itepFDoY3R4ueHM8ECgYEAm3MqUhjeRVe7VC//0OJcpGZjHd0G747UuS44\nAEPju1x/KHj9kTZ4AHYuQDBnPKq40RExOOIpArPSOXFWagPFihwaX7E7Khp4RNSW\nmXEzfThXJ4fwNyMgT417BY7ONSfZ82O3p4mA3vi73EYT367+otUeeYHiCmEyCZUE\nvXaIjcECgYEAwYaoKAW8+dpUI8e40jg1FE4eWKo9HC/Gnn2rf0bTMz1qgtH6T9Fj\nvfcM9C8RM0ziXrU255fqqWGBNI3z8dq0mgH/CmU87vV4ldqd6Ej+37EC1drAtX4C\nxPIafLpiKa2aDPcw4FAG+nOGEfYIPbS9WT1Jmz/Qw3EUbNKtt6Ze1Ps=\n-----END RSA PRIVATE KEY-----", "-----BEGIN CERTIFICATE-----\nMIIHOTCCBiGgAwIBAgIQGaPYy+62Ee0Sd7oaf5gYAzANBgkqhkiG9w0BAQUFADBu\nMQswCQYDVQQGEwJVUzETMBEGA1UECgwKQXBwbGUgSW5jLjESMBAGA1UECwwJQXBw\nbGUgSURTMREwDwYDVQQPDAhpZGVudGl0eTEjMCEGA1UEAwwaQXBwbGUgSURTIElk\nZW50aXR5IENBIC0gUjEwHhcNMjMwNTA5MjIwODIzWhcNMjMwNjIzMjIwODU5WjCB\n2zELMAkGA1UEBhMCVVMxEzARBgNVBAoMCkFwcGxlIEluYy4xEjAQBgNVBAsMCU1l\nc3NlbmdlcjEOMAwGA1UEDwwFZHMtaWQxHTAbBgoJkiaJk/IsZAEBDA1EOjIwOTk0\nMzYwOTcxMXQwcgYDVQQFE2tiOjdDMDc4MjI2OTdGRDdGRTA5NDhGN0YzODAxOTJC\nNjZDQTkwNDJBNjEvQTI3QzFCMEM5MjE3OUFBQzk5N0U1Mzc0NUM5Q0JDNEZFMzhB\nNDFEMkQ4OUZFNkNCMzg1MThDREJDMTUwODZCNDCCASIwDQYJKoZIhvcNAQEBBQAD\nggEPADCCAQoCggEBAOnzYyz6G3okBG3g82EXrLytqAvfjLBsqIN+o6NaO6ZvAHiL\nYUs2e3GjPE4TP9+4vQxicg7J5esqoNUsYcluBZvBL6tjJunmIemMM0b6aUKBalzU\nDqxII6UAFvg99VeTrx0aa8F+qg1UaazRfILWhmGI020Lnc2MKc6QiMG1LvtSKn6I\nxYM00f28X3U2yZ5WqlmzoxRe1q0maz9fJxmlZGzN6wUu3oEWR9VpRteIDt9WDWzY\nN3czs/YUCR3oR3x60smWpt4ep3WvLQOyyc1aUxgqOzyTAuCFcr0H+0tjdwZTCeHH\np3ps2gyDlvW/xQrrxDZGNiz7gyE9YULN6TqFh0kCAwEAAaOCA2MwggNfMIIC2wYD\nVR0RBIIC0jCCAs6GHG1haWx0bzp1c2VyX3Rlc3QyQGljbG91ZC5jb22gLAYKKoZI\nhvdjZAYEBAMeAAMAAAACAAAABAAAAAEAAAABAAAAAAAABmgAAAAAoIICLAYKKoZI\nhvdjZAYEBwOCAhwARlVTUACPC3uexqw0O0//dpYLdkkocIFg/GhUJg5qX2F8IJ0Y\nqjx0LiR6qlqFCf1UHqVlqU3LtnTQnYYqG0kNje/DC9C2jC1J5+SGzit94eDfVM63\nUH+UpZQHX1J7NT2xjKQxjbvC9jnWHZMxTBvmwSZqHrrzql+rL840stJpopg335DQ\nsjUig9JgHwVYrxBUHGFMDFONZ4swNbjcOGKFT1KH1VaLAxFNrnL8U7m2h0PSG9Ur\nTXUrQFmLEOl5Jul2LAe0n84WAEwt/u3aZGY9SwQaHFz+64P7gWZpjC/q0ZvjbWiB\nxLc9L/qHm9282RA6e/ibn9C5a944GjNrmTy3FKEc7oL3Ru2XBZ5hlyAVBdTqgg8/\nLmT9SizbZ63Rt5Pct4slButdbecCq7phR46ATpgWLYjOx6NVw68G3cuC3hmXkTVW\ndVcJcikXC4c02YBiNqA2svViz32+QvCzQxvHEajC6+xOXEfFwq58S8/c+7HXJEIx\nnovNGWrcbzpCvSH1GankT1WjG5cQBPvUwnOQ58yvcma1FlQ7NU7JMDgPYqDUhhwZ\nhG9V+LRcGIGzLK9hsZQ39SQjAVqYJ23YPvNl3leaGJaiNgTgjccH6htTI5BBhDdM\nlBUooNEEmbrl0S6NB+OwI/5fWtic2T17J5HEM5mT3u9yC3reurv21hcG/R3rO04N\ni287i7848P039m0/cS0MFiOElmzAQgWgHwYKKoZIhvdjZAYECAMRABmj2MvuthHt\nEne6Gn+YGAOgLwYKKoZIhvdjZAYEBgMhAKJ8GwySF5qsmX5TdFycvE/jikHS2J/m\nyzhRjNvBUIa0MB8GA1UdIwQYMBaAFMZ7ab5JwEEOwMirMjI45D+RQIvaMB0GA1Ud\nDgQWBBSktWY28tqk62vLZOqMfXbkDszx6zAMBgNVHRMBAf8EAjAAMA4GA1UdDwEB\n/wQEAwID+DAgBgNVHSUBAf8EFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDQYJKoZI\nhvcNAQEFBQADggEBAApIEISe9G6kdCwcWphSpiN1yGUP9WVhJTgUTvUgWl/e7Z1q\n4uVGNb2LsBEHXvI/rGL3qqVqSlt8b+GKqqxCSuL2G5ROoYn9wL/BuNCQJaa/SMqW\nA0Gz3uIA+fd/G+iYH31SP62DH/o6u7ctdG+pi5gjSCiBQcc8jTuOvWhSea6SfVC3\nqW7BBaxTSal/RWNll7A3RBCZS9vK7FZihDomGGH37YDNONTTr41k6FIH65X3pzy0\nFk5Jn/N/Ymhy5zcNPG1TBoXX2ZRWvfxuqMYP3+lfL15STJGQ65fnQNSSS6GkCGVm\nn3R7QDyy73xSTtEiBg28PUw/s2t+OR4lFuQr+KI=\n-----END CERTIFICATE-----")
return oldids.lookup(conn, self_uri, id_keypair, topic, query)

View file

@ -70,14 +70,14 @@ def _create_payload(
# Returns signature, nonce # Returns signature, nonce
def _sign_payload( def _sign_payload(
private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes, nonce = None
) -> tuple[str, bytes]: ) -> tuple[str, bytes]:
# Load the private key # Load the private key
key = serialization.load_pem_private_key( key = serialization.load_pem_private_key(
private_key.encode(), password=None, backend=default_backend() private_key.encode(), password=None, backend=default_backend()
) )
payload, nonce = _create_payload(bag_key, query_string, push_token, payload) payload, nonce = _create_payload(bag_key, query_string, push_token, payload, nonce)
sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) # type: ignore sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) # type: ignore
@ -116,9 +116,10 @@ def add_id_signature(
bag_key: str, bag_key: str,
id_key: KeyPair, id_key: KeyPair,
push_token: str, push_token: str,
nonce=None,
): ):
id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body) id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body, nonce)
headers["x-id-sig"] = id_sig headers["x-id-sig"] = id_sig
headers["x-id-nonce"] = b64encode(id_nonce) headers["x-id-nonce"] = b64encode(id_nonce).decode()
headers["x-id-cert"] = dearmour(id_key.cert) headers["x-id-cert"] = dearmour(id_key.cert)
headers["x-push-token"] = push_token headers["x-push-token"] = push_token

View file

@ -1,191 +0,0 @@
import getpass
import json
import oldids as ids
from oldids import *
#from base64 import b64encode, b64decode
# Open config
try:
with open("config.json", "r") as f:
CONFIG = json.load(f)
except FileNotFoundError:
CONFIG = {}
def input_multiline(prompt):
print(prompt)
lines = []
while True:
line = input()
if line == "":
break
lines.append(line)
return "\n".join(lines)
def refresh_token():
# If no username is set, prompt for it
if "username" not in CONFIG:
CONFIG["username"] = input("Enter iCloud username: ")
# If no password is set, prompt for it
if "password" not in CONFIG:
CONFIG["password"] = getpass.getpass("Enter iCloud password: ")
# If grandslam authentication is not set, prompt for it
if "use_gsa" not in CONFIG:
CONFIG["use_gsa"] = input("Use grandslam authentication? [y/N] ").lower() == "y"
def factor_gen():
return input("Enter iCloud 2FA code: ")
CONFIG["user_id"], CONFIG["token"] = ids._get_auth_token(
CONFIG["username"], CONFIG["password"], factor_gen=factor_gen
)
def refresh_cert():
CONFIG["key"], CONFIG["auth_cert"] = ids._get_auth_cert(
CONFIG["user_id"], CONFIG["token"]
)
def create_connection():
conn = apns.APNSConnection()
token = conn.connect()
# conn.filter(['com.apple.madrid'])
CONFIG["push"] = {
"token": b64encode(token).decode(),
"cert": conn.cert,
"key": conn.private_key,
}
return conn
def restore_connection():
conn = apns.APNSConnection(CONFIG["push"]["key"], CONFIG["push"]["cert"])
conn.connect(True, b64decode(CONFIG["push"]["token"]))
# conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi'])
return conn
def refresh_ids_cert():
info = {
"uri": "mailto:" + CONFIG["username"],
"user_id": CONFIG["user_id"],
}
print(
ids._get_handles(
CONFIG["push"]["token"],
CONFIG["user_id"],
ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]),
ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]),
)
)
resp = None
try:
if "validation_data" in CONFIG:
resp = ids._register_request(
CONFIG["push"]["token"],
info,
ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]),
ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]),
CONFIG["validation_data"],
)
except Exception as e:
print(e)
resp = None
if resp is None:
print(
"Note: Validation data can be obtained from @JJTech, or intercepted using a HTTP proxy."
)
validation_data = (
input_multiline("Enter validation data: ")
.replace("\n", "")
.replace(" ", "")
)
resp = ids._register_request(
CONFIG["push"]["token"],
info,
ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]),
ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]),
validation_data,
)
CONFIG["validation_data"] = validation_data
print(resp)
ids_cert = x509.load_der_x509_certificate(resp["services"][0]["users"][0]["cert"])
ids_cert = ids_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip()
CONFIG["ids_cert"] = ids_cert
if not "push" in CONFIG:
print("No existing APNs credentials, creating new ones...")
# print("No push conn")
conn = create_connection()
else:
print("Restoring APNs credentials...")
conn = restore_connection()
print("Connected to APNs!")
if not "ids_cert" in CONFIG:
print("No existing IDS certificate, creating new one...")
if not "key" in CONFIG:
print("No existing authentication certificate, creating new one...")
if not "token" in CONFIG:
print("No existing authentication token, creating new one...")
refresh_token()
print("Got authentication token!")
refresh_cert()
print("Got authentication certificate!")
refresh_ids_cert()
print("Got IDS certificate!")
ids_keypair = ids.KeyPair(CONFIG["key"], CONFIG["ids_cert"])
def lookup(topic: str, users: list[str]):
print(f"Looking up users {users} for topic {topic}...")
resp = ids.lookup(conn, CONFIG["username"], ids_keypair, topic, users)
print(resp)
# r = list(resp['results'].values())[0]
for k, v in resp["results"].items():
print(f"Result for user {k} topic {topic}:")
i = v["identities"]
print(f"IDENTITIES: {len(i)}")
for iden in i:
print("IDENTITY", end=" ")
print(f"Push Token: {b64encode(iden['push-token']).decode()}", end=" ")
if "client-data" in iden:
print(f"Client Data: {len(iden['client-data'])}")
else:
print("No client data")
# Hack to make sure that the requests and responses match up
# This filter MUST contain all the topics you are looking up
# conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi', 'com.apple.private.alloy.multiplex1', 'com.apple.private.alloy.screensharing'])
# import time
# print("...waiting for queued messages... (this is a hack)")
# time.sleep(5) # Let the server send us any messages it was holding
# conn.sink() # Dump the messages
lookup("com.apple.madrid", ["mailto:jjtech@jjtech.dev"])
lookup("com.apple.private.alloy.facetime.multi", ["mailto:jjtech@jjtech.dev"])
lookup("com.apple.private.alloy.facetime.multi", ["mailto:user_test2@icloud.com"])
lookup("com.apple.madrid", ["mailto:user_test2@icloud.com"])
lookup("com.apple.private.alloy.multiplex1", ["mailto:user_test2@icloud.com"])
lookup("com.apple.private.alloy.screensharing", ["mailto:user_test2@icloud.com"])
# time.sleep(4)
# Save config
with open("config.json", "w") as f:
json.dump(CONFIG, f, indent=4)

436
oldids.py
View file

@ -1,436 +0,0 @@
import gzip
import plistlib
import random
import uuid
from base64 import b64decode, b64encode
from collections import namedtuple
from datetime import datetime
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509.oid import NameOID
import apns
import bags
import gsa
USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"
PROTOCOL_VERSION = "1640"
KeyPair = namedtuple("KeyPair", ["key", "cert"])
# global_key, global_cert = load_keys()
def _send_request(
conn: apns.APNSConnection,
bag_key: str,
topic: str,
body: bytes,
keypair: KeyPair,
username: str,
) -> bytes:
#print(body)
print(bag_key, topic, body, keypair, username)
body = gzip.compress(body, mtime=0)
push_token = b64encode(conn.token).decode()
# Sign the request
signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body)
headers = {
"x-id-cert": keypair.cert.replace("-----BEGIN CERTIFICATE-----", "")
.replace("-----END CERTIFICATE-----", "")
.replace("\n", ""),
"x-id-nonce": b64encode(nonce).decode(),
"x-id-sig": signature,
"x-push-token": push_token,
"x-id-self-uri": "mailto:user_test2@icloud.com",
"User-Agent": USER_AGENT,
"x-protocol-version": "1640",
}
# print(headers)
msg_id = random.randbytes(16)
req = {
"cT": "application/x-apple-plist",
"U": msg_id,
"c": 96,
"ua": USER_AGENT,
"u": bags.ids_bag()[bag_key],
"h": headers,
"v": 2,
"b": body,
}
print(req)
conn.send_message("com.apple.madrid", plistlib.dumps(req, fmt=plistlib.FMT_BINARY))
# resp = conn.wait_for_packet(0x0A)
def check_response(x):
if x[0] != 0x0A:
return False
resp_body = apns._get_field(x[1], 3)
if resp_body is None:
return False
resp_body = plistlib.loads(resp_body)
return resp_body["U"] == msg_id
# Lambda to check if the response is the one we want
# conn.incoming_queue.find(check_response)
payload = conn.incoming_queue.wait_pop_find(check_response)
# conn._send_ack(apns._get_field(payload[1], 4))
resp = apns._get_field(payload[1], 3)
return plistlib.loads(resp)
# Performs an IDS lookup
# conn: an active APNs connection. must be connected and have a push token. will be filtered to the IDS topic
# self: the user's email address
# keypair: a KeyPair object containing the user's private key and certificate
# topic: the IDS topic to query
# query: a list of URIs to query
def lookup(
conn: apns.APNSConnection, self: str, keypair: KeyPair, topic: str, query: list[str]
) -> any:
conn.filter(["com.apple.madrid"])
query = {"uris": query}
resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), keypair, self)
# resp = plistlib.loads(resp)
# print(resp)
resp = gzip.decompress(resp["b"])
resp = plistlib.loads(resp)
return resp
def _auth_token_request(username: str, password: str) -> any:
# Turn the PET into an auth token
data = {
"apple-id": username,
"client-id": str(uuid.uuid4()),
"delegates": {"com.apple.private.ids": {"protocol-version": "4"}},
"password": password,
}
data = plistlib.dumps(data)
r = requests.post(
"https://setup.icloud.com/setup/prefpane/loginDelegates",
auth=(username, password),
data=data,
verify=False,
)
r = plistlib.loads(r.content)
return r
# Gets an IDS auth token for the given username and password
# Will use native Grand Slam on macOS
# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted
# Returns (realm user id, auth token)
def _get_auth_token(
username: str, password: str, factor_gen: callable = None
) -> tuple[str, str]:
from sys import platform
# if use_gsa:
if platform == "darwin":
g = gsa.authenticate(username, password, gsa.Anisette())
pet = g["t"]["com.apple.gs.idms.pet"]["token"]
else:
# Make the request without the 2FA code to make the prompt appear
_auth_token_request(username, password)
# TODO: Make sure we actually need the second request, some rare accounts don't have 2FA
# Now make the request with the 2FA code
if factor_gen is None:
pet = password + input("Enter 2FA code: ")
else:
pet = password + factor_gen()
r = _auth_token_request(username, pet)
# print(r)
if "description" in r:
raise Exception(f"Error: {r['description']}")
service_data = r["delegates"]["com.apple.private.ids"]["service-data"]
realm_user_id = service_data["realm-user-id"]
auth_token = service_data["auth-token"]
# print(f"Auth token for {realm_user_id}: {auth_token}")
return realm_user_id, auth_token
def _generate_csr(private_key: rsa.RSAPrivateKey) -> str:
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(
x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()),
]
)
)
.sign(private_key, hashes.SHA256())
)
csr = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8")
return (
csr.replace("-----BEGIN CERTIFICATE REQUEST-----", "")
.replace("-----END CERTIFICATE REQUEST-----", "")
.replace("\n", "")
)
# Gets an IDS auth cert for the given user id and auth token
# Returns [private key PEM, certificate PEM]
def _get_auth_cert(user_id, token) -> KeyPair:
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
body = {
"authentication-data": {"auth-token": token},
"csr": b64decode(_generate_csr(private_key)),
"realm-user-id": user_id,
}
body = plistlib.dumps(body)
r = requests.post(
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS",
data=body,
headers={"x-protocol-version": "1630"},
verify=False,
)
r = plistlib.loads(r.content)
if r["status"] != 0:
raise (Exception(f"Failed to get auth cert: {r}"))
cert = x509.load_der_x509_certificate(r["cert"])
return KeyPair(
private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
.decode("utf-8")
.strip(),
cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(),
)
def _register_request(
push_token, info, auth_key: KeyPair, push_key: KeyPair, validation_data
):
body = {
"hardware-version": "MacBookPro18,3",
"language": "en-US",
"os-version": "macOS,13.2.1,22D68",
"software-version": "22D68",
"services": [
{
"capabilities": [{"flags": 1, "name": "Messenger", "version": 1}],
"service": "com.apple.madrid",
"users": [
{
# TODO: Pass ALL URIs from get handles
"uris": [{"uri": info["uri"]}],
"user-id": info["user_id"],
}
],
}
],
"validation-data": b64decode(validation_data),
}
body = plistlib.dumps(body)
headers = {
"x-protocol-version": PROTOCOL_VERSION,
"x-auth-user-id-0": info["user_id"],
}
_add_auth_push_signatures(
headers, body, "id-register", auth_key, push_key, push_token, 0
)
r = requests.post(
"https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register",
headers=headers,
data=body,
verify=False,
)
r = plistlib.loads(r.content)
print(f'Response code: {r["status"]}')
if "status" in r and r["status"] == 6004:
raise Exception("Validation data expired!")
# TODO: Do validation of nested statuses
return r
def mini_cert(cert: str):
return (
cert.replace("\n", "")
.replace("-----BEGIN CERTIFICATE-----", "")
.replace("-----END CERTIFICATE-----", "")
)
PROTOCOL_VERSION = "1640"
def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair):
headers = {
"x-protocol-version": PROTOCOL_VERSION,
"x-auth-user-id": user_id,
}
_add_auth_push_signatures(
headers, None, "id-get-handles", auth_key, push_key, push_token
)
r = requests.get(
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles",
headers=headers,
verify=False,
)
r = plistlib.loads(r.content)
if not "handles" in r:
raise Exception("No handles in response: " + str(r))
return [handle["uri"] for handle in r["handles"]]
class IDSUser:
def _authenticate_for_token(
self, username: str, password: str, factor_callback: callable = None
):
self.user_id, self._auth_token = _get_auth_token(
username, password, factor_callback
)
def _authenticate_for_cert(self):
self._auth_keypair = _get_auth_cert(self.user_id, self._auth_token)
# Factor callback will be called if a 2FA code is necessary
def __init__(
self,
push_connection: apns.APNSConnection,
username: str,
password: str,
factor_callback: callable = None,
):
self.push_connection = push_connection
self._authenticate_for_token(username, password, factor_callback)
self._authenticate_for_cert()
self.handles = _get_handles(
b64encode(self.push_connection.token),
self.user_id,
self._auth_keypair,
KeyPair(self.push_connection.private_key, self.push_connection.cert),
)
def __str__(self):
return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})"
def test():
import getpass
conn = apns.APNSConnection()
conn.connect()
username = input("Enter username: ")
password = getpass.getpass("Enter password: ")
user = IDSUser(conn, username, password)
print(user)
# SIGNING STUFF
# Nonce Format:
# 01000001876bd0a2c0e571093967fce3d7
# 01 # version
# 000001876d008cc5 # unix time
# r1r2r3r4r5r6r7r8 # random bytes
def generate_nonce() -> bytes:
return (
b"\x01"
+ int(datetime.now().timestamp() * 1000).to_bytes(8, "big")
+ random.randbytes(8)
)
# Creates a payload from individual parts for signing
def _create_payload(
bag_key: str,
query_string: str,
push_token: str,
payload: bytes,
nonce: bytes = None,
) -> tuple[str, bytes]:
# Generate the nonce
if nonce is None:
nonce = generate_nonce()
print(push_token)
push_token = b64decode(push_token)
if payload is None:
payload = b""
return (
nonce
+ len(bag_key).to_bytes(4, "big")
+ bag_key.encode()
+ len(query_string).to_bytes(4, "big")
+ query_string.encode()
+ len(payload).to_bytes(4, "big")
+ payload
+ len(push_token).to_bytes(4, "big")
+ push_token,
nonce,
)
# Returns signature, nonce
def _sign_payload(
private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes
) -> tuple[str, bytes]:
# Load the private key
key = serialization.load_pem_private_key(
private_key.encode(), password=None, backend=default_backend()
)
payload, nonce = _create_payload(bag_key, query_string, push_token, payload)
sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1())
sig = b"\x01\x01" + sig
sig = b64encode(sig).decode()
return sig, nonce
# Add headers for x-push-sig and x-auth-sig stuff
def _add_auth_push_signatures(
headers: dict,
body: bytes,
bag_key: str,
auth_key: KeyPair,
push_key: KeyPair,
push_token: str,
auth_number=None,
):
push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body)
headers["x-push-sig"] = push_sig
headers["x-push-nonce"] = b64encode(push_nonce)
headers["x-push-cert"] = mini_cert(push_key.cert)
headers["x-push-token"] = push_token
auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body)
auth_postfix = "-" + str(auth_number) if auth_number is not None else ""
headers["x-auth-sig" + auth_postfix] = auth_sig
headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce)
headers["x-auth-cert" + auth_postfix] = mini_cert(auth_key.cert)
if __name__ == "__main__":
test()