Merge pull request #2 from JJTech0130/idsrefactor

Majorly refactor IDS
This commit is contained in:
JJTech 2023-05-09 20:02:44 -04:00 committed by GitHub
commit 0f2901a109
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 587 additions and 599 deletions

View file

@ -68,7 +68,7 @@ def generate_push_cert() -> tuple[str, str]:
)
# Sign the activation info
signature = fairplay_key.sign(activation_info, padding.PKCS1v15(), hashes.SHA1())
signature = fairplay_key.sign(activation_info, padding.PKCS1v15(), hashes.SHA1()) # type: ignore
body = {
"ActivationInfoComplete": True,
@ -83,7 +83,7 @@ def generate_push_cert() -> tuple[str, str]:
verify=False,
)
protocol = re.search("<Protocol>(.*)</Protocol>", resp.text).group(1)
protocol = re.search("<Protocol>(.*)</Protocol>", resp.text).group(1) # type: ignore
protocol = plistlib.loads(protocol.encode("utf-8"))
return (

View file

@ -9,8 +9,11 @@ from hashlib import sha1
import tlslite
import albert
import bags
COURIER_HOST = "windows.courier.push.apple.com" # TODO: Get this from config
#COURIER_HOST = "windows.courier.push.apple.com" # TODO: Get this from config
# Pick a random courier server from 01 to APNSCourierHostcount
COURIER_HOST = f"{random.randint(1, bags.apns_init_bag()['APNSCourierHostcount'])}-{bags.apns_init_bag()['APNSCourierHostname']}"
COURIER_PORT = 5223
ALPN = [b"apns-security-v2"]

View file

@ -3,7 +3,7 @@ import plistlib
import requests
def apns_init_bag():
def apns_init_bag_old():
r = requests.get("https://init.push.apple.com/bag", verify=False)
if r.status_code != 200:
raise Exception("Failed to get APNs init bag")
@ -15,7 +15,7 @@ def apns_init_bag():
# This is the same as the above, but the response has a signature which we unwrap
def apns_init_bag_2():
def apns_init_bag():
r = requests.get("http://init-p01st.push.apple.com/bag", verify=False)
if r.status_code != 200:
raise Exception("Failed to get APNs init bag 2")

233
demo.py
View file

@ -1,15 +1,9 @@
import getpass
import json
from base64 import b64encode
from getpass import getpass
from base64 import b64decode
import apns
import ids
from ids import *
# Open config
try:
with open("config.json", "r") as f:
CONFIG = json.load(f)
except FileNotFoundError:
CONFIG = {}
def input_multiline(prompt):
@ -23,168 +17,89 @@ def input_multiline(prompt):
return "\n".join(lines)
def refresh_token():
# If no username is set, prompt for it
if "username" not in CONFIG:
CONFIG["username"] = input("Enter iCloud username: ")
# If no password is set, prompt for it
if "password" not in CONFIG:
CONFIG["password"] = getpass.getpass("Enter iCloud password: ")
# If grandslam authentication is not set, prompt for it
if "use_gsa" not in CONFIG:
CONFIG["use_gsa"] = input("Use grandslam authentication? [y/N] ").lower() == "y"
# Try and load config.json
try:
with open("config.json", "r") as f:
def factor_gen():
return input("Enter iCloud 2FA code: ")
CONFIG = json.load(f)
except FileNotFoundError:
CONFIG = {}
CONFIG["user_id"], CONFIG["token"] = ids._get_auth_token(
CONFIG["username"], CONFIG["password"], factor_gen=factor_gen
)
def refresh_cert():
CONFIG["key"], CONFIG["auth_cert"] = ids._get_auth_cert(
CONFIG["user_id"], CONFIG["token"]
)
def create_connection():
conn = apns.APNSConnection()
token = conn.connect()
# conn.filter(['com.apple.madrid'])
CONFIG["push"] = {
"token": b64encode(token).decode(),
"cert": conn.cert,
"key": conn.private_key,
def convert_config(old):
new = {}
new["id"] = {
"key": old["key"],
"cert": old["ids_cert"],
}
return conn
def restore_connection():
conn = apns.APNSConnection(CONFIG["push"]["key"], CONFIG["push"]["cert"])
conn.connect(True, b64decode(CONFIG["push"]["token"]))
# conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi'])
return conn
def refresh_ids_cert():
info = {
"uri": "mailto:" + CONFIG["username"],
"user_id": CONFIG["user_id"],
new["auth"] = {
"key": old["key"],
"cert": old["auth_cert"],
"user_id": old["user_id"],
"handles": [
"mailto:user_test2@icloud.com",
]
#"handles": old["handles"],
}
new["push"] = {
"token": old["push"]["token"],
"key": old["push"]["key"],
"cert": old["push"]["cert"],
}
return new
print(
ids._get_handles(
CONFIG["push"]["token"],
CONFIG["user_id"],
ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]),
ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]),
)
)
# Uncomment this to change from an old config.json to a new one
#CONFIG = convert_config(CONFIG)
resp = None
conn = apns.APNSConnection(
CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert")
)
def safe_b64decode(s):
try:
if "validation_data" in CONFIG:
resp = ids._register_request(
CONFIG["push"]["token"],
info,
ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]),
ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]),
CONFIG["validation_data"],
)
except Exception as e:
print(e)
resp = None
return b64decode(s)
except:
return None
conn.connect(token=safe_b64decode(CONFIG.get("push", {}).get("token")))
#print(b64encode(conn.token).decode())
user = ids.IDSUser(conn)
if resp is None:
print(
"Note: Validation data can be obtained from @JJTech, or intercepted using a HTTP proxy."
)
validation_data = (
input_multiline("Enter validation data: ")
.replace("\n", "")
.replace(" ", "")
)
resp = ids._register_request(
CONFIG["push"]["token"],
info,
ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]),
ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]),
validation_data,
)
CONFIG["validation_data"] = validation_data
print(resp)
ids_cert = x509.load_der_x509_certificate(resp["services"][0]["users"][0]["cert"])
ids_cert = ids_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip()
CONFIG["ids_cert"] = ids_cert
if not "push" in CONFIG:
print("No existing APNs credentials, creating new ones...")
# print("No push conn")
conn = create_connection()
if CONFIG.get("auth", {}).get("cert") is not None:
auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"])
user_id = CONFIG["auth"]["user_id"]
handles = CONFIG["auth"]["handles"]
user.restore_authentication(auth_keypair, user_id, handles)
else:
print("Restoring APNs credentials...")
conn = restore_connection()
print("Connected to APNs!")
username = input("Username: ")
password = getpass("Password: ")
if not "ids_cert" in CONFIG:
print("No existing IDS certificate, creating new one...")
if not "key" in CONFIG:
print("No existing authentication certificate, creating new one...")
if not "token" in CONFIG:
print("No existing authentication token, creating new one...")
refresh_token()
print("Got authentication token!")
refresh_cert()
print("Got authentication certificate!")
refresh_ids_cert()
print("Got IDS certificate!")
user.authenticate(username, password)
ids_keypair = ids.KeyPair(CONFIG["key"], CONFIG["ids_cert"])
if CONFIG.get("id", {}).get("cert") is not None:
id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"])
user.restore_identity(id_keypair)
else:
vd = input_multiline("Enter validation data: ")
user.register(vd)
print(user.lookup(["mailto:textgpt@icloud.com"]))
def lookup(topic: str, users: list[str]):
print(f"Looking up users {users} for topic {topic}...")
resp = ids.lookup(conn, CONFIG["username"], ids_keypair, topic, users)
# Write config.json
CONFIG["id"] = {
"key": user._id_keypair.key,
"cert": user._id_keypair.cert,
}
CONFIG["auth"] = {
"key": user._auth_keypair.key,
"cert": user._auth_keypair.cert,
"user_id": user.user_id,
"handles": user.handles,
}
CONFIG["push"] = {
"token": b64encode(user.push_connection.token).decode(),
"key": user.push_connection.private_key,
"cert": user.push_connection.cert,
}
# print(resp)
# r = list(resp['results'].values())[0]
for k, v in resp["results"].items():
print(f"Result for user {k} topic {topic}:")
i = v["identities"]
print(f"IDENTITIES: {len(i)}")
for iden in i:
print("IDENTITY", end=" ")
print(f"Push Token: {b64encode(iden['push-token']).decode()}", end=" ")
if "client-data" in iden:
print(f"Client Data: {len(iden['client-data'])}")
else:
print("No client data")
# Hack to make sure that the requests and responses match up
# This filter MUST contain all the topics you are looking up
# conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi', 'com.apple.private.alloy.multiplex1', 'com.apple.private.alloy.screensharing'])
# import time
# print("...waiting for queued messages... (this is a hack)")
# time.sleep(5) # Let the server send us any messages it was holding
# conn.sink() # Dump the messages
lookup("com.apple.madrid", ["mailto:jjtech@jjtech.dev"])
lookup("com.apple.private.alloy.facetime.multi", ["mailto:jjtech@jjtech.dev"])
lookup("com.apple.private.alloy.facetime.multi", ["mailto:user_test2@icloud.com"])
lookup("com.apple.madrid", ["mailto:user_test2@icloud.com"])
lookup("com.apple.private.alloy.multiplex1", ["mailto:user_test2@icloud.com"])
lookup("com.apple.private.alloy.screensharing", ["mailto:user_test2@icloud.com"])
# time.sleep(4)
# Save config
with open("config.json", "w") as f:
json.dump(CONFIG, f, indent=4)

