remove old stuff, more refactoring

This commit is contained in:
JJTech0130 2023-05-09 17:03:27 -04:00
parent f06b4772f1
commit c99826d60e
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6
9 changed files with 117 additions and 703 deletions

208
demo.py
View file

@ -1,18 +1,9 @@
import getpass
import json import json
from base64 import b64encode
from getpass import getpass
import ids
from base64 import b64encode, b64decode
import apns import apns
from ids import signing import ids
# Open config
try:
with open("config.json", "r") as f:
CONFIG = json.load(f)
except FileNotFoundError:
CONFIG = {}
def input_multiline(prompt): def input_multiline(prompt):
@ -26,167 +17,56 @@ def input_multiline(prompt):
return "\n".join(lines) return "\n".join(lines)
def refresh_token(): # Try and load config.json
# If no username is set, prompt for it try:
if "username" not in CONFIG: with open("config.json", "r") as f:
CONFIG["username"] = input("Enter iCloud username: ")
# If no password is set, prompt for it
if "password" not in CONFIG:
CONFIG["password"] = getpass.getpass("Enter iCloud password: ")
# If grandslam authentication is not set, prompt for it
if "use_gsa" not in CONFIG:
CONFIG["use_gsa"] = input("Use grandslam authentication? [y/N] ").lower() == "y"
def factor_gen(): CONFIG = json.load(f)
return input("Enter iCloud 2FA code: ") except FileNotFoundError:
CONFIG = {}
CONFIG["user_id"], CONFIG["token"] = ids._get_auth_token(
CONFIG["username"], CONFIG["password"], factor_gen=factor_gen
)
def refresh_cert(): conn = apns.APNSConnection(
CONFIG["key"], CONFIG["auth_cert"] = ids._get_auth_cert( CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert")
CONFIG["user_id"], CONFIG["token"] )
) conn.connect(CONFIG.get("push", {}).get("token"))
user = ids.IDSUser(conn)
def create_connection(): if CONFIG.get("auth", {}).get("cert") is not None:
conn = apns.APNSConnection() auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"])
token = conn.connect() user_id = CONFIG["auth"]["user_id"]
# conn.filter(['com.apple.madrid']) handles = CONFIG["auth"]["handles"]
CONFIG["push"] = { user.restore_authentication(auth_keypair, user_id, handles)
"token": b64encode(token).decode(),
"cert": conn.cert,
"key": conn.private_key,
}
return conn
def restore_connection():
conn = apns.APNSConnection(CONFIG["push"]["key"], CONFIG["push"]["cert"])
conn.connect(True, b64decode(CONFIG["push"]["token"]))
# conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi'])
return conn
def refresh_ids_cert():
info = {
"uri": "mailto:" + CONFIG["username"],
"user_id": CONFIG["user_id"],
}
print(
ids._get_handles(
CONFIG["push"]["token"],
CONFIG["user_id"],
ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]),
ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]),
)
)
resp = None
try:
if "validation_data" in CONFIG:
resp = ids._register_request(
CONFIG["push"]["token"],
info,
ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]),
ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]),
CONFIG["validation_data"],
)
except Exception as e:
print(e)
resp = None
if resp is None:
print(
"Note: Validation data can be obtained from @JJTech, or intercepted using a HTTP proxy."
)
validation_data = (
input_multiline("Enter validation data: ")
.replace("\n", "")
.replace(" ", "")
)
resp = ids._register_request(
CONFIG["push"]["token"],
info,
ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]),
ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]),
validation_data,
)
CONFIG["validation_data"] = validation_data
print(resp)
ids_cert = signing.armour_cert(resp["services"][0]["users"][0]["cert"])
CONFIG["ids_cert"] = ids_cert
if not "push" in CONFIG:
print("No existing APNs credentials, creating new ones...")
# print("No push conn")
conn = create_connection()
else: else:
print("Restoring APNs credentials...") username = input("Username: ")
conn = restore_connection() password = getpass("Password: ")
print("Connected to APNs!")
if not "ids_cert" in CONFIG: user.authenticate(username, password)
print("No existing IDS certificate, creating new one...")
if not "key" in CONFIG:
print("No existing authentication certificate, creating new one...")
if not "token" in CONFIG:
print("No existing authentication token, creating new one...")
refresh_token()
print("Got authentication token!")
refresh_cert()
print("Got authentication certificate!")
refresh_ids_cert()
print("Got IDS certificate!")
ids_keypair = ids.KeyPair(CONFIG["key"], CONFIG["ids_cert"]) if CONFIG.get("id", {}).get("cert") is not None:
id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"])
user.restore_identity(id_keypair)
else:
vd = input_multiline("Enter validation data: ")
user.register(vd)
# Write config.json
CONFIG["id"] = {
"key": user._id_keypair.key,
"cert": user._id_keypair.cert,
}
CONFIG["auth"] = {
"key": user._auth_keypair.key,
"cert": user._auth_keypair.cert,
"user_id": user.user_id,
"handles": user.handles,
}
CONFIG["push"] = {
"token": b64encode(user.push_connection.token).decode(),
"key": user.push_connection.private_key,
"cert": user.push_connection.cert,
}
def lookup(topic: str, users: list[str]):
print(f"Looking up users {users} for topic {topic}...")
resp = ids.lookup(conn, CONFIG["username"], ids_keypair, topic, users)
# print(resp)
# r = list(resp['results'].values())[0]
for k, v in resp["results"].items():
print(f"Result for user {k} topic {topic}:")
i = v["identities"]
print(f"IDENTITIES: {len(i)}")
for iden in i:
print("IDENTITY", end=" ")
print(f"Push Token: {b64encode(iden['push-token']).decode()}", end=" ")
if "client-data" in iden:
print(f"Client Data: {len(iden['client-data'])}")
else:
print("No client data")
# Hack to make sure that the requests and responses match up
# This filter MUST contain all the topics you are looking up
# conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi', 'com.apple.private.alloy.multiplex1', 'com.apple.private.alloy.screensharing'])
# import time
# print("...waiting for queued messages... (this is a hack)")
# time.sleep(5) # Let the server send us any messages it was holding
# conn.sink() # Dump the messages
lookup("com.apple.madrid", ["mailto:jjtech@jjtech.dev"])
lookup("com.apple.private.alloy.facetime.multi", ["mailto:jjtech@jjtech.dev"])
lookup("com.apple.private.alloy.facetime.multi", ["mailto:user_test2@icloud.com"])
lookup("com.apple.madrid", ["mailto:user_test2@icloud.com"])
lookup("com.apple.private.alloy.multiplex1", ["mailto:user_test2@icloud.com"])
lookup("com.apple.private.alloy.screensharing", ["mailto:user_test2@icloud.com"])
# time.sleep(4)
# Save config
with open("config.json", "w") as f: with open("config.json", "w") as f:
json.dump(CONFIG, f, indent=4) json.dump(CONFIG, f, indent=4)

