From 2e6999056aa9978e5cae44eb59246160e8870118 Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 9 May 2023 15:36:33 -0400 Subject: [PATCH] starting the refactor --- albert.py | 4 +- demo.py | 8 ++- development/demo.py | 2 +- gsa.py | 5 +- ids/__init__.py | 37 +++++++++++ ids/_helpers.py | 12 ++++ ids/identity.py | 0 ids/profile.py | 154 ++++++++++++++++++++++++++++++++++++++++++++ ids/query.py | 84 ++++++++++++++++++++++++ ids/signing.py | 117 +++++++++++++++++++++++++++++++++ newdemo.py | 13 ++++ ids.py => oldids.py | 48 +++++++------- 12 files changed, 453 insertions(+), 31 deletions(-) create mode 100644 ids/__init__.py create mode 100644 ids/_helpers.py create mode 100644 ids/identity.py create mode 100644 ids/profile.py create mode 100644 ids/query.py create mode 100644 ids/signing.py create mode 100644 newdemo.py rename ids.py => oldids.py (95%) diff --git a/albert.py b/albert.py index 5a3d330..17b7fd0 100644 --- a/albert.py +++ b/albert.py @@ -68,7 +68,7 @@ def generate_push_cert() -> tuple[str, str]: ) # Sign the activation info - signature = fairplay_key.sign(activation_info, padding.PKCS1v15(), hashes.SHA1()) + signature = fairplay_key.sign(activation_info, padding.PKCS1v15(), hashes.SHA1()) # type: ignore body = { "ActivationInfoComplete": True, @@ -83,7 +83,7 @@ def generate_push_cert() -> tuple[str, str]: verify=False, ) - protocol = re.search("(.*)", resp.text).group(1) + protocol = re.search("(.*)", resp.text).group(1) # type: ignore protocol = plistlib.loads(protocol.encode("utf-8")) return ( diff --git a/demo.py b/demo.py index 73ebb9e..5e4d04c 100644 --- a/demo.py +++ b/demo.py @@ -2,7 +2,10 @@ import getpass import json import ids -from ids import * +from base64 import b64encode, b64decode +import apns +from ids import signing + # Open config try: @@ -115,8 +118,7 @@ def refresh_ids_cert(): CONFIG["validation_data"] = validation_data print(resp) - ids_cert = x509.load_der_x509_certificate(resp["services"][0]["users"][0]["cert"]) - ids_cert = ids_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip() + ids_cert = signing.armour_cert(resp["services"][0]["users"][0]["cert"]) CONFIG["ids_cert"] = ids_cert diff --git a/development/demo.py b/development/demo.py index 70ff35b..730ae9e 100644 --- a/development/demo.py +++ b/development/demo.py @@ -14,4 +14,4 @@ conn1.filter(["com.apple.madrid"]) # print(ids.lookup(conn1, ["mailto:jjtech@jjtech.dev"])) -print(ids.register(conn1, "user_test2@icloud.com", "wowSecure1")) +#print(ids.register(conn1, "user_test2@icloud.com", "wowSecure1")) diff --git a/gsa.py b/gsa.py index c8dbfe4..e2dc8da 100644 --- a/gsa.py +++ b/gsa.py @@ -326,8 +326,9 @@ def check_error(r): status = r if status["ec"] != 0: - print(f"Error {status['ec']}: {status['em']}") - return True + raise Exception(f"Error {status['ec']}: {status['em']}") + #print(f"Error {status['ec']}: {status['em']}") + #return True return False diff --git a/ids/__init__.py b/ids/__init__.py new file mode 100644 index 0000000..4c95124 --- /dev/null +++ b/ids/__init__.py @@ -0,0 +1,37 @@ +from base64 import b64encode +import apns +from . import profile +from . import _helpers +#from .profile import _get_auth_cert, _get_auth_token, _get_handles + +class IDSUser: + def _authenticate_for_token( + self, username: str, password: str, factor_callback: callable = None + ): + self.user_id, self._auth_token = profile._get_auth_token( + username, password, factor_callback + ) + + def _authenticate_for_cert(self): + self._auth_keypair = profile._get_auth_cert(self.user_id, self._auth_token) + + # Factor callback will be called if a 2FA code is necessary + def __init__( + self, + push_connection: apns.APNSConnection, + username: str, + password: str, + factor_callback: callable = None, + ): + self.push_connection = push_connection + self._authenticate_for_token(username, password, factor_callback) + self._authenticate_for_cert() + self.handles = profile._get_handles( + b64encode(self.push_connection.token), + self.user_id, + self._auth_keypair, + _helpers.KeyPair(self.push_connection.private_key, self.push_connection.cert), + ) + + def __str__(self): + return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})" \ No newline at end of file diff --git a/ids/_helpers.py b/ids/_helpers.py new file mode 100644 index 0000000..9f2bc93 --- /dev/null +++ b/ids/_helpers.py @@ -0,0 +1,12 @@ +from collections import namedtuple + +USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]" +PROTOCOL_VERSION = "1640" + +# KeyPair is a named tuple that holds a key and a certificate in PEM form +KeyPair = namedtuple("KeyPair", ["key", "cert"]) + +def dearmour(armoured: str) -> str: + import re + # Use a regex to remove the header and footer (generic so it work on more than just certificates) + return re.sub(r"-----BEGIN .*-----|-----END .*-----", "", armoured).replace("\n", "") \ No newline at end of file diff --git a/ids/identity.py b/ids/identity.py new file mode 100644 index 0000000..e69de29 diff --git a/ids/profile.py b/ids/profile.py new file mode 100644 index 0000000..c503ffc --- /dev/null +++ b/ids/profile.py @@ -0,0 +1,154 @@ +import requests +import plistlib +import uuid +import gsa +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa +from cryptography.x509.oid import NameOID +import random +from base64 import b64decode +from ._helpers import KeyPair, PROTOCOL_VERSION, USER_AGENT +from . import signing + + +def _auth_token_request(username: str, password: str) -> any: + # Turn the PET into an auth token + data = { + "apple-id": username, + "client-id": str(uuid.uuid4()), + "delegates": {"com.apple.private.ids": {"protocol-version": "4"}}, + "password": password, + } + data = plistlib.dumps(data) + + r = requests.post( + "https://setup.icloud.com/setup/prefpane/loginDelegates", + auth=(username, password), + data=data, + verify=False, + ) + r = plistlib.loads(r.content) + return r + + +# Gets an IDS auth token for the given username and password +# Will use native Grand Slam on macOS +# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted +# Returns (realm user id, auth token) +def _get_auth_token( + username: str, password: str, factor_gen: callable = None +) -> tuple[str, str]: + from sys import platform + + # if use_gsa: + if platform == "darwin": + g = gsa.authenticate(username, password, gsa.Anisette()) + pet = g["t"]["com.apple.gs.idms.pet"]["token"] + else: + # Make the request without the 2FA code to make the prompt appear + _auth_token_request(username, password) + # TODO: Make sure we actually need the second request, some rare accounts don't have 2FA + # Now make the request with the 2FA code + if factor_gen is None: + pet = password + input("Enter 2FA code: ") + else: + pet = password + factor_gen() + r = _auth_token_request(username, pet) + # print(r) + if "description" in r: + raise Exception(f"Error: {r['description']}") + service_data = r["delegates"]["com.apple.private.ids"]["service-data"] + realm_user_id = service_data["realm-user-id"] + auth_token = service_data["auth-token"] + # print(f"Auth token for {realm_user_id}: {auth_token}") + return realm_user_id, auth_token + + +def _generate_csr(private_key: rsa.RSAPrivateKey) -> str: + csr = ( + x509.CertificateSigningRequestBuilder() + .subject_name( + x509.Name( + [ + x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()), + ] + ) + ) + .sign(private_key, hashes.SHA256()) + ) + + csr = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8") + return ( + csr.replace("-----BEGIN CERTIFICATE REQUEST-----", "") + .replace("-----END CERTIFICATE REQUEST-----", "") + .replace("\n", "") + ) + + +# Gets an IDS auth cert for the given user id and auth token +# Returns [private key PEM, certificate PEM] +def _get_auth_cert(user_id, token) -> KeyPair: + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + body = { + "authentication-data": {"auth-token": token}, + "csr": b64decode(_generate_csr(private_key)), + "realm-user-id": user_id, + } + + body = plistlib.dumps(body) + + r = requests.post( + "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS", + data=body, + headers={"x-protocol-version": "1630"}, + verify=False, + ) + r = plistlib.loads(r.content) + if r["status"] != 0: + raise (Exception(f"Failed to get auth cert: {r}")) + cert = x509.load_der_x509_certificate(r["cert"]) + return KeyPair( + private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + .decode("utf-8") + .strip(), + cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(), + ) + +def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair): + headers = { + "x-protocol-version": PROTOCOL_VERSION, + "x-auth-user-id": user_id, + #"user-agent": USER_AGENT, + } + import oldids + #headers2 = headers.copy() + #oldids._add_auth_push_sig(headers2, None, "id-get-handles", auth_key, push_key, push_token) + #headers3 = headers.copy() + signing.add_auth_signature( + headers, None, "id-get-handles", auth_key, push_key, push_token + ) + + #for key, value in headers2.items(): + # if headers3[key] != value: + # print(f"Key {key} mismatch: {headers3[key]} != {value}") + + r = requests.get( + "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles", + headers=headers, + verify=False, + ) + + r = plistlib.loads(r.content) + + if not "handles" in r: + raise Exception("No handles in response: " + str(r)) + + return [handle["uri"] for handle in r["handles"]] \ No newline at end of file diff --git a/ids/query.py b/ids/query.py new file mode 100644 index 0000000..443d665 --- /dev/null +++ b/ids/query.py @@ -0,0 +1,84 @@ +import gzip +import plistlib +import random +from base64 import b64encode + +import apns +import bags +from . import USER_AGENT, KeyPair, signing + + +def _send_request( + conn: apns.APNSConnection, + bag_key: str, + topic: str, + body: bytes, + keypair: KeyPair, + username: str, +) -> bytes: + body = gzip.compress(body, mtime=0) + + push_token = b64encode(conn.token).decode() + + # Sign the request + # signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body) + + headers = { + "x-id-self-uri": "mailto:" + username, + "User-Agent": USER_AGENT, + "x-protocol-version": "1630", + } + signing.add_id_signature(headers, body, bag_key, keypair, push_token) + + # print(headers) + + msg_id = random.randbytes(16) + + req = { + "cT": "application/x-apple-plist", + "U": msg_id, + "c": 96, + "ua": USER_AGENT, + "u": bags.ids_bag()[bag_key], + "h": headers, + "v": 2, + "b": body, + } + + conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY)) + # resp = conn.wait_for_packet(0x0A) + + def check_response(x): + if x[0] != 0x0A: + return False + resp_body = apns._get_field(x[1], 3) + if resp_body is None: + return False + resp_body = plistlib.loads(resp_body) + return resp_body["U"] == msg_id + + # Lambda to check if the response is the one we want + # conn.incoming_queue.find(check_response) + payload = conn.incoming_queue.wait_pop_find(check_response) + # conn._send_ack(apns._get_field(payload[1], 4)) + resp = apns._get_field(payload[1], 3) + return plistlib.loads(resp) + + +# Performs an IDS lookup +# conn: an active APNs connection. must be connected and have a push token. will be filtered to the IDS topic +# self: the user's email address +# keypair: a KeyPair object containing the user's private key and certificate +# topic: the IDS topic to query +# query: a list of URIs to query +def lookup( + conn: apns.APNSConnection, self: str, keypair: KeyPair, topic: str, query: list[str] +) -> any: + conn.filter([topic]) + query = {"uris": query} + resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), keypair, self) + # resp = plistlib.loads(resp) + # print(resp) + resp = gzip.decompress(resp["b"]) + resp = plistlib.loads(resp) + return resp diff --git a/ids/signing.py b/ids/signing.py new file mode 100644 index 0000000..7f09170 --- /dev/null +++ b/ids/signing.py @@ -0,0 +1,117 @@ +from ._helpers import dearmour, KeyPair +from datetime import datetime +import random +from base64 import b64encode, b64decode +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa +from cryptography.x509.oid import NameOID + +# TODO: Move this helper somewhere else +def armour_cert(cert: bytes) -> str: + x509.load_der_x509_certificate(cert) + ids_cert = ids_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip() + return ids_cert + +""" +Generates a nonce in this format: +01000001876bd0a2c0e571093967fce3d7 +01 # version + 000001876d008cc5 # unix time + r1r2r3r4r5r6r7r8 # random bytes +""" +def generate_nonce() -> bytes: + return ( + b"\x01" + + int(datetime.now().timestamp() * 1000).to_bytes(8, "big") + + random.randbytes(8) + ) + + +import typing + +# Creates a payload from individual parts for signing +def _create_payload( + bag_key: str, + query_string: str, + push_token: typing.Union[str, bytes], + payload: bytes, + nonce: typing.Union[bytes, None] = None, +) -> tuple[bytes, bytes]: + # Generate the nonce + if nonce is None: + nonce = generate_nonce() + + push_token = b64decode(push_token) + + if payload is None: + payload = b"" + + return ( + nonce + + len(bag_key).to_bytes(4, "big") + + bag_key.encode() + + len(query_string).to_bytes(4, "big") + + query_string.encode() + + len(payload).to_bytes(4, "big") + + payload + + len(push_token).to_bytes(4, "big") + + push_token, + nonce, + ) + + +# Returns signature, nonce +def _sign_payload( + private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes +) -> tuple[str, bytes]: + # Load the private key + key = serialization.load_pem_private_key( + private_key.encode(), password=None, backend=default_backend() + ) + + payload, nonce = _create_payload(bag_key, query_string, push_token, payload) + + sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) # type: ignore + + sig = b"\x01\x01" + sig + sig = b64encode(sig).decode() + + return sig, nonce + +# Add headers for x-push-sig and x-auth-sig stuff +def add_auth_signature( + headers: dict, + body: bytes, + bag_key: str, + auth_key: KeyPair, + push_key: KeyPair, + push_token: str, + auth_number=None, +): + push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body) + headers["x-push-sig"] = push_sig + headers["x-push-nonce"] = b64encode(push_nonce) + headers["x-push-cert"] = dearmour(push_key.cert) + headers["x-push-token"] = push_token + + auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body) + auth_postfix = "-" + str(auth_number) if auth_number is not None else "" + headers["x-auth-sig" + auth_postfix] = auth_sig + headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce) + headers["x-auth-cert" + auth_postfix] = dearmour(auth_key.cert) + + +def add_id_signature( + headers: dict, + body: bytes, + bag_key: str, + id_key: KeyPair, + push_token: str, +): + id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body) + headers["x-id-sig"] = id_sig + headers["x-id-nonce"] = b64encode(id_nonce) + headers["x-id-cert"] = dearmour(id_key.cert) + headers["x-push-token"] = push_token diff --git a/newdemo.py b/newdemo.py new file mode 100644 index 0000000..91ec219 --- /dev/null +++ b/newdemo.py @@ -0,0 +1,13 @@ +import ids +import apns +from getpass import getpass + + +conn = apns.APNSConnection() +conn.connect() + +username = input("Username: ") +password = getpass("Password: ") +user = ids.IDSUser(conn, username, password) + +print(user.handles) \ No newline at end of file diff --git a/ids.py b/oldids.py similarity index 95% rename from ids.py rename to oldids.py index 950c0ca..b3f133b 100644 --- a/ids.py +++ b/oldids.py @@ -41,16 +41,11 @@ def _send_request( signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body) headers = { - "x-id-cert": keypair.cert.replace("-----BEGIN CERTIFICATE-----", "") - .replace("-----END CERTIFICATE-----", "") - .replace("\n", ""), - "x-id-nonce": b64encode(nonce).decode(), - "x-id-sig": signature, - "x-push-token": push_token, "x-id-self-uri": "mailto:" + username, "User-Agent": USER_AGENT, "x-protocol-version": "1630", } + _add_id_sig(headers, body, bag_key, keypair, push_token) # print(headers) @@ -246,7 +241,7 @@ def _register_request( "x-protocol-version": PROTOCOL_VERSION, "x-auth-user-id-0": info["user_id"], } - _add_auth_push_signatures( + _add_auth_push_sig( headers, body, "id-register", auth_key, push_key, push_token, 0 ) @@ -263,24 +258,12 @@ def _register_request( # TODO: Do validation of nested statuses return r - -def mini_cert(cert: str): - return ( - cert.replace("\n", "") - .replace("-----BEGIN CERTIFICATE-----", "") - .replace("-----END CERTIFICATE-----", "") - ) - - -PROTOCOL_VERSION = "1640" - - def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair): headers = { "x-protocol-version": PROTOCOL_VERSION, "x-auth-user-id": user_id, } - _add_auth_push_signatures( + _add_auth_push_sig( headers, None, "id-get-handles", auth_key, push_key, push_token ) @@ -404,9 +387,15 @@ def _sign_payload( return sig, nonce +def dearmour(cert: str): + return ( + cert.replace("\n", "") + .replace("-----BEGIN CERTIFICATE-----", "") + .replace("-----END CERTIFICATE-----", "") + ) # Add headers for x-push-sig and x-auth-sig stuff -def _add_auth_push_signatures( +def _add_auth_push_sig( headers: dict, body: bytes, bag_key: str, @@ -418,14 +407,27 @@ def _add_auth_push_signatures( push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body) headers["x-push-sig"] = push_sig headers["x-push-nonce"] = b64encode(push_nonce) - headers["x-push-cert"] = mini_cert(push_key.cert) + headers["x-push-cert"] = dearmour(push_key.cert) headers["x-push-token"] = push_token auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body) auth_postfix = "-" + str(auth_number) if auth_number is not None else "" headers["x-auth-sig" + auth_postfix] = auth_sig headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce) - headers["x-auth-cert" + auth_postfix] = mini_cert(auth_key.cert) + headers["x-auth-cert" + auth_postfix] = dearmour(auth_key.cert) + +def _add_id_sig( + headers: dict, + body: bytes, + bag_key: str, + id_key: KeyPair, + push_token: str, +): + id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body) + headers["x-id-sig"] = id_sig + headers["x-id-nonce"] = b64encode(id_nonce) + headers["x-id-cert"] = dearmour(id_key.cert) + headers["x-push-token"] = push_token if __name__ == "__main__":