View file

@ -14,4 +14,4 @@ conn1.filter(["com.apple.madrid"])
# print(ids.lookup(conn1, ["mailto:jjtech@jjtech.dev"]))
print(ids.register(conn1, "user_test2@icloud.com", "wowSecure1"))
#print(ids.register(conn1, "user_test2@icloud.com", "wowSecure1"))

5
gsa.py
View file

@ -326,8 +326,9 @@ def check_error(r):
status = r
if status["ec"] != 0:
print(f"Error {status['ec']}: {status['em']}")
return True
raise Exception(f"Error {status['ec']}: {status['em']}")
#print(f"Error {status['ec']}: {status['em']}")
#return True
return False

432
ids.py
View file

@ -1,432 +0,0 @@
import gzip
import plistlib
import random
import uuid
from base64 import b64decode, b64encode
from collections import namedtuple
from datetime import datetime
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509.oid import NameOID
import apns
import bags
import gsa
USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"
PROTOCOL_VERSION = "1640"
KeyPair = namedtuple("KeyPair", ["key", "cert"])
# global_key, global_cert = load_keys()
def _send_request(
conn: apns.APNSConnection,
bag_key: str,
topic: str,
body: bytes,
keypair: KeyPair,
username: str,
) -> bytes:
body = gzip.compress(body, mtime=0)
push_token = b64encode(conn.token).decode()
# Sign the request
signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body)
headers = {
"x-id-cert": keypair.cert.replace("-----BEGIN CERTIFICATE-----", "")
.replace("-----END CERTIFICATE-----", "")
.replace("\n", ""),
"x-id-nonce": b64encode(nonce).decode(),
"x-id-sig": signature,
"x-push-token": push_token,
"x-id-self-uri": "mailto:" + username,
"User-Agent": USER_AGENT,
"x-protocol-version": "1630",
}
# print(headers)
msg_id = random.randbytes(16)
req = {
"cT": "application/x-apple-plist",
"U": msg_id,
"c": 96,
"ua": USER_AGENT,
"u": bags.ids_bag()[bag_key],
"h": headers,
"v": 2,
"b": body,
}
conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY))
# resp = conn.wait_for_packet(0x0A)
def check_response(x):
if x[0] != 0x0A:
return False
resp_body = apns._get_field(x[1], 3)
if resp_body is None:
return False
resp_body = plistlib.loads(resp_body)
return resp_body["U"] == msg_id
# Lambda to check if the response is the one we want
# conn.incoming_queue.find(check_response)
payload = conn.incoming_queue.wait_pop_find(check_response)
# conn._send_ack(apns._get_field(payload[1], 4))
resp = apns._get_field(payload[1], 3)
return plistlib.loads(resp)
# Performs an IDS lookup
# conn: an active APNs connection. must be connected and have a push token. will be filtered to the IDS topic
# self: the user's email address
# keypair: a KeyPair object containing the user's private key and certificate
# topic: the IDS topic to query
# query: a list of URIs to query
def lookup(
conn: apns.APNSConnection, self: str, keypair: KeyPair, topic: str, query: list[str]
) -> any:
conn.filter([topic])
query = {"uris": query}
resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), keypair, self)
# resp = plistlib.loads(resp)
# print(resp)
resp = gzip.decompress(resp["b"])
resp = plistlib.loads(resp)
return resp
def _auth_token_request(username: str, password: str) -> any:
# Turn the PET into an auth token
data = {
"apple-id": username,
"client-id": str(uuid.uuid4()),
"delegates": {"com.apple.private.ids": {"protocol-version": "4"}},
"password": password,
}
data = plistlib.dumps(data)
r = requests.post(
"https://setup.icloud.com/setup/prefpane/loginDelegates",
auth=(username, password),
data=data,
verify=False,
)
r = plistlib.loads(r.content)
return r
# Gets an IDS auth token for the given username and password
# Will use native Grand Slam on macOS
# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted
# Returns (realm user id, auth token)
def _get_auth_token(
username: str, password: str, factor_gen: callable = None
) -> tuple[str, str]:
from sys import platform
# if use_gsa:
if platform == "darwin":
g = gsa.authenticate(username, password, gsa.Anisette())
pet = g["t"]["com.apple.gs.idms.pet"]["token"]
else:
# Make the request without the 2FA code to make the prompt appear
_auth_token_request(username, password)
# TODO: Make sure we actually need the second request, some rare accounts don't have 2FA
# Now make the request with the 2FA code
if factor_gen is None:
pet = password + input("Enter 2FA code: ")
else:
pet = password + factor_gen()
r = _auth_token_request(username, pet)
# print(r)
if "description" in r:
raise Exception(f"Error: {r['description']}")
service_data = r["delegates"]["com.apple.private.ids"]["service-data"]
realm_user_id = service_data["realm-user-id"]
auth_token = service_data["auth-token"]
# print(f"Auth token for {realm_user_id}: {auth_token}")
return realm_user_id, auth_token
def _generate_csr(private_key: rsa.RSAPrivateKey) -> str:
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(
x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()),
]
)
)
.sign(private_key, hashes.SHA256())
)
csr = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8")
return (
csr.replace("-----BEGIN CERTIFICATE REQUEST-----", "")
.replace("-----END CERTIFICATE REQUEST-----", "")
.replace("\n", "")
)
# Gets an IDS auth cert for the given user id and auth token
# Returns [private key PEM, certificate PEM]
def _get_auth_cert(user_id, token) -> KeyPair:
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
body = {
"authentication-data": {"auth-token": token},
"csr": b64decode(_generate_csr(private_key)),
"realm-user-id": user_id,
}
body = plistlib.dumps(body)
r = requests.post(
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS",
data=body,
headers={"x-protocol-version": "1630"},
verify=False,
)
r = plistlib.loads(r.content)
if r["status"] != 0:
raise (Exception(f"Failed to get auth cert: {r}"))
cert = x509.load_der_x509_certificate(r["cert"])
return KeyPair(
private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
.decode("utf-8")
.strip(),
cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(),
)
def _register_request(
push_token, info, auth_key: KeyPair, push_key: KeyPair, validation_data
):
body = {
"hardware-version": "MacBookPro18,3",
"language": "en-US",
"os-version": "macOS,13.2.1,22D68",
"software-version": "22D68",
"services": [
{
"capabilities": [{"flags": 1, "name": "Messenger", "version": 1}],
"service": "com.apple.madrid",
"users": [
{
# TODO: Pass ALL URIs from get handles
"uris": [{"uri": info["uri"]}],
"user-id": info["user_id"],
}
],
}
],
"validation-data": b64decode(validation_data),
}
body = plistlib.dumps(body)
headers = {
"x-protocol-version": PROTOCOL_VERSION,
"x-auth-user-id-0": info["user_id"],
}
_add_auth_push_signatures(
headers, body, "id-register", auth_key, push_key, push_token, 0
)
r = requests.post(
"https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register",
headers=headers,
data=body,
verify=False,
)
r = plistlib.loads(r.content)
print(f'Response code: {r["status"]}')
if "status" in r and r["status"] == 6004:
raise Exception("Validation data expired!")
# TODO: Do validation of nested statuses
return r
def mini_cert(cert: str):
return (
cert.replace("\n", "")
.replace("-----BEGIN CERTIFICATE-----", "")
.replace("-----END CERTIFICATE-----", "")
)
PROTOCOL_VERSION = "1640"
def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair):
headers = {
"x-protocol-version": PROTOCOL_VERSION,
"x-auth-user-id": user_id,
}
_add_auth_push_signatures(
headers, None, "id-get-handles", auth_key, push_key, push_token
)
r = requests.get(
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles",
headers=headers,
verify=False,
)
r = plistlib.loads(r.content)
if not "handles" in r:
raise Exception("No handles in response: " + str(r))
return [handle["uri"] for handle in r["handles"]]
class IDSUser:
def _authenticate_for_token(
self, username: str, password: str, factor_callback: callable = None
):
self.user_id, self._auth_token = _get_auth_token(
username, password, factor_callback
)
def _authenticate_for_cert(self):
self._auth_keypair = _get_auth_cert(self.user_id, self._auth_token)
# Factor callback will be called if a 2FA code is necessary
def __init__(
self,
push_connection: apns.APNSConnection,
username: str,
password: str,
factor_callback: callable = None,
):
self.push_connection = push_connection
self._authenticate_for_token(username, password, factor_callback)
self._authenticate_for_cert()
self.handles = _get_handles(
b64encode(self.push_connection.token),
self.user_id,
self._auth_keypair,
KeyPair(self.push_connection.private_key, self.push_connection.cert),
)
def __str__(self):
return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})"
def test():
import getpass
conn = apns.APNSConnection()
conn.connect()
username = input("Enter username: ")
password = getpass.getpass("Enter password: ")
user = IDSUser(conn, username, password)
print(user)
# SIGNING STUFF
# Nonce Format:
# 01000001876bd0a2c0e571093967fce3d7
# 01 # version
# 000001876d008cc5 # unix time
# r1r2r3r4r5r6r7r8 # random bytes
def generate_nonce() -> bytes:
return (
b"\x01"
+ int(datetime.now().timestamp() * 1000).to_bytes(8, "big")
+ random.randbytes(8)
)
# Creates a payload from individual parts for signing
def _create_payload(
bag_key: str,
query_string: str,
push_token: str,
payload: bytes,
nonce: bytes = None,
) -> tuple[str, bytes]:
# Generate the nonce
if nonce is None:
nonce = generate_nonce()
push_token = b64decode(push_token)
if payload is None:
payload = b""
return (
nonce
+ len(bag_key).to_bytes(4, "big")
+ bag_key.encode()
+ len(query_string).to_bytes(4, "big")
+ query_string.encode()
+ len(payload).to_bytes(4, "big")
+ payload
+ len(push_token).to_bytes(4, "big")
+ push_token,
nonce,
)
# Returns signature, nonce
def _sign_payload(
private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes
) -> tuple[str, bytes]:
# Load the private key
key = serialization.load_pem_private_key(
private_key.encode(), password=None, backend=default_backend()
)
payload, nonce = _create_payload(bag_key, query_string, push_token, payload)
sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1())
sig = b"\x01\x01" + sig
sig = b64encode(sig).decode()
return sig, nonce
# Add headers for x-push-sig and x-auth-sig stuff
def _add_auth_push_signatures(
headers: dict,
body: bytes,
bag_key: str,
auth_key: KeyPair,
push_key: KeyPair,
push_token: str,
auth_number=None,
):
push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body)
headers["x-push-sig"] = push_sig
headers["x-push-nonce"] = b64encode(push_nonce)
headers["x-push-cert"] = mini_cert(push_key.cert)
headers["x-push-token"] = push_token
auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body)
auth_postfix = "-" + str(auth_number) if auth_number is not None else ""
headers["x-auth-sig" + auth_postfix] = auth_sig
headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce)
headers["x-auth-cert" + auth_postfix] = mini_cert(auth_key.cert)
if __name__ == "__main__":
test()