View file

@ -1,6 +1,9 @@
from base64 import b64encode from base64 import b64encode
import apns import apns
from . import profile, _helpers, identity
from . import _helpers, identity, profile
class IDSUser: class IDSUser:
# Sets self.user_id and self._auth_token # Sets self.user_id and self._auth_token
@ -21,13 +24,17 @@ class IDSUser:
push_connection: apns.APNSConnection, push_connection: apns.APNSConnection,
): ):
self.push_connection = push_connection self.push_connection = push_connection
self._push_keypair = _helpers.KeyPair(self.push_connection.private_key, self.push_connection.cert) self._push_keypair = _helpers.KeyPair(
self.push_connection.private_key, self.push_connection.cert
)
def __str__(self): def __str__(self):
return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})" return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})"
# Authenticates with a username and password, to create a brand new authentication keypair # Authenticates with a username and password, to create a brand new authentication keypair
def authenticate(self, username: str, password: str, factor_callback: callable = None): def authenticate(
self, username: str, password: str, factor_callback: callable = None
):
self._authenticate_for_token(username, password, factor_callback) self._authenticate_for_token(username, password, factor_callback)
self._authenticate_for_cert() self._authenticate_for_cert()
self.handles = profile._get_handles( self.handles = profile._get_handles(
@ -38,20 +45,24 @@ class IDSUser:
) )
# Uses an existing authentication keypair # Uses an existing authentication keypair
def restore_authentication(self, auth_keypair: _helpers.KeyPair, user_id: str, handles: dict): def restore_authentication(
self, auth_keypair: _helpers.KeyPair, user_id: str, handles: dict
):
self._auth_keypair = auth_keypair self._auth_keypair = auth_keypair
self.user_id = user_id self.user_id = user_id
self.handles = handles self.handles = handles
# This is a separate call so that the user can make sure the first part succeeds before asking for validation data # This is a separate call so that the user can make sure the first part succeeds before asking for validation data
def register(self, validation_data: str): def register(self, validation_data: str):
resp = identity.register_request( cert = identity.register(
b64encode(self.push_connection.token), b64encode(self.push_connection.token),
self.handles, self.handles,
self.user_id, self.user_id,
self._auth_keypair, self._auth_keypair,
self._push_keypair, self._push_keypair,
validation_data validation_data,
) )
print(resp) self._id_keypair = _helpers.KeyPair(self._auth_keypair.key, cert)
def restore_identity(self, id_keypair: _helpers.KeyPair):
self._id_keypair = id_keypair