72
ids/__init__.py Normal file
View file

@ -0,0 +1,72 @@
from base64 import b64encode
import apns
from . import _helpers, identity, profile, query
class IDSUser:
# Sets self.user_id and self._auth_token
def _authenticate_for_token(
self, username: str, password: str, factor_callback: callable = None
):
self.user_id, self._auth_token = profile.get_auth_token(
username, password, factor_callback
)
# Sets self._auth_keypair using self.user_id and self._auth_token
def _authenticate_for_cert(self):
self._auth_keypair = profile.get_auth_cert(self.user_id, self._auth_token)
# Factor callback will be called if a 2FA code is necessary
def __init__(
self,
push_connection: apns.APNSConnection,
):
self.push_connection = push_connection
self._push_keypair = _helpers.KeyPair(
self.push_connection.private_key, self.push_connection.cert
)
def __str__(self):
return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})"
# Authenticates with a username and password, to create a brand new authentication keypair
def authenticate(
self, username: str, password: str, factor_callback: callable = None
):
self._authenticate_for_token(username, password, factor_callback)
self._authenticate_for_cert()
self.handles = profile.get_handles(
b64encode(self.push_connection.token),
self.user_id,
self._auth_keypair,
self._push_keypair,
)
# Uses an existing authentication keypair
def restore_authentication(
self, auth_keypair: _helpers.KeyPair, user_id: str, handles: dict
):
self._auth_keypair = auth_keypair
self.user_id = user_id
self.handles = handles
# This is a separate call so that the user can make sure the first part succeeds before asking for validation data
def register(self, validation_data: str):
cert = identity.register(
b64encode(self.push_connection.token),
self.handles,
self.user_id,
self._auth_keypair,
self._push_keypair,
validation_data,
)
self._id_keypair = _helpers.KeyPair(self._auth_keypair.key, cert)
def restore_identity(self, id_keypair: _helpers.KeyPair):
self._id_keypair = id_keypair
def lookup(self, uris: list[str], topic: str = "com.apple.madrid") -> any:
return query.lookup(self.push_connection, self.handles[0], self._id_keypair, uris, topic)

16
ids/_helpers.py Normal file
View file

@ -0,0 +1,16 @@
from collections import namedtuple
USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"
PROTOCOL_VERSION = "1640"
# KeyPair is a named tuple that holds a key and a certificate in PEM form
KeyPair = namedtuple("KeyPair", ["key", "cert"])
def dearmour(armoured: str) -> str:
import re
# Use a regex to remove the header and footer (generic so it work on more than just certificates)
return re.sub(r"-----BEGIN .*-----|-----END .*-----", "", armoured).replace(
"\n", ""
)

63
ids/identity.py Normal file
View file

@ -0,0 +1,63 @@
import plistlib
from base64 import b64decode
import requests
from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair
from .signing import add_auth_signature, armour_cert
def register(
push_token, handles, user_id, auth_key: KeyPair, push_key: KeyPair, validation_data
):
uris = [{"uri": handle} for handle in handles]
body = {
"hardware-version": "MacBookPro18,3",
"language": "en-US",
"os-version": "macOS,13.2.1,22D68",
"software-version": "22D68",
"services": [
{
"capabilities": [{"flags": 1, "name": "Messenger", "version": 1}],
"service": "com.apple.madrid",
"users": [
{
"uris": uris,
"user-id": user_id,
}
],
}
],
"validation-data": b64decode(validation_data),
}
body = plistlib.dumps(body)
headers = {
"x-protocol-version": PROTOCOL_VERSION,
"x-auth-user-id-0": user_id,
}
add_auth_signature(headers, body, "id-register", auth_key, push_key, push_token, 0)
r = requests.post(
"https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register",
headers=headers,
data=body,
verify=False,
)
r = plistlib.loads(r.content)
print(f'Response code: {r["status"]}')
if "status" in r and r["status"] == 6004:
raise Exception("Validation data expired!")
# TODO: Do validation of nested statuses
if "status" in r and r["status"] != 0:
raise Exception(f"Failed to register: {r}")
if not "services" in r:
raise Exception(f"No services in response: {r}")
if not "users" in r["services"][0]:
raise Exception(f"No users in response: {r}")
if not "cert" in r["services"][0]["users"][0]:
raise Exception(f"No cert in response: {r}")
return armour_cert(r["services"][0]["users"][0]["cert"])