View file

@ -6,7 +6,11 @@ PROTOCOL_VERSION = "1640"
# KeyPair is a named tuple that holds a key and a certificate in PEM form # KeyPair is a named tuple that holds a key and a certificate in PEM form
KeyPair = namedtuple("KeyPair", ["key", "cert"]) KeyPair = namedtuple("KeyPair", ["key", "cert"])
def dearmour(armoured: str) -> str: def dearmour(armoured: str) -> str:
import re import re
# Use a regex to remove the header and footer (generic so it work on more than just certificates) # Use a regex to remove the header and footer (generic so it work on more than just certificates)
return re.sub(r"-----BEGIN .*-----|-----END .*-----", "", armoured).replace("\n", "") return re.sub(r"-----BEGIN .*-----|-----END .*-----", "", armoured).replace(
"\n", ""
)

View file

@ -1,11 +1,14 @@
from ._helpers import KeyPair, PROTOCOL_VERSION, USER_AGENT
from base64 import b64decode
import plistlib import plistlib
import requests from base64 import b64decode
from .signing import add_auth_signature
def register_request( import requests
push_token, handles, uid, auth_key: KeyPair, push_key: KeyPair, validation_data
from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair
from .signing import add_auth_signature, armour_cert
def register(
push_token, handles, user_id, auth_key: KeyPair, push_key: KeyPair, validation_data
): ):
uris = [{"uri": handle} for handle in handles] uris = [{"uri": handle} for handle in handles]
@ -20,9 +23,8 @@ def register_request(
"service": "com.apple.madrid", "service": "com.apple.madrid",
"users": [ "users": [
{ {
# TODO: Pass ALL URIs from get handles
"uris": uris, "uris": uris,
"user-id": uid, "user-id": user_id,
} }
], ],
} }
@ -34,11 +36,9 @@ def register_request(
headers = { headers = {
"x-protocol-version": PROTOCOL_VERSION, "x-protocol-version": PROTOCOL_VERSION,
"x-auth-user-id-0": uid, "x-auth-user-id-0": user_id,
} }
add_auth_signature( add_auth_signature(headers, body, "id-register", auth_key, push_key, push_token, 0)
headers, body, "id-register", auth_key, push_key, push_token, 0
)
r = requests.post( r = requests.post(
"https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register", "https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register",
@ -51,4 +51,13 @@ def register_request(
if "status" in r and r["status"] == 6004: if "status" in r and r["status"] == 6004:
raise Exception("Validation data expired!") raise Exception("Validation data expired!")
# TODO: Do validation of nested statuses # TODO: Do validation of nested statuses
return r if "status" in r and r["status"] != 0:
raise Exception(f"Failed to register: {r}")
if not "services" in r:
raise Exception(f"No services in response: {r}")
if not "users" in r["services"][0]:
raise Exception(f"No users in response: {r}")
if not "cert" in r["services"][0]["users"][0]:
raise Exception(f"No cert in response: {r}")
return armour_cert(r["services"][0]["users"][0]["cert"])

View file

@ -1,16 +1,19 @@
import requests
import plistlib import plistlib
import random
import uuid import uuid
import gsa from base64 import b64decode
import requests
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509.oid import NameOID from cryptography.x509.oid import NameOID
import random
from base64 import b64decode import gsa
from ._helpers import KeyPair, PROTOCOL_VERSION, USER_AGENT
from . import signing from . import signing
from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair
def _auth_token_request(username: str, password: str) -> any: def _auth_token_request(username: str, password: str) -> any:
@ -122,24 +125,16 @@ def _get_auth_cert(user_id, token) -> KeyPair:
cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(), cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(),
) )
def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair): def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair):
headers = { headers = {
"x-protocol-version": PROTOCOL_VERSION, "x-protocol-version": PROTOCOL_VERSION,
"x-auth-user-id": user_id, "x-auth-user-id": user_id,
#"user-agent": USER_AGENT,
} }
import oldids
#headers2 = headers.copy()
#oldids._add_auth_push_sig(headers2, None, "id-get-handles", auth_key, push_key, push_token)
#headers3 = headers.copy()
signing.add_auth_signature( signing.add_auth_signature(
headers, None, "id-get-handles", auth_key, push_key, push_token headers, None, "id-get-handles", auth_key, push_key, push_token
) )
#for key, value in headers2.items():
# if headers3[key] != value:
# print(f"Key {key} mismatch: {headers3[key]} != {value}")
r = requests.get( r = requests.get(
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles", "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles",
headers=headers, headers=headers,

View file

@ -5,6 +5,7 @@ from base64 import b64encode
import apns import apns
import bags import bags
from . import USER_AGENT, KeyPair, signing from . import USER_AGENT, KeyPair, signing

View file

@ -1,18 +1,21 @@
from ._helpers import dearmour, KeyPair
from datetime import datetime
import random import random
from base64 import b64encode, b64decode from base64 import b64decode, b64encode
from datetime import datetime
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509.oid import NameOID from cryptography.x509.oid import NameOID
from ._helpers import KeyPair, dearmour
# TODO: Move this helper somewhere else # TODO: Move this helper somewhere else
def armour_cert(cert: bytes) -> str: def armour_cert(cert: bytes) -> str:
x509.load_der_x509_certificate(cert) cert = x509.load_der_x509_certificate(cert)
ids_cert = ids_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip() return cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip()
return ids_cert
""" """
Generates a nonce in this format: Generates a nonce in this format:
@ -21,6 +24,8 @@ Generates a nonce in this format:
000001876d008cc5 # unix time 000001876d008cc5 # unix time
r1r2r3r4r5r6r7r8 # random bytes r1r2r3r4r5r6r7r8 # random bytes
""" """
def generate_nonce() -> bytes: def generate_nonce() -> bytes:
return ( return (
b"\x01" b"\x01"
@ -31,6 +36,7 @@ def generate_nonce() -> bytes:
import typing import typing
# Creates a payload from individual parts for signing # Creates a payload from individual parts for signing
def _create_payload( def _create_payload(
bag_key: str, bag_key: str,
@ -80,6 +86,7 @@ def _sign_payload(
return sig, nonce return sig, nonce
# Add headers for x-push-sig and x-auth-sig stuff # Add headers for x-push-sig and x-auth-sig stuff
def add_auth_signature( def add_auth_signature(
headers: dict, headers: dict,

View file

@ -1,59 +0,0 @@
import ids
import apns
from getpass import getpass
import json
from base64 import b64encode
def input_multiline(prompt):
print(prompt)
lines = []
while True:
line = input()
if line == "":
break
lines.append(line)
return "\n".join(lines)
# Try and load config.json
try:
with open("config.json", "r") as f:
CONFIG = json.load(f)
except FileNotFoundError:
CONFIG = {}
conn = apns.APNSConnection(CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert"))
conn.connect(CONFIG.get("push", {}).get("token"))
user = ids.IDSUser(conn)
if CONFIG.get("auth", {}).get("cert") is not None:
auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"])
user_id = CONFIG["auth"]["user_id"]
handles = CONFIG["auth"]["handles"]
user.restore_authentication(auth_keypair, user_id, handles)
else:
username = input("Username: ")
password = getpass("Password: ")
user.authenticate(username, password)
vd = input_multiline("Enter validation data: ")
user.register(vd)
# Write config.json
CONFIG["auth"] = {
"key": user._auth_keypair.key,
"cert": user._auth_keypair.cert,
"user_id": user.user_id,
"handles": user.handles,
}
CONFIG["push"] = {
"token": b64encode(user.push_connection.token).decode(),
"key": user.push_connection.private_key,
"cert": user.push_connection.cert,
}
with open("config.json", "w") as f:
json.dump(CONFIG, f, indent=4)

434
oldids.py
View file

@ -1,434 +0,0 @@
import gzip
import plistlib
import random
import uuid
from base64 import b64decode, b64encode
from collections import namedtuple
from datetime import datetime
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509.oid import NameOID
import apns
import bags
import gsa
USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"
PROTOCOL_VERSION = "1640"
KeyPair = namedtuple("KeyPair", ["key", "cert"])
# global_key, global_cert = load_keys()
def _send_request(
conn: apns.APNSConnection,
bag_key: str,
topic: str,
body: bytes,
keypair: KeyPair,
username: str,
) -> bytes:
body = gzip.compress(body, mtime=0)
push_token = b64encode(conn.token).decode()
# Sign the request
signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body)
headers = {
"x-id-self-uri": "mailto:" + username,
"User-Agent": USER_AGENT,
"x-protocol-version": "1630",
}
_add_id_sig(headers, body, bag_key, keypair, push_token)
# print(headers)
msg_id = random.randbytes(16)
req = {
"cT": "application/x-apple-plist",
"U": msg_id,
"c": 96,
"ua": USER_AGENT,
"u": bags.ids_bag()[bag_key],
"h": headers,
"v": 2,
"b": body,
}
conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY))
# resp = conn.wait_for_packet(0x0A)
def check_response(x):
if x[0] != 0x0A:
return False
resp_body = apns._get_field(x[1], 3)
if resp_body is None:
return False
resp_body = plistlib.loads(resp_body)
return resp_body["U"] == msg_id
# Lambda to check if the response is the one we want
# conn.incoming_queue.find(check_response)
payload = conn.incoming_queue.wait_pop_find(check_response)
# conn._send_ack(apns._get_field(payload[1], 4))
resp = apns._get_field(payload[1], 3)
return plistlib.loads(resp)
# Performs an IDS lookup
# conn: an active APNs connection. must be connected and have a push token. will be filtered to the IDS topic
# self: the user's email address
# keypair: a KeyPair object containing the user's private key and certificate
# topic: the IDS topic to query
# query: a list of URIs to query
def lookup(
conn: apns.APNSConnection, self: str, keypair: KeyPair, topic: str, query: list[str]
) -> any:
conn.filter([topic])
query = {"uris": query}
resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), keypair, self)
# resp = plistlib.loads(resp)
# print(resp)
resp = gzip.decompress(resp["b"])
resp = plistlib.loads(resp)
return resp
def _auth_token_request(username: str, password: str) -> any:
# Turn the PET into an auth token
data = {
"apple-id": username,
"client-id": str(uuid.uuid4()),
"delegates": {"com.apple.private.ids": {"protocol-version": "4"}},
"password": password,
}
data = plistlib.dumps(data)
r = requests.post(
"https://setup.icloud.com/setup/prefpane/loginDelegates",
auth=(username, password),
data=data,
verify=False,
)
r = plistlib.loads(r.content)
return r
# Gets an IDS auth token for the given username and password
# Will use native Grand Slam on macOS
# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted
# Returns (realm user id, auth token)
def _get_auth_token(
username: str, password: str, factor_gen: callable = None
) -> tuple[str, str]:
from sys import platform
# if use_gsa:
if platform == "darwin":
g = gsa.authenticate(username, password, gsa.Anisette())
pet = g["t"]["com.apple.gs.idms.pet"]["token"]
else:
# Make the request without the 2FA code to make the prompt appear
_auth_token_request(username, password)
# TODO: Make sure we actually need the second request, some rare accounts don't have 2FA
# Now make the request with the 2FA code
if factor_gen is None:
pet = password + input("Enter 2FA code: ")
else:
pet = password + factor_gen()
r = _auth_token_request(username, pet)
# print(r)
if "description" in r:
raise Exception(f"Error: {r['description']}")
service_data = r["delegates"]["com.apple.private.ids"]["service-data"]
realm_user_id = service_data["realm-user-id"]
auth_token = service_data["auth-token"]
# print(f"Auth token for {realm_user_id}: {auth_token}")
return realm_user_id, auth_token
def _generate_csr(private_key: rsa.RSAPrivateKey) -> str:
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(
x509.Name(
[
x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()),
]
)
)
.sign(private_key, hashes.SHA256())
)
csr = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8")
return (
csr.replace("-----BEGIN CERTIFICATE REQUEST-----", "")
.replace("-----END CERTIFICATE REQUEST-----", "")
.replace("\n", "")
)
# Gets an IDS auth cert for the given user id and auth token
# Returns [private key PEM, certificate PEM]
def _get_auth_cert(user_id, token) -> KeyPair:
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
body = {
"authentication-data": {"auth-token": token},
"csr": b64decode(_generate_csr(private_key)),
"realm-user-id": user_id,
}
body = plistlib.dumps(body)
r = requests.post(
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS",
data=body,
headers={"x-protocol-version": "1630"},
verify=False,
)
r = plistlib.loads(r.content)
if r["status"] != 0:
raise (Exception(f"Failed to get auth cert: {r}"))
cert = x509.load_der_x509_certificate(r["cert"])
return KeyPair(
private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
.decode("utf-8")
.strip(),
cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(),
)
def _register_request(
push_token, info, auth_key: KeyPair, push_key: KeyPair, validation_data
):
body = {
"hardware-version": "MacBookPro18,3",
"language": "en-US",
"os-version": "macOS,13.2.1,22D68",
"software-version": "22D68",
"services": [
{
"capabilities": [{"flags": 1, "name": "Messenger", "version": 1}],
"service": "com.apple.madrid",
"users": [
{
# TODO: Pass ALL URIs from get handles
"uris": [{"uri": info["uri"]}],
"user-id": info["user_id"],
}
],
}
],
"validation-data": b64decode(validation_data),
}
body = plistlib.dumps(body)
headers = {
"x-protocol-version": PROTOCOL_VERSION,
"x-auth-user-id-0": info["user_id"],
}
_add_auth_push_sig(
headers, body, "id-register", auth_key, push_key, push_token, 0
)
r = requests.post(
"https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register",
headers=headers,
data=body,
verify=False,
)
r = plistlib.loads(r.content)
print(f'Response code: {r["status"]}')
if "status" in r and r["status"] == 6004:
raise Exception("Validation data expired!")
# TODO: Do validation of nested statuses
return r
def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair):
headers = {
"x-protocol-version": PROTOCOL_VERSION,
"x-auth-user-id": user_id,
}
_add_auth_push_sig(
headers, None, "id-get-handles", auth_key, push_key, push_token
)
r = requests.get(
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles",
headers=headers,
verify=False,
)
r = plistlib.loads(r.content)
if not "handles" in r:
raise Exception("No handles in response: " + str(r))
return [handle["uri"] for handle in r["handles"]]
class IDSUser:
def _authenticate_for_token(
self, username: str, password: str, factor_callback: callable = None
):
self.user_id, self._auth_token = _get_auth_token(
username, password, factor_callback
)
def _authenticate_for_cert(self):
self._auth_keypair = _get_auth_cert(self.user_id, self._auth_token)
# Factor callback will be called if a 2FA code is necessary
def __init__(
self,
push_connection: apns.APNSConnection,
username: str,
password: str,
factor_callback: callable = None,
):
self.push_connection = push_connection
self._authenticate_for_token(username, password, factor_callback)
self._authenticate_for_cert()
self.handles = _get_handles(
b64encode(self.push_connection.token),
self.user_id,
self._auth_keypair,
KeyPair(self.push_connection.private_key, self.push_connection.cert),
)
def __str__(self):
return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})"
def test():
import getpass
conn = apns.APNSConnection()
conn.connect()
username = input("Enter username: ")
password = getpass.getpass("Enter password: ")
user = IDSUser(conn, username, password)
print(user)
# SIGNING STUFF
# Nonce Format:
# 01000001876bd0a2c0e571093967fce3d7
# 01 # version
# 000001876d008cc5 # unix time
# r1r2r3r4r5r6r7r8 # random bytes
def generate_nonce() -> bytes:
return (
b"\x01"
+ int(datetime.now().timestamp() * 1000).to_bytes(8, "big")
+ random.randbytes(8)
)
# Creates a payload from individual parts for signing
def _create_payload(
bag_key: str,
query_string: str,
push_token: str,
payload: bytes,
nonce: bytes = None,
) -> tuple[str, bytes]:
# Generate the nonce
if nonce is None:
nonce = generate_nonce()
push_token = b64decode(push_token)
if payload is None:
payload = b""
return (
nonce
+ len(bag_key).to_bytes(4, "big")
+ bag_key.encode()
+ len(query_string).to_bytes(4, "big")
+ query_string.encode()
+ len(payload).to_bytes(4, "big")
+ payload
+ len(push_token).to_bytes(4, "big")
+ push_token,
nonce,
)
# Returns signature, nonce
def _sign_payload(
private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes
) -> tuple[str, bytes]:
# Load the private key
key = serialization.load_pem_private_key(
private_key.encode(), password=None, backend=default_backend()
)
payload, nonce = _create_payload(bag_key, query_string, push_token, payload)
sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1())
sig = b"\x01\x01" + sig
sig = b64encode(sig).decode()
return sig, nonce
def dearmour(cert: str):
return (
cert.replace("\n", "")
.replace("-----BEGIN CERTIFICATE-----", "")
.replace("-----END CERTIFICATE-----", "")
)
# Add headers for x-push-sig and x-auth-sig stuff
def _add_auth_push_sig(
headers: dict,
body: bytes,
bag_key: str,
auth_key: KeyPair,
push_key: KeyPair,
push_token: str,
auth_number=None,
):
push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body)
headers["x-push-sig"] = push_sig
headers["x-push-nonce"] = b64encode(push_nonce)
headers["x-push-cert"] = dearmour(push_key.cert)
headers["x-push-token"] = push_token
auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body)
auth_postfix = "-" + str(auth_number) if auth_number is not None else ""
headers["x-auth-sig" + auth_postfix] = auth_sig
headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce)
headers["x-auth-cert" + auth_postfix] = dearmour(auth_key.cert)
def _add_id_sig(
headers: dict,
body: bytes,
bag_key: str,
id_key: KeyPair,
push_token: str,
):
id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body)
headers["x-id-sig"] = id_sig
headers["x-id-nonce"] = b64encode(id_nonce)
headers["x-id-cert"] = dearmour(id_key.cert)
headers["x-push-token"] = push_token
if __name__ == "__main__":
test()