156
ids/profile.py Normal file
View file

@ -0,0 +1,156 @@
import plistlib
import random
import uuid
from base64 import b64decode
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509.oid import NameOID
import gsa
import bags
from . import signing
from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair
def _auth_token_request(username: str, password: str) -> any:
# Turn the PET into an auth token
data = {
"apple-id": username,
"client-id": str(uuid.uuid4()),
"delegates": {"com.apple.private.ids": {"protocol-version": "4"}},
"password": password,
}
data = plistlib.dumps(data)
r = requests.post(
# TODO: Figure out which URL bag we can get this from
"https://setup.icloud.com/setup/prefpane/loginDelegates",
auth=(username, password),
data=data,
verify=False,
)
r = plistlib.loads(r.content)
return r
# Gets an IDS auth token for the given username and password
# Will use native Grand Slam on macOS
# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted
# Returns (realm user id, auth token)
def get_auth_token(
username: str, password: str, factor_gen: callable = None
) -> tuple[str, str]:
from sys import platform
# if use_gsa:
if platform == "darwin":
g = gsa.authenticate(username, password, gsa.Anisette())
pet = g["t"]["com.apple.gs.idms.pet"]["token"]
else:
# Make the request without the 2FA code to make the prompt appear
_auth_token_request(username, password)
# TODO: Make sure we actually need the second request, some rare accounts don't have 2FA
# Now make the request with the 2FA code
if factor_gen is None:
pet = password + input("Enter 2FA code: ")
else:
pet = password + factor_gen()
r = _auth_token_request(username, pet)
# print(r)
if "description" in r:
raise Exception(f"Error: {r['description']}")
service_data = r["delegates"]["com.apple.private.ids"]["service-data"]
realm_user_id = service_data["realm-user-id"]
auth_token = service_data["auth-token"]
# print(f"Auth token for {realm_user_id}: {auth_token}")
return realm_user_id, auth_token
def _generate_csr(private_key: rsa.RSAPrivateKey) -> str:
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(
x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()),
]
)
)
.sign(private_key, hashes.SHA256())
)
csr = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8")
return (
csr.replace("-----BEGIN CERTIFICATE REQUEST-----", "")
.replace("-----END CERTIFICATE REQUEST-----", "")
.replace("\n", "")
)
# Gets an IDS auth cert for the given user id and auth token
# Returns [private key PEM, certificate PEM]
def get_auth_cert(user_id, token) -> KeyPair:
BAG_KEY = "id-authenticate-ds-id"
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
body = {
"authentication-data": {"auth-token": token},
"csr": b64decode(_generate_csr(private_key)),
"realm-user-id": user_id,
}
body = plistlib.dumps(body)
r = requests.post(
bags.ids_bag()[BAG_KEY],
#"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS",
data=body,
headers={"x-protocol-version": "1630"},
verify=False,
)
r = plistlib.loads(r.content)
if r["status"] != 0:
raise (Exception(f"Failed to get auth cert: {r}"))
cert = x509.load_der_x509_certificate(r["cert"])
return KeyPair(
private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
.decode("utf-8")
.strip(),
cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(),
)
def get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair):
BAG_KEY = "id-get-handles"
headers = {
"x-protocol-version": PROTOCOL_VERSION,
"x-auth-user-id": user_id,
}
signing.add_auth_signature(
headers, None, BAG_KEY, auth_key, push_key, push_token
)
r = requests.get(
bags.ids_bag()[BAG_KEY],
headers=headers,
verify=False,
)
r = plistlib.loads(r.content)
if not "handles" in r:
raise Exception("No handles in response: " + str(r))
return [handle["uri"] for handle in r["handles"]]

69
ids/query.py Normal file
View file

@ -0,0 +1,69 @@
import gzip
import plistlib
import random
from base64 import b64encode
import apns
import bags
from ._helpers import KeyPair, PROTOCOL_VERSION
from . import signing
def lookup(
conn: apns.APNSConnection,
self_uri: str,
id_keypair: KeyPair,
query: list[str],
topic,
) -> bytes:
BAG_KEY = "id-query"
conn.filter([topic])
body = plistlib.dumps({"uris": query})
body = gzip.compress(body, mtime=0)
push_token = b64encode(conn.token).decode()
headers = {
"x-id-self-uri": self_uri,
"x-protocol-version": PROTOCOL_VERSION,
}
signing.add_id_signature(headers, body, BAG_KEY, id_keypair, push_token)
msg_id = random.randbytes(16)
req = {
"cT": "application/x-apple-plist",
"U": msg_id,
"c": 96,
"u": bags.ids_bag()[BAG_KEY],
"h": headers,
"v": 2,
"b": body,
}
conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY))
def check_response(x):
if x[0] != 0x0A:
return False
resp_body = apns._get_field(x[1], 3)
if resp_body is None:
return False
resp_body = plistlib.loads(resp_body)
return resp_body["U"] == msg_id
# Lambda to check if the response is the one we want
payload = conn.incoming_queue.wait_pop_find(check_response)
resp = apns._get_field(payload[1], 3)
resp = plistlib.loads(resp)
resp = gzip.decompress(resp["b"])
resp = plistlib.loads(resp)
if resp['status'] != 0:
raise Exception(f'Query failed: {resp}')
if not 'results' in resp:
raise Exception(f'No results in response: {resp}')
return resp['results']

125
ids/signing.py Normal file
View file

@ -0,0 +1,125 @@
import random
from base64 import b64decode, b64encode
from datetime import datetime
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509.oid import NameOID
from ._helpers import KeyPair, dearmour
# TODO: Move this helper somewhere else
def armour_cert(cert: bytes) -> str:
cert = x509.load_der_x509_certificate(cert)
return cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip()
"""
Generates a nonce in this format:
01000001876bd0a2c0e571093967fce3d7
01 # version
000001876d008cc5 # unix time
r1r2r3r4r5r6r7r8 # random bytes
"""
def generate_nonce() -> bytes:
return (
b"\x01"
+ int(datetime.now().timestamp() * 1000).to_bytes(8, "big")
+ random.randbytes(8)
)
import typing
# Creates a payload from individual parts for signing
def _create_payload(
bag_key: str,
query_string: str,
push_token: typing.Union[str, bytes],
payload: bytes,
nonce: typing.Union[bytes, None] = None,
) -> tuple[bytes, bytes]:
# Generate the nonce
if nonce is None:
nonce = generate_nonce()
push_token = b64decode(push_token)
if payload is None:
payload = b""
return (
nonce
+ len(bag_key).to_bytes(4, "big")
+ bag_key.encode()
+ len(query_string).to_bytes(4, "big")
+ query_string.encode()
+ len(payload).to_bytes(4, "big")
+ payload
+ len(push_token).to_bytes(4, "big")
+ push_token,
nonce,
)
# Returns signature, nonce
def _sign_payload(
private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes, nonce = None
) -> tuple[str, bytes]:
# Load the private key
key = serialization.load_pem_private_key(
private_key.encode(), password=None, backend=default_backend()
)
payload, nonce = _create_payload(bag_key, query_string, push_token, payload, nonce)
sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) # type: ignore
sig = b"\x01\x01" + sig
sig = b64encode(sig).decode()
return sig, nonce
# Add headers for x-push-sig and x-auth-sig stuff
def add_auth_signature(
headers: dict,
body: bytes,
bag_key: str,
auth_key: KeyPair,
push_key: KeyPair,
push_token: str,
auth_number=None,
):
push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body)
headers["x-push-sig"] = push_sig
headers["x-push-nonce"] = b64encode(push_nonce)
headers["x-push-cert"] = dearmour(push_key.cert)
headers["x-push-token"] = push_token
auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body)
auth_postfix = "-" + str(auth_number) if auth_number is not None else ""
headers["x-auth-sig" + auth_postfix] = auth_sig
headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce)
headers["x-auth-cert" + auth_postfix] = dearmour(auth_key.cert)
def add_id_signature(
headers: dict,
body: bytes,
bag_key: str,
id_key: KeyPair,
push_token: str,
nonce=None,
):
id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body, nonce)
headers["x-id-sig"] = id_sig
headers["x-id-nonce"] = b64encode(id_nonce).decode()
headers["x-id-cert"] = dearmour(id_key.cert)
headers["x-push-token"] = push_token