From 2e6999056aa9978e5cae44eb59246160e8870118 Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 9 May 2023 15:36:33 -0400 Subject: [PATCH 1/9] starting the refactor --- albert.py | 4 +- demo.py | 8 ++- development/demo.py | 2 +- gsa.py | 5 +- ids/__init__.py | 37 +++++++++++ ids/_helpers.py | 12 ++++ ids/identity.py | 0 ids/profile.py | 154 ++++++++++++++++++++++++++++++++++++++++++++ ids/query.py | 84 ++++++++++++++++++++++++ ids/signing.py | 117 +++++++++++++++++++++++++++++++++ newdemo.py | 13 ++++ ids.py => oldids.py | 48 +++++++------- 12 files changed, 453 insertions(+), 31 deletions(-) create mode 100644 ids/__init__.py create mode 100644 ids/_helpers.py create mode 100644 ids/identity.py create mode 100644 ids/profile.py create mode 100644 ids/query.py create mode 100644 ids/signing.py create mode 100644 newdemo.py rename ids.py => oldids.py (95%) diff --git a/albert.py b/albert.py index 5a3d330..17b7fd0 100644 --- a/albert.py +++ b/albert.py @@ -68,7 +68,7 @@ def generate_push_cert() -> tuple[str, str]: ) # Sign the activation info - signature = fairplay_key.sign(activation_info, padding.PKCS1v15(), hashes.SHA1()) + signature = fairplay_key.sign(activation_info, padding.PKCS1v15(), hashes.SHA1()) # type: ignore body = { "ActivationInfoComplete": True, @@ -83,7 +83,7 @@ def generate_push_cert() -> tuple[str, str]: verify=False, ) - protocol = re.search("(.*)", resp.text).group(1) + protocol = re.search("(.*)", resp.text).group(1) # type: ignore protocol = plistlib.loads(protocol.encode("utf-8")) return ( diff --git a/demo.py b/demo.py index 73ebb9e..5e4d04c 100644 --- a/demo.py +++ b/demo.py @@ -2,7 +2,10 @@ import getpass import json import ids -from ids import * +from base64 import b64encode, b64decode +import apns +from ids import signing + # Open config try: @@ -115,8 +118,7 @@ def refresh_ids_cert(): CONFIG["validation_data"] = validation_data print(resp) - ids_cert = x509.load_der_x509_certificate(resp["services"][0]["users"][0]["cert"]) - ids_cert = ids_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip() + ids_cert = signing.armour_cert(resp["services"][0]["users"][0]["cert"]) CONFIG["ids_cert"] = ids_cert diff --git a/development/demo.py b/development/demo.py index 70ff35b..730ae9e 100644 --- a/development/demo.py +++ b/development/demo.py @@ -14,4 +14,4 @@ conn1.filter(["com.apple.madrid"]) # print(ids.lookup(conn1, ["mailto:jjtech@jjtech.dev"])) -print(ids.register(conn1, "user_test2@icloud.com", "wowSecure1")) +#print(ids.register(conn1, "user_test2@icloud.com", "wowSecure1")) diff --git a/gsa.py b/gsa.py index c8dbfe4..e2dc8da 100644 --- a/gsa.py +++ b/gsa.py @@ -326,8 +326,9 @@ def check_error(r): status = r if status["ec"] != 0: - print(f"Error {status['ec']}: {status['em']}") - return True + raise Exception(f"Error {status['ec']}: {status['em']}") + #print(f"Error {status['ec']}: {status['em']}") + #return True return False diff --git a/ids/__init__.py b/ids/__init__.py new file mode 100644 index 0000000..4c95124 --- /dev/null +++ b/ids/__init__.py @@ -0,0 +1,37 @@ +from base64 import b64encode +import apns +from . import profile +from . import _helpers +#from .profile import _get_auth_cert, _get_auth_token, _get_handles + +class IDSUser: + def _authenticate_for_token( + self, username: str, password: str, factor_callback: callable = None + ): + self.user_id, self._auth_token = profile._get_auth_token( + username, password, factor_callback + ) + + def _authenticate_for_cert(self): + self._auth_keypair = profile._get_auth_cert(self.user_id, self._auth_token) + + # Factor callback will be called if a 2FA code is necessary + def __init__( + self, + push_connection: apns.APNSConnection, + username: str, + password: str, + factor_callback: callable = None, + ): + self.push_connection = push_connection + self._authenticate_for_token(username, password, factor_callback) + self._authenticate_for_cert() + self.handles = profile._get_handles( + b64encode(self.push_connection.token), + self.user_id, + self._auth_keypair, + _helpers.KeyPair(self.push_connection.private_key, self.push_connection.cert), + ) + + def __str__(self): + return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})" \ No newline at end of file diff --git a/ids/_helpers.py b/ids/_helpers.py new file mode 100644 index 0000000..9f2bc93 --- /dev/null +++ b/ids/_helpers.py @@ -0,0 +1,12 @@ +from collections import namedtuple + +USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]" +PROTOCOL_VERSION = "1640" + +# KeyPair is a named tuple that holds a key and a certificate in PEM form +KeyPair = namedtuple("KeyPair", ["key", "cert"]) + +def dearmour(armoured: str) -> str: + import re + # Use a regex to remove the header and footer (generic so it work on more than just certificates) + return re.sub(r"-----BEGIN .*-----|-----END .*-----", "", armoured).replace("\n", "") \ No newline at end of file diff --git a/ids/identity.py b/ids/identity.py new file mode 100644 index 0000000..e69de29 diff --git a/ids/profile.py b/ids/profile.py new file mode 100644 index 0000000..c503ffc --- /dev/null +++ b/ids/profile.py @@ -0,0 +1,154 @@ +import requests +import plistlib +import uuid +import gsa +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa +from cryptography.x509.oid import NameOID +import random +from base64 import b64decode +from ._helpers import KeyPair, PROTOCOL_VERSION, USER_AGENT +from . import signing + + +def _auth_token_request(username: str, password: str) -> any: + # Turn the PET into an auth token + data = { + "apple-id": username, + "client-id": str(uuid.uuid4()), + "delegates": {"com.apple.private.ids": {"protocol-version": "4"}}, + "password": password, + } + data = plistlib.dumps(data) + + r = requests.post( + "https://setup.icloud.com/setup/prefpane/loginDelegates", + auth=(username, password), + data=data, + verify=False, + ) + r = plistlib.loads(r.content) + return r + + +# Gets an IDS auth token for the given username and password +# Will use native Grand Slam on macOS +# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted +# Returns (realm user id, auth token) +def _get_auth_token( + username: str, password: str, factor_gen: callable = None +) -> tuple[str, str]: + from sys import platform + + # if use_gsa: + if platform == "darwin": + g = gsa.authenticate(username, password, gsa.Anisette()) + pet = g["t"]["com.apple.gs.idms.pet"]["token"] + else: + # Make the request without the 2FA code to make the prompt appear + _auth_token_request(username, password) + # TODO: Make sure we actually need the second request, some rare accounts don't have 2FA + # Now make the request with the 2FA code + if factor_gen is None: + pet = password + input("Enter 2FA code: ") + else: + pet = password + factor_gen() + r = _auth_token_request(username, pet) + # print(r) + if "description" in r: + raise Exception(f"Error: {r['description']}") + service_data = r["delegates"]["com.apple.private.ids"]["service-data"] + realm_user_id = service_data["realm-user-id"] + auth_token = service_data["auth-token"] + # print(f"Auth token for {realm_user_id}: {auth_token}") + return realm_user_id, auth_token + + +def _generate_csr(private_key: rsa.RSAPrivateKey) -> str: + csr = ( + x509.CertificateSigningRequestBuilder() + .subject_name( + x509.Name( + [ + x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()), + ] + ) + ) + .sign(private_key, hashes.SHA256()) + ) + + csr = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8") + return ( + csr.replace("-----BEGIN CERTIFICATE REQUEST-----", "") + .replace("-----END CERTIFICATE REQUEST-----", "") + .replace("\n", "") + ) + + +# Gets an IDS auth cert for the given user id and auth token +# Returns [private key PEM, certificate PEM] +def _get_auth_cert(user_id, token) -> KeyPair: + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + body = { + "authentication-data": {"auth-token": token}, + "csr": b64decode(_generate_csr(private_key)), + "realm-user-id": user_id, + } + + body = plistlib.dumps(body) + + r = requests.post( + "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS", + data=body, + headers={"x-protocol-version": "1630"}, + verify=False, + ) + r = plistlib.loads(r.content) + if r["status"] != 0: + raise (Exception(f"Failed to get auth cert: {r}")) + cert = x509.load_der_x509_certificate(r["cert"]) + return KeyPair( + private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + .decode("utf-8") + .strip(), + cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(), + ) + +def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair): + headers = { + "x-protocol-version": PROTOCOL_VERSION, + "x-auth-user-id": user_id, + #"user-agent": USER_AGENT, + } + import oldids + #headers2 = headers.copy() + #oldids._add_auth_push_sig(headers2, None, "id-get-handles", auth_key, push_key, push_token) + #headers3 = headers.copy() + signing.add_auth_signature( + headers, None, "id-get-handles", auth_key, push_key, push_token + ) + + #for key, value in headers2.items(): + # if headers3[key] != value: + # print(f"Key {key} mismatch: {headers3[key]} != {value}") + + r = requests.get( + "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles", + headers=headers, + verify=False, + ) + + r = plistlib.loads(r.content) + + if not "handles" in r: + raise Exception("No handles in response: " + str(r)) + + return [handle["uri"] for handle in r["handles"]] \ No newline at end of file diff --git a/ids/query.py b/ids/query.py new file mode 100644 index 0000000..443d665 --- /dev/null +++ b/ids/query.py @@ -0,0 +1,84 @@ +import gzip +import plistlib +import random +from base64 import b64encode + +import apns +import bags +from . import USER_AGENT, KeyPair, signing + + +def _send_request( + conn: apns.APNSConnection, + bag_key: str, + topic: str, + body: bytes, + keypair: KeyPair, + username: str, +) -> bytes: + body = gzip.compress(body, mtime=0) + + push_token = b64encode(conn.token).decode() + + # Sign the request + # signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body) + + headers = { + "x-id-self-uri": "mailto:" + username, + "User-Agent": USER_AGENT, + "x-protocol-version": "1630", + } + signing.add_id_signature(headers, body, bag_key, keypair, push_token) + + # print(headers) + + msg_id = random.randbytes(16) + + req = { + "cT": "application/x-apple-plist", + "U": msg_id, + "c": 96, + "ua": USER_AGENT, + "u": bags.ids_bag()[bag_key], + "h": headers, + "v": 2, + "b": body, + } + + conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY)) + # resp = conn.wait_for_packet(0x0A) + + def check_response(x): + if x[0] != 0x0A: + return False + resp_body = apns._get_field(x[1], 3) + if resp_body is None: + return False + resp_body = plistlib.loads(resp_body) + return resp_body["U"] == msg_id + + # Lambda to check if the response is the one we want + # conn.incoming_queue.find(check_response) + payload = conn.incoming_queue.wait_pop_find(check_response) + # conn._send_ack(apns._get_field(payload[1], 4)) + resp = apns._get_field(payload[1], 3) + return plistlib.loads(resp) + + +# Performs an IDS lookup +# conn: an active APNs connection. must be connected and have a push token. will be filtered to the IDS topic +# self: the user's email address +# keypair: a KeyPair object containing the user's private key and certificate +# topic: the IDS topic to query +# query: a list of URIs to query +def lookup( + conn: apns.APNSConnection, self: str, keypair: KeyPair, topic: str, query: list[str] +) -> any: + conn.filter([topic]) + query = {"uris": query} + resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), keypair, self) + # resp = plistlib.loads(resp) + # print(resp) + resp = gzip.decompress(resp["b"]) + resp = plistlib.loads(resp) + return resp diff --git a/ids/signing.py b/ids/signing.py new file mode 100644 index 0000000..7f09170 --- /dev/null +++ b/ids/signing.py @@ -0,0 +1,117 @@ +from ._helpers import dearmour, KeyPair +from datetime import datetime +import random +from base64 import b64encode, b64decode +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa +from cryptography.x509.oid import NameOID + +# TODO: Move this helper somewhere else +def armour_cert(cert: bytes) -> str: + x509.load_der_x509_certificate(cert) + ids_cert = ids_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip() + return ids_cert + +""" +Generates a nonce in this format: +01000001876bd0a2c0e571093967fce3d7 +01 # version + 000001876d008cc5 # unix time + r1r2r3r4r5r6r7r8 # random bytes +""" +def generate_nonce() -> bytes: + return ( + b"\x01" + + int(datetime.now().timestamp() * 1000).to_bytes(8, "big") + + random.randbytes(8) + ) + + +import typing + +# Creates a payload from individual parts for signing +def _create_payload( + bag_key: str, + query_string: str, + push_token: typing.Union[str, bytes], + payload: bytes, + nonce: typing.Union[bytes, None] = None, +) -> tuple[bytes, bytes]: + # Generate the nonce + if nonce is None: + nonce = generate_nonce() + + push_token = b64decode(push_token) + + if payload is None: + payload = b"" + + return ( + nonce + + len(bag_key).to_bytes(4, "big") + + bag_key.encode() + + len(query_string).to_bytes(4, "big") + + query_string.encode() + + len(payload).to_bytes(4, "big") + + payload + + len(push_token).to_bytes(4, "big") + + push_token, + nonce, + ) + + +# Returns signature, nonce +def _sign_payload( + private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes +) -> tuple[str, bytes]: + # Load the private key + key = serialization.load_pem_private_key( + private_key.encode(), password=None, backend=default_backend() + ) + + payload, nonce = _create_payload(bag_key, query_string, push_token, payload) + + sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) # type: ignore + + sig = b"\x01\x01" + sig + sig = b64encode(sig).decode() + + return sig, nonce + +# Add headers for x-push-sig and x-auth-sig stuff +def add_auth_signature( + headers: dict, + body: bytes, + bag_key: str, + auth_key: KeyPair, + push_key: KeyPair, + push_token: str, + auth_number=None, +): + push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body) + headers["x-push-sig"] = push_sig + headers["x-push-nonce"] = b64encode(push_nonce) + headers["x-push-cert"] = dearmour(push_key.cert) + headers["x-push-token"] = push_token + + auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body) + auth_postfix = "-" + str(auth_number) if auth_number is not None else "" + headers["x-auth-sig" + auth_postfix] = auth_sig + headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce) + headers["x-auth-cert" + auth_postfix] = dearmour(auth_key.cert) + + +def add_id_signature( + headers: dict, + body: bytes, + bag_key: str, + id_key: KeyPair, + push_token: str, +): + id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body) + headers["x-id-sig"] = id_sig + headers["x-id-nonce"] = b64encode(id_nonce) + headers["x-id-cert"] = dearmour(id_key.cert) + headers["x-push-token"] = push_token diff --git a/newdemo.py b/newdemo.py new file mode 100644 index 0000000..91ec219 --- /dev/null +++ b/newdemo.py @@ -0,0 +1,13 @@ +import ids +import apns +from getpass import getpass + + +conn = apns.APNSConnection() +conn.connect() + +username = input("Username: ") +password = getpass("Password: ") +user = ids.IDSUser(conn, username, password) + +print(user.handles) \ No newline at end of file diff --git a/ids.py b/oldids.py similarity index 95% rename from ids.py rename to oldids.py index 950c0ca..b3f133b 100644 --- a/ids.py +++ b/oldids.py @@ -41,16 +41,11 @@ def _send_request( signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body) headers = { - "x-id-cert": keypair.cert.replace("-----BEGIN CERTIFICATE-----", "") - .replace("-----END CERTIFICATE-----", "") - .replace("\n", ""), - "x-id-nonce": b64encode(nonce).decode(), - "x-id-sig": signature, - "x-push-token": push_token, "x-id-self-uri": "mailto:" + username, "User-Agent": USER_AGENT, "x-protocol-version": "1630", } + _add_id_sig(headers, body, bag_key, keypair, push_token) # print(headers) @@ -246,7 +241,7 @@ def _register_request( "x-protocol-version": PROTOCOL_VERSION, "x-auth-user-id-0": info["user_id"], } - _add_auth_push_signatures( + _add_auth_push_sig( headers, body, "id-register", auth_key, push_key, push_token, 0 ) @@ -263,24 +258,12 @@ def _register_request( # TODO: Do validation of nested statuses return r - -def mini_cert(cert: str): - return ( - cert.replace("\n", "") - .replace("-----BEGIN CERTIFICATE-----", "") - .replace("-----END CERTIFICATE-----", "") - ) - - -PROTOCOL_VERSION = "1640" - - def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair): headers = { "x-protocol-version": PROTOCOL_VERSION, "x-auth-user-id": user_id, } - _add_auth_push_signatures( + _add_auth_push_sig( headers, None, "id-get-handles", auth_key, push_key, push_token ) @@ -404,9 +387,15 @@ def _sign_payload( return sig, nonce +def dearmour(cert: str): + return ( + cert.replace("\n", "") + .replace("-----BEGIN CERTIFICATE-----", "") + .replace("-----END CERTIFICATE-----", "") + ) # Add headers for x-push-sig and x-auth-sig stuff -def _add_auth_push_signatures( +def _add_auth_push_sig( headers: dict, body: bytes, bag_key: str, @@ -418,14 +407,27 @@ def _add_auth_push_signatures( push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body) headers["x-push-sig"] = push_sig headers["x-push-nonce"] = b64encode(push_nonce) - headers["x-push-cert"] = mini_cert(push_key.cert) + headers["x-push-cert"] = dearmour(push_key.cert) headers["x-push-token"] = push_token auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body) auth_postfix = "-" + str(auth_number) if auth_number is not None else "" headers["x-auth-sig" + auth_postfix] = auth_sig headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce) - headers["x-auth-cert" + auth_postfix] = mini_cert(auth_key.cert) + headers["x-auth-cert" + auth_postfix] = dearmour(auth_key.cert) + +def _add_id_sig( + headers: dict, + body: bytes, + bag_key: str, + id_key: KeyPair, + push_token: str, +): + id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body) + headers["x-id-sig"] = id_sig + headers["x-id-nonce"] = b64encode(id_nonce) + headers["x-id-cert"] = dearmour(id_key.cert) + headers["x-push-token"] = push_token if __name__ == "__main__": From f06b4772f11ff7d0407a6ccf9c3e4e5f26be96ca Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 9 May 2023 16:09:28 -0400 Subject: [PATCH 2/9] more refactoring --- ids/__init__.py | 38 ++++++++++++++++++++++++-------- ids/identity.py | 54 +++++++++++++++++++++++++++++++++++++++++++++ newdemo.py | 58 ++++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 135 insertions(+), 15 deletions(-) diff --git a/ids/__init__.py b/ids/__init__.py index 4c95124..4ff4d2f 100644 --- a/ids/__init__.py +++ b/ids/__init__.py @@ -1,10 +1,9 @@ from base64 import b64encode import apns -from . import profile -from . import _helpers -#from .profile import _get_auth_cert, _get_auth_token, _get_handles +from . import profile, _helpers, identity class IDSUser: + # Sets self.user_id and self._auth_token def _authenticate_for_token( self, username: str, password: str, factor_callback: callable = None ): @@ -12,6 +11,7 @@ class IDSUser: username, password, factor_callback ) + # Sets self._auth_keypair using self.user_id and self._auth_token def _authenticate_for_cert(self): self._auth_keypair = profile._get_auth_cert(self.user_id, self._auth_token) @@ -19,19 +19,39 @@ class IDSUser: def __init__( self, push_connection: apns.APNSConnection, - username: str, - password: str, - factor_callback: callable = None, ): self.push_connection = push_connection + self._push_keypair = _helpers.KeyPair(self.push_connection.private_key, self.push_connection.cert) + + def __str__(self): + return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})" + + # Authenticates with a username and password, to create a brand new authentication keypair + def authenticate(self, username: str, password: str, factor_callback: callable = None): self._authenticate_for_token(username, password, factor_callback) self._authenticate_for_cert() self.handles = profile._get_handles( b64encode(self.push_connection.token), self.user_id, self._auth_keypair, - _helpers.KeyPair(self.push_connection.private_key, self.push_connection.cert), + self._push_keypair, ) - def __str__(self): - return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})" \ No newline at end of file + # Uses an existing authentication keypair + def restore_authentication(self, auth_keypair: _helpers.KeyPair, user_id: str, handles: dict): + self._auth_keypair = auth_keypair + self.user_id = user_id + self.handles = handles + + + # This is a separate call so that the user can make sure the first part succeeds before asking for validation data + def register(self, validation_data: str): + resp = identity.register_request( + b64encode(self.push_connection.token), + self.handles, + self.user_id, + self._auth_keypair, + self._push_keypair, + validation_data + ) + print(resp) diff --git a/ids/identity.py b/ids/identity.py index e69de29..5894e14 100644 --- a/ids/identity.py +++ b/ids/identity.py @@ -0,0 +1,54 @@ +from ._helpers import KeyPair, PROTOCOL_VERSION, USER_AGENT +from base64 import b64decode +import plistlib +import requests +from .signing import add_auth_signature + +def register_request( + push_token, handles, uid, auth_key: KeyPair, push_key: KeyPair, validation_data +): + uris = [{"uri": handle} for handle in handles] + + body = { + "hardware-version": "MacBookPro18,3", + "language": "en-US", + "os-version": "macOS,13.2.1,22D68", + "software-version": "22D68", + "services": [ + { + "capabilities": [{"flags": 1, "name": "Messenger", "version": 1}], + "service": "com.apple.madrid", + "users": [ + { + # TODO: Pass ALL URIs from get handles + "uris": uris, + "user-id": uid, + } + ], + } + ], + "validation-data": b64decode(validation_data), + } + + body = plistlib.dumps(body) + + headers = { + "x-protocol-version": PROTOCOL_VERSION, + "x-auth-user-id-0": uid, + } + add_auth_signature( + headers, body, "id-register", auth_key, push_key, push_token, 0 + ) + + r = requests.post( + "https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register", + headers=headers, + data=body, + verify=False, + ) + r = plistlib.loads(r.content) + print(f'Response code: {r["status"]}') + if "status" in r and r["status"] == 6004: + raise Exception("Validation data expired!") + # TODO: Do validation of nested statuses + return r \ No newline at end of file diff --git a/newdemo.py b/newdemo.py index 91ec219..4541c0f 100644 --- a/newdemo.py +++ b/newdemo.py @@ -1,13 +1,59 @@ import ids import apns from getpass import getpass +import json +from base64 import b64encode + +def input_multiline(prompt): + print(prompt) + lines = [] + while True: + line = input() + if line == "": + break + lines.append(line) + return "\n".join(lines) + +# Try and load config.json +try: + with open("config.json", "r") as f: + + CONFIG = json.load(f) +except FileNotFoundError: + CONFIG = {} -conn = apns.APNSConnection() -conn.connect() +conn = apns.APNSConnection(CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert")) +conn.connect(CONFIG.get("push", {}).get("token")) -username = input("Username: ") -password = getpass("Password: ") -user = ids.IDSUser(conn, username, password) +user = ids.IDSUser(conn) -print(user.handles) \ No newline at end of file +if CONFIG.get("auth", {}).get("cert") is not None: + auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"]) + user_id = CONFIG["auth"]["user_id"] + handles = CONFIG["auth"]["handles"] + user.restore_authentication(auth_keypair, user_id, handles) +else: + username = input("Username: ") + password = getpass("Password: ") + + user.authenticate(username, password) + +vd = input_multiline("Enter validation data: ") +user.register(vd) + +# Write config.json +CONFIG["auth"] = { + "key": user._auth_keypair.key, + "cert": user._auth_keypair.cert, + "user_id": user.user_id, + "handles": user.handles, +} +CONFIG["push"] = { + "token": b64encode(user.push_connection.token).decode(), + "key": user.push_connection.private_key, + "cert": user.push_connection.cert, +} + +with open("config.json", "w") as f: + json.dump(CONFIG, f, indent=4) \ No newline at end of file From c99826d60eeea0ad47b4f2e833d5db87ac7b83b7 Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 9 May 2023 17:03:27 -0400 Subject: [PATCH 3/9] remove old stuff, more refactoring --- demo.py | 208 +++++------------------ ids/__init__.py | 31 ++-- ids/_helpers.py | 6 +- ids/identity.py | 35 ++-- ids/profile.py | 25 ++- ids/query.py | 1 + ids/signing.py | 21 ++- newdemo.py | 59 ------- oldids.py | 434 ------------------------------------------------ 9 files changed, 117 insertions(+), 703 deletions(-) delete mode 100644 newdemo.py delete mode 100644 oldids.py diff --git a/demo.py b/demo.py index 5e4d04c..ba21f4c 100644 --- a/demo.py +++ b/demo.py @@ -1,18 +1,9 @@ -import getpass import json +from base64 import b64encode +from getpass import getpass -import ids -from base64 import b64encode, b64decode import apns -from ids import signing - - -# Open config -try: - with open("config.json", "r") as f: - CONFIG = json.load(f) -except FileNotFoundError: - CONFIG = {} +import ids def input_multiline(prompt): @@ -26,167 +17,56 @@ def input_multiline(prompt): return "\n".join(lines) -def refresh_token(): - # If no username is set, prompt for it - if "username" not in CONFIG: - CONFIG["username"] = input("Enter iCloud username: ") - # If no password is set, prompt for it - if "password" not in CONFIG: - CONFIG["password"] = getpass.getpass("Enter iCloud password: ") - # If grandslam authentication is not set, prompt for it - if "use_gsa" not in CONFIG: - CONFIG["use_gsa"] = input("Use grandslam authentication? [y/N] ").lower() == "y" +# Try and load config.json +try: + with open("config.json", "r") as f: - def factor_gen(): - return input("Enter iCloud 2FA code: ") - - CONFIG["user_id"], CONFIG["token"] = ids._get_auth_token( - CONFIG["username"], CONFIG["password"], factor_gen=factor_gen - ) + CONFIG = json.load(f) +except FileNotFoundError: + CONFIG = {} -def refresh_cert(): - CONFIG["key"], CONFIG["auth_cert"] = ids._get_auth_cert( - CONFIG["user_id"], CONFIG["token"] - ) +conn = apns.APNSConnection( + CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert") +) +conn.connect(CONFIG.get("push", {}).get("token")) +user = ids.IDSUser(conn) -def create_connection(): - conn = apns.APNSConnection() - token = conn.connect() - # conn.filter(['com.apple.madrid']) - CONFIG["push"] = { - "token": b64encode(token).decode(), - "cert": conn.cert, - "key": conn.private_key, - } - return conn - - -def restore_connection(): - conn = apns.APNSConnection(CONFIG["push"]["key"], CONFIG["push"]["cert"]) - conn.connect(True, b64decode(CONFIG["push"]["token"])) - # conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi']) - return conn - - -def refresh_ids_cert(): - info = { - "uri": "mailto:" + CONFIG["username"], - "user_id": CONFIG["user_id"], - } - - print( - ids._get_handles( - CONFIG["push"]["token"], - CONFIG["user_id"], - ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]), - ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]), - ) - ) - - resp = None - try: - if "validation_data" in CONFIG: - resp = ids._register_request( - CONFIG["push"]["token"], - info, - ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]), - ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]), - CONFIG["validation_data"], - ) - except Exception as e: - print(e) - resp = None - - if resp is None: - print( - "Note: Validation data can be obtained from @JJTech, or intercepted using a HTTP proxy." - ) - validation_data = ( - input_multiline("Enter validation data: ") - .replace("\n", "") - .replace(" ", "") - ) - resp = ids._register_request( - CONFIG["push"]["token"], - info, - ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]), - ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]), - validation_data, - ) - CONFIG["validation_data"] = validation_data - - print(resp) - ids_cert = signing.armour_cert(resp["services"][0]["users"][0]["cert"]) - - CONFIG["ids_cert"] = ids_cert - - -if not "push" in CONFIG: - print("No existing APNs credentials, creating new ones...") - # print("No push conn") - conn = create_connection() +if CONFIG.get("auth", {}).get("cert") is not None: + auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"]) + user_id = CONFIG["auth"]["user_id"] + handles = CONFIG["auth"]["handles"] + user.restore_authentication(auth_keypair, user_id, handles) else: - print("Restoring APNs credentials...") - conn = restore_connection() -print("Connected to APNs!") + username = input("Username: ") + password = getpass("Password: ") -if not "ids_cert" in CONFIG: - print("No existing IDS certificate, creating new one...") - if not "key" in CONFIG: - print("No existing authentication certificate, creating new one...") - if not "token" in CONFIG: - print("No existing authentication token, creating new one...") - refresh_token() - print("Got authentication token!") - refresh_cert() - print("Got authentication certificate!") - refresh_ids_cert() -print("Got IDS certificate!") + user.authenticate(username, password) -ids_keypair = ids.KeyPair(CONFIG["key"], CONFIG["ids_cert"]) +if CONFIG.get("id", {}).get("cert") is not None: + id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"]) + user.restore_identity(id_keypair) +else: + vd = input_multiline("Enter validation data: ") + user.register(vd) +# Write config.json +CONFIG["id"] = { + "key": user._id_keypair.key, + "cert": user._id_keypair.cert, +} +CONFIG["auth"] = { + "key": user._auth_keypair.key, + "cert": user._auth_keypair.cert, + "user_id": user.user_id, + "handles": user.handles, +} +CONFIG["push"] = { + "token": b64encode(user.push_connection.token).decode(), + "key": user.push_connection.private_key, + "cert": user.push_connection.cert, +} -def lookup(topic: str, users: list[str]): - print(f"Looking up users {users} for topic {topic}...") - resp = ids.lookup(conn, CONFIG["username"], ids_keypair, topic, users) - - # print(resp) - # r = list(resp['results'].values())[0] - for k, v in resp["results"].items(): - print(f"Result for user {k} topic {topic}:") - i = v["identities"] - print(f"IDENTITIES: {len(i)}") - for iden in i: - print("IDENTITY", end=" ") - print(f"Push Token: {b64encode(iden['push-token']).decode()}", end=" ") - if "client-data" in iden: - print(f"Client Data: {len(iden['client-data'])}") - - else: - print("No client data") - - -# Hack to make sure that the requests and responses match up -# This filter MUST contain all the topics you are looking up -# conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi', 'com.apple.private.alloy.multiplex1', 'com.apple.private.alloy.screensharing']) -# import time -# print("...waiting for queued messages... (this is a hack)") -# time.sleep(5) # Let the server send us any messages it was holding -# conn.sink() # Dump the messages - -lookup("com.apple.madrid", ["mailto:jjtech@jjtech.dev"]) -lookup("com.apple.private.alloy.facetime.multi", ["mailto:jjtech@jjtech.dev"]) - -lookup("com.apple.private.alloy.facetime.multi", ["mailto:user_test2@icloud.com"]) -lookup("com.apple.madrid", ["mailto:user_test2@icloud.com"]) - -lookup("com.apple.private.alloy.multiplex1", ["mailto:user_test2@icloud.com"]) - -lookup("com.apple.private.alloy.screensharing", ["mailto:user_test2@icloud.com"]) - -# time.sleep(4) -# Save config with open("config.json", "w") as f: json.dump(CONFIG, f, indent=4) diff --git a/ids/__init__.py b/ids/__init__.py index 4ff4d2f..ddf1bae 100644 --- a/ids/__init__.py +++ b/ids/__init__.py @@ -1,6 +1,9 @@ from base64 import b64encode + import apns -from . import profile, _helpers, identity + +from . import _helpers, identity, profile + class IDSUser: # Sets self.user_id and self._auth_token @@ -21,13 +24,17 @@ class IDSUser: push_connection: apns.APNSConnection, ): self.push_connection = push_connection - self._push_keypair = _helpers.KeyPair(self.push_connection.private_key, self.push_connection.cert) + self._push_keypair = _helpers.KeyPair( + self.push_connection.private_key, self.push_connection.cert + ) def __str__(self): return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})" - + # Authenticates with a username and password, to create a brand new authentication keypair - def authenticate(self, username: str, password: str, factor_callback: callable = None): + def authenticate( + self, username: str, password: str, factor_callback: callable = None + ): self._authenticate_for_token(username, password, factor_callback) self._authenticate_for_cert() self.handles = profile._get_handles( @@ -38,20 +45,24 @@ class IDSUser: ) # Uses an existing authentication keypair - def restore_authentication(self, auth_keypair: _helpers.KeyPair, user_id: str, handles: dict): + def restore_authentication( + self, auth_keypair: _helpers.KeyPair, user_id: str, handles: dict + ): self._auth_keypair = auth_keypair self.user_id = user_id self.handles = handles - - + # This is a separate call so that the user can make sure the first part succeeds before asking for validation data def register(self, validation_data: str): - resp = identity.register_request( + cert = identity.register( b64encode(self.push_connection.token), self.handles, self.user_id, self._auth_keypair, self._push_keypair, - validation_data + validation_data, ) - print(resp) + self._id_keypair = _helpers.KeyPair(self._auth_keypair.key, cert) + + def restore_identity(self, id_keypair: _helpers.KeyPair): + self._id_keypair = id_keypair diff --git a/ids/_helpers.py b/ids/_helpers.py index 9f2bc93..a7e0a6d 100644 --- a/ids/_helpers.py +++ b/ids/_helpers.py @@ -6,7 +6,11 @@ PROTOCOL_VERSION = "1640" # KeyPair is a named tuple that holds a key and a certificate in PEM form KeyPair = namedtuple("KeyPair", ["key", "cert"]) + def dearmour(armoured: str) -> str: import re + # Use a regex to remove the header and footer (generic so it work on more than just certificates) - return re.sub(r"-----BEGIN .*-----|-----END .*-----", "", armoured).replace("\n", "") \ No newline at end of file + return re.sub(r"-----BEGIN .*-----|-----END .*-----", "", armoured).replace( + "\n", "" + ) diff --git a/ids/identity.py b/ids/identity.py index 5894e14..a9d0306 100644 --- a/ids/identity.py +++ b/ids/identity.py @@ -1,11 +1,14 @@ -from ._helpers import KeyPair, PROTOCOL_VERSION, USER_AGENT -from base64 import b64decode import plistlib -import requests -from .signing import add_auth_signature +from base64 import b64decode -def register_request( - push_token, handles, uid, auth_key: KeyPair, push_key: KeyPair, validation_data +import requests + +from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair +from .signing import add_auth_signature, armour_cert + + +def register( + push_token, handles, user_id, auth_key: KeyPair, push_key: KeyPair, validation_data ): uris = [{"uri": handle} for handle in handles] @@ -20,9 +23,8 @@ def register_request( "service": "com.apple.madrid", "users": [ { - # TODO: Pass ALL URIs from get handles "uris": uris, - "user-id": uid, + "user-id": user_id, } ], } @@ -34,11 +36,9 @@ def register_request( headers = { "x-protocol-version": PROTOCOL_VERSION, - "x-auth-user-id-0": uid, + "x-auth-user-id-0": user_id, } - add_auth_signature( - headers, body, "id-register", auth_key, push_key, push_token, 0 - ) + add_auth_signature(headers, body, "id-register", auth_key, push_key, push_token, 0) r = requests.post( "https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register", @@ -51,4 +51,13 @@ def register_request( if "status" in r and r["status"] == 6004: raise Exception("Validation data expired!") # TODO: Do validation of nested statuses - return r \ No newline at end of file + if "status" in r and r["status"] != 0: + raise Exception(f"Failed to register: {r}") + if not "services" in r: + raise Exception(f"No services in response: {r}") + if not "users" in r["services"][0]: + raise Exception(f"No users in response: {r}") + if not "cert" in r["services"][0]["users"][0]: + raise Exception(f"No cert in response: {r}") + + return armour_cert(r["services"][0]["users"][0]["cert"]) diff --git a/ids/profile.py b/ids/profile.py index c503ffc..7894cf7 100644 --- a/ids/profile.py +++ b/ids/profile.py @@ -1,16 +1,19 @@ -import requests import plistlib +import random import uuid -import gsa +from base64 import b64decode + +import requests from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding, rsa from cryptography.x509.oid import NameOID -import random -from base64 import b64decode -from ._helpers import KeyPair, PROTOCOL_VERSION, USER_AGENT + +import gsa + from . import signing +from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair def _auth_token_request(username: str, password: str) -> any: @@ -122,24 +125,16 @@ def _get_auth_cert(user_id, token) -> KeyPair: cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(), ) + def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair): headers = { "x-protocol-version": PROTOCOL_VERSION, "x-auth-user-id": user_id, - #"user-agent": USER_AGENT, } - import oldids - #headers2 = headers.copy() - #oldids._add_auth_push_sig(headers2, None, "id-get-handles", auth_key, push_key, push_token) - #headers3 = headers.copy() signing.add_auth_signature( headers, None, "id-get-handles", auth_key, push_key, push_token ) - #for key, value in headers2.items(): - # if headers3[key] != value: - # print(f"Key {key} mismatch: {headers3[key]} != {value}") - r = requests.get( "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles", headers=headers, @@ -151,4 +146,4 @@ def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair) if not "handles" in r: raise Exception("No handles in response: " + str(r)) - return [handle["uri"] for handle in r["handles"]] \ No newline at end of file + return [handle["uri"] for handle in r["handles"]] diff --git a/ids/query.py b/ids/query.py index 443d665..36b991a 100644 --- a/ids/query.py +++ b/ids/query.py @@ -5,6 +5,7 @@ from base64 import b64encode import apns import bags + from . import USER_AGENT, KeyPair, signing diff --git a/ids/signing.py b/ids/signing.py index 7f09170..8b2cdc7 100644 --- a/ids/signing.py +++ b/ids/signing.py @@ -1,18 +1,21 @@ -from ._helpers import dearmour, KeyPair -from datetime import datetime import random -from base64 import b64encode, b64decode +from base64 import b64decode, b64encode +from datetime import datetime + from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding, rsa from cryptography.x509.oid import NameOID +from ._helpers import KeyPair, dearmour + + # TODO: Move this helper somewhere else def armour_cert(cert: bytes) -> str: - x509.load_der_x509_certificate(cert) - ids_cert = ids_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip() - return ids_cert + cert = x509.load_der_x509_certificate(cert) + return cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip() + """ Generates a nonce in this format: @@ -21,6 +24,8 @@ Generates a nonce in this format: 000001876d008cc5 # unix time r1r2r3r4r5r6r7r8 # random bytes """ + + def generate_nonce() -> bytes: return ( b"\x01" @@ -31,6 +36,7 @@ def generate_nonce() -> bytes: import typing + # Creates a payload from individual parts for signing def _create_payload( bag_key: str, @@ -73,13 +79,14 @@ def _sign_payload( payload, nonce = _create_payload(bag_key, query_string, push_token, payload) - sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) # type: ignore + sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) # type: ignore sig = b"\x01\x01" + sig sig = b64encode(sig).decode() return sig, nonce + # Add headers for x-push-sig and x-auth-sig stuff def add_auth_signature( headers: dict, diff --git a/newdemo.py b/newdemo.py deleted file mode 100644 index 4541c0f..0000000 --- a/newdemo.py +++ /dev/null @@ -1,59 +0,0 @@ -import ids -import apns -from getpass import getpass -import json -from base64 import b64encode - -def input_multiline(prompt): - print(prompt) - lines = [] - while True: - line = input() - if line == "": - break - lines.append(line) - return "\n".join(lines) - -# Try and load config.json -try: - with open("config.json", "r") as f: - - CONFIG = json.load(f) -except FileNotFoundError: - CONFIG = {} - - -conn = apns.APNSConnection(CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert")) -conn.connect(CONFIG.get("push", {}).get("token")) - -user = ids.IDSUser(conn) - -if CONFIG.get("auth", {}).get("cert") is not None: - auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"]) - user_id = CONFIG["auth"]["user_id"] - handles = CONFIG["auth"]["handles"] - user.restore_authentication(auth_keypair, user_id, handles) -else: - username = input("Username: ") - password = getpass("Password: ") - - user.authenticate(username, password) - -vd = input_multiline("Enter validation data: ") -user.register(vd) - -# Write config.json -CONFIG["auth"] = { - "key": user._auth_keypair.key, - "cert": user._auth_keypair.cert, - "user_id": user.user_id, - "handles": user.handles, -} -CONFIG["push"] = { - "token": b64encode(user.push_connection.token).decode(), - "key": user.push_connection.private_key, - "cert": user.push_connection.cert, -} - -with open("config.json", "w") as f: - json.dump(CONFIG, f, indent=4) \ No newline at end of file diff --git a/oldids.py b/oldids.py deleted file mode 100644 index b3f133b..0000000 --- a/oldids.py +++ /dev/null @@ -1,434 +0,0 @@ -import gzip -import plistlib -import random -import uuid -from base64 import b64decode, b64encode -from collections import namedtuple -from datetime import datetime - -import requests -from cryptography import x509 -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import padding, rsa -from cryptography.x509.oid import NameOID - -import apns -import bags -import gsa - -USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]" -PROTOCOL_VERSION = "1640" - -KeyPair = namedtuple("KeyPair", ["key", "cert"]) - -# global_key, global_cert = load_keys() - - -def _send_request( - conn: apns.APNSConnection, - bag_key: str, - topic: str, - body: bytes, - keypair: KeyPair, - username: str, -) -> bytes: - body = gzip.compress(body, mtime=0) - - push_token = b64encode(conn.token).decode() - - # Sign the request - signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body) - - headers = { - "x-id-self-uri": "mailto:" + username, - "User-Agent": USER_AGENT, - "x-protocol-version": "1630", - } - _add_id_sig(headers, body, bag_key, keypair, push_token) - - # print(headers) - - msg_id = random.randbytes(16) - - req = { - "cT": "application/x-apple-plist", - "U": msg_id, - "c": 96, - "ua": USER_AGENT, - "u": bags.ids_bag()[bag_key], - "h": headers, - "v": 2, - "b": body, - } - - conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY)) - # resp = conn.wait_for_packet(0x0A) - - def check_response(x): - if x[0] != 0x0A: - return False - resp_body = apns._get_field(x[1], 3) - if resp_body is None: - return False - resp_body = plistlib.loads(resp_body) - return resp_body["U"] == msg_id - - # Lambda to check if the response is the one we want - # conn.incoming_queue.find(check_response) - payload = conn.incoming_queue.wait_pop_find(check_response) - # conn._send_ack(apns._get_field(payload[1], 4)) - resp = apns._get_field(payload[1], 3) - return plistlib.loads(resp) - - -# Performs an IDS lookup -# conn: an active APNs connection. must be connected and have a push token. will be filtered to the IDS topic -# self: the user's email address -# keypair: a KeyPair object containing the user's private key and certificate -# topic: the IDS topic to query -# query: a list of URIs to query -def lookup( - conn: apns.APNSConnection, self: str, keypair: KeyPair, topic: str, query: list[str] -) -> any: - conn.filter([topic]) - query = {"uris": query} - resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), keypair, self) - # resp = plistlib.loads(resp) - # print(resp) - resp = gzip.decompress(resp["b"]) - resp = plistlib.loads(resp) - return resp - - -def _auth_token_request(username: str, password: str) -> any: - # Turn the PET into an auth token - data = { - "apple-id": username, - "client-id": str(uuid.uuid4()), - "delegates": {"com.apple.private.ids": {"protocol-version": "4"}}, - "password": password, - } - data = plistlib.dumps(data) - - r = requests.post( - "https://setup.icloud.com/setup/prefpane/loginDelegates", - auth=(username, password), - data=data, - verify=False, - ) - r = plistlib.loads(r.content) - return r - - -# Gets an IDS auth token for the given username and password -# Will use native Grand Slam on macOS -# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted -# Returns (realm user id, auth token) -def _get_auth_token( - username: str, password: str, factor_gen: callable = None -) -> tuple[str, str]: - from sys import platform - - # if use_gsa: - if platform == "darwin": - g = gsa.authenticate(username, password, gsa.Anisette()) - pet = g["t"]["com.apple.gs.idms.pet"]["token"] - else: - # Make the request without the 2FA code to make the prompt appear - _auth_token_request(username, password) - # TODO: Make sure we actually need the second request, some rare accounts don't have 2FA - # Now make the request with the 2FA code - if factor_gen is None: - pet = password + input("Enter 2FA code: ") - else: - pet = password + factor_gen() - r = _auth_token_request(username, pet) - # print(r) - if "description" in r: - raise Exception(f"Error: {r['description']}") - service_data = r["delegates"]["com.apple.private.ids"]["service-data"] - realm_user_id = service_data["realm-user-id"] - auth_token = service_data["auth-token"] - # print(f"Auth token for {realm_user_id}: {auth_token}") - return realm_user_id, auth_token - - -def _generate_csr(private_key: rsa.RSAPrivateKey) -> str: - csr = ( - x509.CertificateSigningRequestBuilder() - .subject_name( - x509.Name( - [ - x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()), - ] - ) - ) - .sign(private_key, hashes.SHA256()) - ) - - csr = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8") - return ( - csr.replace("-----BEGIN CERTIFICATE REQUEST-----", "") - .replace("-----END CERTIFICATE REQUEST-----", "") - .replace("\n", "") - ) - - -# Gets an IDS auth cert for the given user id and auth token -# Returns [private key PEM, certificate PEM] -def _get_auth_cert(user_id, token) -> KeyPair: - private_key = rsa.generate_private_key( - public_exponent=65537, key_size=2048, backend=default_backend() - ) - body = { - "authentication-data": {"auth-token": token}, - "csr": b64decode(_generate_csr(private_key)), - "realm-user-id": user_id, - } - - body = plistlib.dumps(body) - - r = requests.post( - "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS", - data=body, - headers={"x-protocol-version": "1630"}, - verify=False, - ) - r = plistlib.loads(r.content) - if r["status"] != 0: - raise (Exception(f"Failed to get auth cert: {r}")) - cert = x509.load_der_x509_certificate(r["cert"]) - return KeyPair( - private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption(), - ) - .decode("utf-8") - .strip(), - cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(), - ) - - -def _register_request( - push_token, info, auth_key: KeyPair, push_key: KeyPair, validation_data -): - body = { - "hardware-version": "MacBookPro18,3", - "language": "en-US", - "os-version": "macOS,13.2.1,22D68", - "software-version": "22D68", - "services": [ - { - "capabilities": [{"flags": 1, "name": "Messenger", "version": 1}], - "service": "com.apple.madrid", - "users": [ - { - # TODO: Pass ALL URIs from get handles - "uris": [{"uri": info["uri"]}], - "user-id": info["user_id"], - } - ], - } - ], - "validation-data": b64decode(validation_data), - } - - body = plistlib.dumps(body) - - headers = { - "x-protocol-version": PROTOCOL_VERSION, - "x-auth-user-id-0": info["user_id"], - } - _add_auth_push_sig( - headers, body, "id-register", auth_key, push_key, push_token, 0 - ) - - r = requests.post( - "https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register", - headers=headers, - data=body, - verify=False, - ) - r = plistlib.loads(r.content) - print(f'Response code: {r["status"]}') - if "status" in r and r["status"] == 6004: - raise Exception("Validation data expired!") - # TODO: Do validation of nested statuses - return r - -def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair): - headers = { - "x-protocol-version": PROTOCOL_VERSION, - "x-auth-user-id": user_id, - } - _add_auth_push_sig( - headers, None, "id-get-handles", auth_key, push_key, push_token - ) - - r = requests.get( - "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles", - headers=headers, - verify=False, - ) - - r = plistlib.loads(r.content) - - if not "handles" in r: - raise Exception("No handles in response: " + str(r)) - - return [handle["uri"] for handle in r["handles"]] - - -class IDSUser: - def _authenticate_for_token( - self, username: str, password: str, factor_callback: callable = None - ): - self.user_id, self._auth_token = _get_auth_token( - username, password, factor_callback - ) - - def _authenticate_for_cert(self): - self._auth_keypair = _get_auth_cert(self.user_id, self._auth_token) - - # Factor callback will be called if a 2FA code is necessary - def __init__( - self, - push_connection: apns.APNSConnection, - username: str, - password: str, - factor_callback: callable = None, - ): - self.push_connection = push_connection - self._authenticate_for_token(username, password, factor_callback) - self._authenticate_for_cert() - self.handles = _get_handles( - b64encode(self.push_connection.token), - self.user_id, - self._auth_keypair, - KeyPair(self.push_connection.private_key, self.push_connection.cert), - ) - - def __str__(self): - return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})" - - -def test(): - import getpass - - conn = apns.APNSConnection() - conn.connect() - username = input("Enter username: ") - password = getpass.getpass("Enter password: ") - user = IDSUser(conn, username, password) - print(user) - - -# SIGNING STUFF - -# Nonce Format: -# 01000001876bd0a2c0e571093967fce3d7 -# 01 # version -# 000001876d008cc5 # unix time -# r1r2r3r4r5r6r7r8 # random bytes -def generate_nonce() -> bytes: - return ( - b"\x01" - + int(datetime.now().timestamp() * 1000).to_bytes(8, "big") - + random.randbytes(8) - ) - - -# Creates a payload from individual parts for signing -def _create_payload( - bag_key: str, - query_string: str, - push_token: str, - payload: bytes, - nonce: bytes = None, -) -> tuple[str, bytes]: - # Generate the nonce - if nonce is None: - nonce = generate_nonce() - push_token = b64decode(push_token) - - if payload is None: - payload = b"" - - return ( - nonce - + len(bag_key).to_bytes(4, "big") - + bag_key.encode() - + len(query_string).to_bytes(4, "big") - + query_string.encode() - + len(payload).to_bytes(4, "big") - + payload - + len(push_token).to_bytes(4, "big") - + push_token, - nonce, - ) - - -# Returns signature, nonce -def _sign_payload( - private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes -) -> tuple[str, bytes]: - # Load the private key - key = serialization.load_pem_private_key( - private_key.encode(), password=None, backend=default_backend() - ) - - payload, nonce = _create_payload(bag_key, query_string, push_token, payload) - sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) - - sig = b"\x01\x01" + sig - sig = b64encode(sig).decode() - - return sig, nonce - -def dearmour(cert: str): - return ( - cert.replace("\n", "") - .replace("-----BEGIN CERTIFICATE-----", "") - .replace("-----END CERTIFICATE-----", "") - ) - -# Add headers for x-push-sig and x-auth-sig stuff -def _add_auth_push_sig( - headers: dict, - body: bytes, - bag_key: str, - auth_key: KeyPair, - push_key: KeyPair, - push_token: str, - auth_number=None, -): - push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body) - headers["x-push-sig"] = push_sig - headers["x-push-nonce"] = b64encode(push_nonce) - headers["x-push-cert"] = dearmour(push_key.cert) - headers["x-push-token"] = push_token - - auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body) - auth_postfix = "-" + str(auth_number) if auth_number is not None else "" - headers["x-auth-sig" + auth_postfix] = auth_sig - headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce) - headers["x-auth-cert" + auth_postfix] = dearmour(auth_key.cert) - -def _add_id_sig( - headers: dict, - body: bytes, - bag_key: str, - id_key: KeyPair, - push_token: str, -): - id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body) - headers["x-id-sig"] = id_sig - headers["x-id-nonce"] = b64encode(id_nonce) - headers["x-id-cert"] = dearmour(id_key.cert) - headers["x-push-token"] = push_token - - -if __name__ == "__main__": - test() From 7158532c668ee30f5ab8d77c586bc28fad1ec0ca Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 9 May 2023 18:01:32 -0400 Subject: [PATCH 4/9] trying to fix query after refactor --- demo.py | 2 ++ ids/__init__.py | 6 +++++- ids/query.py | 13 ++++++++----- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/demo.py b/demo.py index ba21f4c..98d16ce 100644 --- a/demo.py +++ b/demo.py @@ -51,6 +51,8 @@ else: vd = input_multiline("Enter validation data: ") user.register(vd) +print(user.lookup(["mailto:textgpt@icloud.com"])) + # Write config.json CONFIG["id"] = { "key": user._id_keypair.key, diff --git a/ids/__init__.py b/ids/__init__.py index ddf1bae..fa316a3 100644 --- a/ids/__init__.py +++ b/ids/__init__.py @@ -2,7 +2,7 @@ from base64 import b64encode import apns -from . import _helpers, identity, profile +from . import _helpers, identity, profile, query class IDSUser: @@ -66,3 +66,7 @@ class IDSUser: def restore_identity(self, id_keypair: _helpers.KeyPair): self._id_keypair = id_keypair + + def lookup(self, uris: list[str], topic: str = "com.apple.madrid") -> any: + return query.lookup(self.push_connection, self.handles[0], self._id_keypair, topic, uris) + diff --git a/ids/query.py b/ids/query.py index 36b991a..9d3bcb1 100644 --- a/ids/query.py +++ b/ids/query.py @@ -6,7 +6,8 @@ from base64 import b64encode import apns import bags -from . import USER_AGENT, KeyPair, signing +from ._helpers import USER_AGENT, KeyPair +from . import signing def _send_request( @@ -15,7 +16,7 @@ def _send_request( topic: str, body: bytes, keypair: KeyPair, - username: str, + self_uri: str, ) -> bytes: body = gzip.compress(body, mtime=0) @@ -25,10 +26,11 @@ def _send_request( # signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body) headers = { - "x-id-self-uri": "mailto:" + username, + "x-id-self-uri": self_uri, "User-Agent": USER_AGENT, "x-protocol-version": "1630", } + print(headers) signing.add_id_signature(headers, body, bag_key, keypair, push_token) # print(headers) @@ -46,6 +48,7 @@ def _send_request( "b": body, } + print(req) conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY)) # resp = conn.wait_for_packet(0x0A) @@ -73,11 +76,11 @@ def _send_request( # topic: the IDS topic to query # query: a list of URIs to query def lookup( - conn: apns.APNSConnection, self: str, keypair: KeyPair, topic: str, query: list[str] + conn: apns.APNSConnection, self_uri: str, id_keypair: KeyPair, topic: str, query: list[str] ) -> any: conn.filter([topic]) query = {"uris": query} - resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), keypair, self) + resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), id_keypair, self_uri) # resp = plistlib.loads(resp) # print(resp) resp = gzip.decompress(resp["b"]) From d2c3bc2431332273c6866bae1b1eaab6ad7f1085 Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 9 May 2023 18:43:51 -0400 Subject: [PATCH 5/9] commit before stuff breaks again --- demo.py | 33 +++- ids/query.py | 8 +- olddemo.py | 191 ++++++++++++++++++++++ oldids.py | 436 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 664 insertions(+), 4 deletions(-) create mode 100644 olddemo.py create mode 100644 oldids.py diff --git a/demo.py b/demo.py index 98d16ce..75ac078 100644 --- a/demo.py +++ b/demo.py @@ -1,7 +1,7 @@ import json from base64 import b64encode from getpass import getpass - +from base64 import b64decode import apns import ids @@ -25,11 +25,38 @@ try: except FileNotFoundError: CONFIG = {} +def convert_config(old): + new = {} + new["id"] = { + "key": old["key"], + "cert": old["ids_cert"], + } + new["auth"] = { + "key": old["key"], + "cert": old["auth_cert"], + "user_id": old["user_id"], + "handles": [ + "mailto:user_test2@icloud.com", + ] + #"handles": old["handles"], + } + new["push"] = { + "token": old["push"]["token"], + "key": old["push"]["key"], + "cert": old["push"]["cert"], + } + return new + +CONFIG = convert_config(CONFIG) + conn = apns.APNSConnection( CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert") ) -conn.connect(CONFIG.get("push", {}).get("token")) +#print(CONFIG.get("push", {}).get("token")) +print(b64decode(CONFIG.get("push", {}).get("token"))) +conn.connect(True, b64decode(CONFIG.get("push", {}).get("token"))) +print(conn.token) user = ids.IDSUser(conn) @@ -70,5 +97,5 @@ CONFIG["push"] = { "cert": user.push_connection.cert, } -with open("config.json", "w") as f: +with open("config.json.new", "w") as f: json.dump(CONFIG, f, indent=4) diff --git a/ids/query.py b/ids/query.py index 9d3bcb1..a4197d0 100644 --- a/ids/query.py +++ b/ids/query.py @@ -75,7 +75,7 @@ def _send_request( # keypair: a KeyPair object containing the user's private key and certificate # topic: the IDS topic to query # query: a list of URIs to query -def lookup( +def lookup_n( conn: apns.APNSConnection, self_uri: str, id_keypair: KeyPair, topic: str, query: list[str] ) -> any: conn.filter([topic]) @@ -86,3 +86,9 @@ def lookup( resp = gzip.decompress(resp["b"]) resp = plistlib.loads(resp) return resp + +def lookup(conn: apns.APNSConnection, self_uri: str, id_keypair: KeyPair, topic: str, query: list[str]) -> any: + import oldids + #id_keypair = KeyPair("-----BEGIN RSA PRIVATE KEY-----\nMIIEpQIBAAKCAQEA6fNjLPobeiQEbeDzYResvK2oC9+MsGyog36jo1o7pm8AeIth\nSzZ7caM8ThM/37i9DGJyDsnl6yqg1SxhyW4Fm8Evq2Mm6eYh6YwzRvppQoFqXNQO\nrEgjpQAW+D31V5OvHRprwX6qDVRprNF8gtaGYYjTbQudzYwpzpCIwbUu+1IqfojF\ngzTR/bxfdTbJnlaqWbOjFF7WrSZrP18nGaVkbM3rBS7egRZH1WlG14gO31YNbNg3\ndzOz9hQJHehHfHrSyZam3h6nda8tA7LJzVpTGCo7PJMC4IVyvQf7S2N3BlMJ4cen\nemzaDIOW9b/FCuvENkY2LPuDIT1hQs3pOoWHSQIDAQABAoIBABPAmCLDwh6jnFkf\nmUTdEBlFCy9ncjQyFF83yb6gv3ELpa1HzVDhmnYLe2u3Hdk4eoOpaypa+wXKLVaa\nPu5YEvKl0q3EexRb+QiELQ8k1M7H6PBJ+iwrEhFcCtRuPMDmZ+5L3QWy+U4TTrHH\n5RyR2row6HLoPGxOlXgKhXVfZAZVbgsSG8dbbuoP+U9cCrSU5TH2yIa64Gm7XrvF\n0aEo+J6nMAzw4jUUYY/y8gCU89p5utNDpxXZNva8CO0GpkooZ0nDAOnUjytNpWow\nEXkta9xKBwPQ9FXk4tK0005U6s9lFbKm4HdeypX/teSmhaS3QshENL/zmMysOEpN\nxaIRPMECgYEA8O+h5POMutVufwnonzAq0yWxSDtx4Sj9kUNNflLV0T941TMKIZwp\nQmpBDgbt3ffATjRAdKwHEncHXWhPIf3oA0UgqZFdUEEboIXlNd+6unegGHfrrT/S\n5sOQgN9kyZ/z1IvRVxA9qj3shSFFw4p0gOShObc2NGCmJI7IXc6PumECgYEA+JPz\nCl0l0RCk+lL59YUOe9irhqwHeWo26vsPbnWn8mjN6RB6ZF3NeRFU8KaMf9Zb0eO7\nGnSku97AEgL/UkP1F9imrRI1Ci3jT/vGHyFpR0g8KfhAwZuBZBPavaZ52nW5tiDz\nILzxHJfg8xHXKPGl3T5r7ZzuIxmDPY7bFk6xBekCgYEAwviIQCg+l+qjcjZognmO\nDjQQVG2WaCitmWGnUjRiRuRgOdcFudEPKmmln15IGzmj6yUpi8CyMGUWFqaUcuNv\nX0YPemjh5FHrs2jm5UPZbY/khCh3FUnytz9GrqMYgnjn7fX/P78qx5s4zTrxo51l\nTfC172itepFDoY3R4ueHM8ECgYEAm3MqUhjeRVe7VC//0OJcpGZjHd0G747UuS44\nAEPju1x/KHj9kTZ4AHYuQDBnPKq40RExOOIpArPSOXFWagPFihwaX7E7Khp4RNSW\nmXEzfThXJ4fwNyMgT417BY7ONSfZ82O3p4mA3vi73EYT367+otUeeYHiCmEyCZUE\nvXaIjcECgYEAwYaoKAW8+dpUI8e40jg1FE4eWKo9HC/Gnn2rf0bTMz1qgtH6T9Fj\nvfcM9C8RM0ziXrU255fqqWGBNI3z8dq0mgH/CmU87vV4ldqd6Ej+37EC1drAtX4C\nxPIafLpiKa2aDPcw4FAG+nOGEfYIPbS9WT1Jmz/Qw3EUbNKtt6Ze1Ps=\n-----END RSA PRIVATE KEY-----", "-----BEGIN CERTIFICATE-----\nMIIHOTCCBiGgAwIBAgIQGaPYy+62Ee0Sd7oaf5gYAzANBgkqhkiG9w0BAQUFADBu\nMQswCQYDVQQGEwJVUzETMBEGA1UECgwKQXBwbGUgSW5jLjESMBAGA1UECwwJQXBw\nbGUgSURTMREwDwYDVQQPDAhpZGVudGl0eTEjMCEGA1UEAwwaQXBwbGUgSURTIElk\nZW50aXR5IENBIC0gUjEwHhcNMjMwNTA5MjIwODIzWhcNMjMwNjIzMjIwODU5WjCB\n2zELMAkGA1UEBhMCVVMxEzARBgNVBAoMCkFwcGxlIEluYy4xEjAQBgNVBAsMCU1l\nc3NlbmdlcjEOMAwGA1UEDwwFZHMtaWQxHTAbBgoJkiaJk/IsZAEBDA1EOjIwOTk0\nMzYwOTcxMXQwcgYDVQQFE2tiOjdDMDc4MjI2OTdGRDdGRTA5NDhGN0YzODAxOTJC\nNjZDQTkwNDJBNjEvQTI3QzFCMEM5MjE3OUFBQzk5N0U1Mzc0NUM5Q0JDNEZFMzhB\nNDFEMkQ4OUZFNkNCMzg1MThDREJDMTUwODZCNDCCASIwDQYJKoZIhvcNAQEBBQAD\nggEPADCCAQoCggEBAOnzYyz6G3okBG3g82EXrLytqAvfjLBsqIN+o6NaO6ZvAHiL\nYUs2e3GjPE4TP9+4vQxicg7J5esqoNUsYcluBZvBL6tjJunmIemMM0b6aUKBalzU\nDqxII6UAFvg99VeTrx0aa8F+qg1UaazRfILWhmGI020Lnc2MKc6QiMG1LvtSKn6I\nxYM00f28X3U2yZ5WqlmzoxRe1q0maz9fJxmlZGzN6wUu3oEWR9VpRteIDt9WDWzY\nN3czs/YUCR3oR3x60smWpt4ep3WvLQOyyc1aUxgqOzyTAuCFcr0H+0tjdwZTCeHH\np3ps2gyDlvW/xQrrxDZGNiz7gyE9YULN6TqFh0kCAwEAAaOCA2MwggNfMIIC2wYD\nVR0RBIIC0jCCAs6GHG1haWx0bzp1c2VyX3Rlc3QyQGljbG91ZC5jb22gLAYKKoZI\nhvdjZAYEBAMeAAMAAAACAAAABAAAAAEAAAABAAAAAAAABmgAAAAAoIICLAYKKoZI\nhvdjZAYEBwOCAhwARlVTUACPC3uexqw0O0//dpYLdkkocIFg/GhUJg5qX2F8IJ0Y\nqjx0LiR6qlqFCf1UHqVlqU3LtnTQnYYqG0kNje/DC9C2jC1J5+SGzit94eDfVM63\nUH+UpZQHX1J7NT2xjKQxjbvC9jnWHZMxTBvmwSZqHrrzql+rL840stJpopg335DQ\nsjUig9JgHwVYrxBUHGFMDFONZ4swNbjcOGKFT1KH1VaLAxFNrnL8U7m2h0PSG9Ur\nTXUrQFmLEOl5Jul2LAe0n84WAEwt/u3aZGY9SwQaHFz+64P7gWZpjC/q0ZvjbWiB\nxLc9L/qHm9282RA6e/ibn9C5a944GjNrmTy3FKEc7oL3Ru2XBZ5hlyAVBdTqgg8/\nLmT9SizbZ63Rt5Pct4slButdbecCq7phR46ATpgWLYjOx6NVw68G3cuC3hmXkTVW\ndVcJcikXC4c02YBiNqA2svViz32+QvCzQxvHEajC6+xOXEfFwq58S8/c+7HXJEIx\nnovNGWrcbzpCvSH1GankT1WjG5cQBPvUwnOQ58yvcma1FlQ7NU7JMDgPYqDUhhwZ\nhG9V+LRcGIGzLK9hsZQ39SQjAVqYJ23YPvNl3leaGJaiNgTgjccH6htTI5BBhDdM\nlBUooNEEmbrl0S6NB+OwI/5fWtic2T17J5HEM5mT3u9yC3reurv21hcG/R3rO04N\ni287i7848P039m0/cS0MFiOElmzAQgWgHwYKKoZIhvdjZAYECAMRABmj2MvuthHt\nEne6Gn+YGAOgLwYKKoZIhvdjZAYEBgMhAKJ8GwySF5qsmX5TdFycvE/jikHS2J/m\nyzhRjNvBUIa0MB8GA1UdIwQYMBaAFMZ7ab5JwEEOwMirMjI45D+RQIvaMB0GA1Ud\nDgQWBBSktWY28tqk62vLZOqMfXbkDszx6zAMBgNVHRMBAf8EAjAAMA4GA1UdDwEB\n/wQEAwID+DAgBgNVHSUBAf8EFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDQYJKoZI\nhvcNAQEFBQADggEBAApIEISe9G6kdCwcWphSpiN1yGUP9WVhJTgUTvUgWl/e7Z1q\n4uVGNb2LsBEHXvI/rGL3qqVqSlt8b+GKqqxCSuL2G5ROoYn9wL/BuNCQJaa/SMqW\nA0Gz3uIA+fd/G+iYH31SP62DH/o6u7ctdG+pi5gjSCiBQcc8jTuOvWhSea6SfVC3\nqW7BBaxTSal/RWNll7A3RBCZS9vK7FZihDomGGH37YDNONTTr41k6FIH65X3pzy0\nFk5Jn/N/Ymhy5zcNPG1TBoXX2ZRWvfxuqMYP3+lfL15STJGQ65fnQNSSS6GkCGVm\nn3R7QDyy73xSTtEiBg28PUw/s2t+OR4lFuQr+KI=\n-----END CERTIFICATE-----") + + return oldids.lookup(conn, self_uri, id_keypair, topic, query) \ No newline at end of file diff --git a/olddemo.py b/olddemo.py new file mode 100644 index 0000000..43888a7 --- /dev/null +++ b/olddemo.py @@ -0,0 +1,191 @@ +import getpass +import json + +import oldids as ids +from oldids import * +#from base64 import b64encode, b64decode + +# Open config +try: + with open("config.json", "r") as f: + CONFIG = json.load(f) +except FileNotFoundError: + CONFIG = {} + + +def input_multiline(prompt): + print(prompt) + lines = [] + while True: + line = input() + if line == "": + break + lines.append(line) + return "\n".join(lines) + + +def refresh_token(): + # If no username is set, prompt for it + if "username" not in CONFIG: + CONFIG["username"] = input("Enter iCloud username: ") + # If no password is set, prompt for it + if "password" not in CONFIG: + CONFIG["password"] = getpass.getpass("Enter iCloud password: ") + # If grandslam authentication is not set, prompt for it + if "use_gsa" not in CONFIG: + CONFIG["use_gsa"] = input("Use grandslam authentication? [y/N] ").lower() == "y" + + def factor_gen(): + return input("Enter iCloud 2FA code: ") + + CONFIG["user_id"], CONFIG["token"] = ids._get_auth_token( + CONFIG["username"], CONFIG["password"], factor_gen=factor_gen + ) + + +def refresh_cert(): + CONFIG["key"], CONFIG["auth_cert"] = ids._get_auth_cert( + CONFIG["user_id"], CONFIG["token"] + ) + + +def create_connection(): + conn = apns.APNSConnection() + token = conn.connect() + # conn.filter(['com.apple.madrid']) + CONFIG["push"] = { + "token": b64encode(token).decode(), + "cert": conn.cert, + "key": conn.private_key, + } + return conn + + +def restore_connection(): + conn = apns.APNSConnection(CONFIG["push"]["key"], CONFIG["push"]["cert"]) + conn.connect(True, b64decode(CONFIG["push"]["token"])) + # conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi']) + return conn + + +def refresh_ids_cert(): + info = { + "uri": "mailto:" + CONFIG["username"], + "user_id": CONFIG["user_id"], + } + + print( + ids._get_handles( + CONFIG["push"]["token"], + CONFIG["user_id"], + ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]), + ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]), + ) + ) + + resp = None + try: + if "validation_data" in CONFIG: + resp = ids._register_request( + CONFIG["push"]["token"], + info, + ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]), + ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]), + CONFIG["validation_data"], + ) + except Exception as e: + print(e) + resp = None + + if resp is None: + print( + "Note: Validation data can be obtained from @JJTech, or intercepted using a HTTP proxy." + ) + validation_data = ( + input_multiline("Enter validation data: ") + .replace("\n", "") + .replace(" ", "") + ) + resp = ids._register_request( + CONFIG["push"]["token"], + info, + ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]), + ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]), + validation_data, + ) + CONFIG["validation_data"] = validation_data + + print(resp) + ids_cert = x509.load_der_x509_certificate(resp["services"][0]["users"][0]["cert"]) + ids_cert = ids_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip() + + CONFIG["ids_cert"] = ids_cert + + +if not "push" in CONFIG: + print("No existing APNs credentials, creating new ones...") + # print("No push conn") + conn = create_connection() +else: + print("Restoring APNs credentials...") + conn = restore_connection() +print("Connected to APNs!") + +if not "ids_cert" in CONFIG: + print("No existing IDS certificate, creating new one...") + if not "key" in CONFIG: + print("No existing authentication certificate, creating new one...") + if not "token" in CONFIG: + print("No existing authentication token, creating new one...") + refresh_token() + print("Got authentication token!") + refresh_cert() + print("Got authentication certificate!") + refresh_ids_cert() +print("Got IDS certificate!") + +ids_keypair = ids.KeyPair(CONFIG["key"], CONFIG["ids_cert"]) + + +def lookup(topic: str, users: list[str]): + print(f"Looking up users {users} for topic {topic}...") + resp = ids.lookup(conn, CONFIG["username"], ids_keypair, topic, users) + + print(resp) + # r = list(resp['results'].values())[0] + for k, v in resp["results"].items(): + print(f"Result for user {k} topic {topic}:") + i = v["identities"] + print(f"IDENTITIES: {len(i)}") + for iden in i: + print("IDENTITY", end=" ") + print(f"Push Token: {b64encode(iden['push-token']).decode()}", end=" ") + if "client-data" in iden: + print(f"Client Data: {len(iden['client-data'])}") + + else: + print("No client data") + + +# Hack to make sure that the requests and responses match up +# This filter MUST contain all the topics you are looking up +# conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi', 'com.apple.private.alloy.multiplex1', 'com.apple.private.alloy.screensharing']) +# import time +# print("...waiting for queued messages... (this is a hack)") +# time.sleep(5) # Let the server send us any messages it was holding +# conn.sink() # Dump the messages + +lookup("com.apple.madrid", ["mailto:jjtech@jjtech.dev"]) +lookup("com.apple.private.alloy.facetime.multi", ["mailto:jjtech@jjtech.dev"]) + +lookup("com.apple.private.alloy.facetime.multi", ["mailto:user_test2@icloud.com"]) +lookup("com.apple.madrid", ["mailto:user_test2@icloud.com"]) + +lookup("com.apple.private.alloy.multiplex1", ["mailto:user_test2@icloud.com"]) + +lookup("com.apple.private.alloy.screensharing", ["mailto:user_test2@icloud.com"]) + +# time.sleep(4) +# Save config +with open("config.json", "w") as f: + json.dump(CONFIG, f, indent=4) \ No newline at end of file diff --git a/oldids.py b/oldids.py new file mode 100644 index 0000000..1d806e2 --- /dev/null +++ b/oldids.py @@ -0,0 +1,436 @@ +import gzip +import plistlib +import random +import uuid +from base64 import b64decode, b64encode +from collections import namedtuple +from datetime import datetime + +import requests +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa +from cryptography.x509.oid import NameOID + +import apns +import bags +import gsa + +USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]" +PROTOCOL_VERSION = "1640" + +KeyPair = namedtuple("KeyPair", ["key", "cert"]) + +# global_key, global_cert = load_keys() + + +def _send_request( + conn: apns.APNSConnection, + bag_key: str, + topic: str, + body: bytes, + keypair: KeyPair, + username: str, +) -> bytes: + #print(body) + print(bag_key, topic, body, keypair, username) + body = gzip.compress(body, mtime=0) + + push_token = b64encode(conn.token).decode() + + # Sign the request + signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body) + + headers = { + "x-id-cert": keypair.cert.replace("-----BEGIN CERTIFICATE-----", "") + .replace("-----END CERTIFICATE-----", "") + .replace("\n", ""), + "x-id-nonce": b64encode(nonce).decode(), + "x-id-sig": signature, + "x-push-token": push_token, + "x-id-self-uri": "mailto:user_test2@icloud.com", + "User-Agent": USER_AGENT, + "x-protocol-version": "1640", + } + + # print(headers) + + msg_id = random.randbytes(16) + + req = { + "cT": "application/x-apple-plist", + "U": msg_id, + "c": 96, + "ua": USER_AGENT, + "u": bags.ids_bag()[bag_key], + "h": headers, + "v": 2, + "b": body, + } + print(req) + + conn.send_message("com.apple.madrid", plistlib.dumps(req, fmt=plistlib.FMT_BINARY)) + # resp = conn.wait_for_packet(0x0A) + + def check_response(x): + if x[0] != 0x0A: + return False + resp_body = apns._get_field(x[1], 3) + if resp_body is None: + return False + resp_body = plistlib.loads(resp_body) + return resp_body["U"] == msg_id + + # Lambda to check if the response is the one we want + # conn.incoming_queue.find(check_response) + payload = conn.incoming_queue.wait_pop_find(check_response) + # conn._send_ack(apns._get_field(payload[1], 4)) + resp = apns._get_field(payload[1], 3) + return plistlib.loads(resp) + + +# Performs an IDS lookup +# conn: an active APNs connection. must be connected and have a push token. will be filtered to the IDS topic +# self: the user's email address +# keypair: a KeyPair object containing the user's private key and certificate +# topic: the IDS topic to query +# query: a list of URIs to query +def lookup( + conn: apns.APNSConnection, self: str, keypair: KeyPair, topic: str, query: list[str] +) -> any: + conn.filter(["com.apple.madrid"]) + query = {"uris": query} + resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), keypair, self) + # resp = plistlib.loads(resp) + # print(resp) + resp = gzip.decompress(resp["b"]) + resp = plistlib.loads(resp) + return resp + + +def _auth_token_request(username: str, password: str) -> any: + # Turn the PET into an auth token + data = { + "apple-id": username, + "client-id": str(uuid.uuid4()), + "delegates": {"com.apple.private.ids": {"protocol-version": "4"}}, + "password": password, + } + data = plistlib.dumps(data) + + r = requests.post( + "https://setup.icloud.com/setup/prefpane/loginDelegates", + auth=(username, password), + data=data, + verify=False, + ) + r = plistlib.loads(r.content) + return r + + +# Gets an IDS auth token for the given username and password +# Will use native Grand Slam on macOS +# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted +# Returns (realm user id, auth token) +def _get_auth_token( + username: str, password: str, factor_gen: callable = None +) -> tuple[str, str]: + from sys import platform + + # if use_gsa: + if platform == "darwin": + g = gsa.authenticate(username, password, gsa.Anisette()) + pet = g["t"]["com.apple.gs.idms.pet"]["token"] + else: + # Make the request without the 2FA code to make the prompt appear + _auth_token_request(username, password) + # TODO: Make sure we actually need the second request, some rare accounts don't have 2FA + # Now make the request with the 2FA code + if factor_gen is None: + pet = password + input("Enter 2FA code: ") + else: + pet = password + factor_gen() + r = _auth_token_request(username, pet) + # print(r) + if "description" in r: + raise Exception(f"Error: {r['description']}") + service_data = r["delegates"]["com.apple.private.ids"]["service-data"] + realm_user_id = service_data["realm-user-id"] + auth_token = service_data["auth-token"] + # print(f"Auth token for {realm_user_id}: {auth_token}") + return realm_user_id, auth_token + + +def _generate_csr(private_key: rsa.RSAPrivateKey) -> str: + csr = ( + x509.CertificateSigningRequestBuilder() + .subject_name( + x509.Name( + [ + x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()), + ] + ) + ) + .sign(private_key, hashes.SHA256()) + ) + + csr = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8") + return ( + csr.replace("-----BEGIN CERTIFICATE REQUEST-----", "") + .replace("-----END CERTIFICATE REQUEST-----", "") + .replace("\n", "") + ) + + +# Gets an IDS auth cert for the given user id and auth token +# Returns [private key PEM, certificate PEM] +def _get_auth_cert(user_id, token) -> KeyPair: + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + body = { + "authentication-data": {"auth-token": token}, + "csr": b64decode(_generate_csr(private_key)), + "realm-user-id": user_id, + } + + body = plistlib.dumps(body) + + r = requests.post( + "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS", + data=body, + headers={"x-protocol-version": "1630"}, + verify=False, + ) + r = plistlib.loads(r.content) + if r["status"] != 0: + raise (Exception(f"Failed to get auth cert: {r}")) + cert = x509.load_der_x509_certificate(r["cert"]) + return KeyPair( + private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + .decode("utf-8") + .strip(), + cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(), + ) + + +def _register_request( + push_token, info, auth_key: KeyPair, push_key: KeyPair, validation_data +): + body = { + "hardware-version": "MacBookPro18,3", + "language": "en-US", + "os-version": "macOS,13.2.1,22D68", + "software-version": "22D68", + "services": [ + { + "capabilities": [{"flags": 1, "name": "Messenger", "version": 1}], + "service": "com.apple.madrid", + "users": [ + { + # TODO: Pass ALL URIs from get handles + "uris": [{"uri": info["uri"]}], + "user-id": info["user_id"], + } + ], + } + ], + "validation-data": b64decode(validation_data), + } + + body = plistlib.dumps(body) + + headers = { + "x-protocol-version": PROTOCOL_VERSION, + "x-auth-user-id-0": info["user_id"], + } + _add_auth_push_signatures( + headers, body, "id-register", auth_key, push_key, push_token, 0 + ) + + r = requests.post( + "https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register", + headers=headers, + data=body, + verify=False, + ) + r = plistlib.loads(r.content) + print(f'Response code: {r["status"]}') + if "status" in r and r["status"] == 6004: + raise Exception("Validation data expired!") + # TODO: Do validation of nested statuses + return r + + +def mini_cert(cert: str): + return ( + cert.replace("\n", "") + .replace("-----BEGIN CERTIFICATE-----", "") + .replace("-----END CERTIFICATE-----", "") + ) + + +PROTOCOL_VERSION = "1640" + + +def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair): + headers = { + "x-protocol-version": PROTOCOL_VERSION, + "x-auth-user-id": user_id, + } + _add_auth_push_signatures( + headers, None, "id-get-handles", auth_key, push_key, push_token + ) + + r = requests.get( + "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles", + headers=headers, + verify=False, + ) + + r = plistlib.loads(r.content) + + if not "handles" in r: + raise Exception("No handles in response: " + str(r)) + + return [handle["uri"] for handle in r["handles"]] + + +class IDSUser: + def _authenticate_for_token( + self, username: str, password: str, factor_callback: callable = None + ): + self.user_id, self._auth_token = _get_auth_token( + username, password, factor_callback + ) + + def _authenticate_for_cert(self): + self._auth_keypair = _get_auth_cert(self.user_id, self._auth_token) + + # Factor callback will be called if a 2FA code is necessary + def __init__( + self, + push_connection: apns.APNSConnection, + username: str, + password: str, + factor_callback: callable = None, + ): + self.push_connection = push_connection + self._authenticate_for_token(username, password, factor_callback) + self._authenticate_for_cert() + self.handles = _get_handles( + b64encode(self.push_connection.token), + self.user_id, + self._auth_keypair, + KeyPair(self.push_connection.private_key, self.push_connection.cert), + ) + + def __str__(self): + return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})" + + +def test(): + import getpass + + conn = apns.APNSConnection() + conn.connect() + username = input("Enter username: ") + password = getpass.getpass("Enter password: ") + user = IDSUser(conn, username, password) + print(user) + + +# SIGNING STUFF + +# Nonce Format: +# 01000001876bd0a2c0e571093967fce3d7 +# 01 # version +# 000001876d008cc5 # unix time +# r1r2r3r4r5r6r7r8 # random bytes +def generate_nonce() -> bytes: + return ( + b"\x01" + + int(datetime.now().timestamp() * 1000).to_bytes(8, "big") + + random.randbytes(8) + ) + + +# Creates a payload from individual parts for signing +def _create_payload( + bag_key: str, + query_string: str, + push_token: str, + payload: bytes, + nonce: bytes = None, +) -> tuple[str, bytes]: + # Generate the nonce + if nonce is None: + nonce = generate_nonce() + print(push_token) + push_token = b64decode(push_token) + + if payload is None: + payload = b"" + + return ( + nonce + + len(bag_key).to_bytes(4, "big") + + bag_key.encode() + + len(query_string).to_bytes(4, "big") + + query_string.encode() + + len(payload).to_bytes(4, "big") + + payload + + len(push_token).to_bytes(4, "big") + + push_token, + nonce, + ) + + +# Returns signature, nonce +def _sign_payload( + private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes +) -> tuple[str, bytes]: + # Load the private key + key = serialization.load_pem_private_key( + private_key.encode(), password=None, backend=default_backend() + ) + + payload, nonce = _create_payload(bag_key, query_string, push_token, payload) + sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) + + sig = b"\x01\x01" + sig + sig = b64encode(sig).decode() + + return sig, nonce + + +# Add headers for x-push-sig and x-auth-sig stuff +def _add_auth_push_signatures( + headers: dict, + body: bytes, + bag_key: str, + auth_key: KeyPair, + push_key: KeyPair, + push_token: str, + auth_number=None, +): + push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body) + headers["x-push-sig"] = push_sig + headers["x-push-nonce"] = b64encode(push_nonce) + headers["x-push-cert"] = mini_cert(push_key.cert) + headers["x-push-token"] = push_token + + auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body) + auth_postfix = "-" + str(auth_number) if auth_number is not None else "" + headers["x-auth-sig" + auth_postfix] = auth_sig + headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce) + headers["x-auth-cert" + auth_postfix] = mini_cert(auth_key.cert) + + +if __name__ == "__main__": + test() From 7acc1da5b8733c02a1f863e545759d678d58b90d Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 9 May 2023 18:48:44 -0400 Subject: [PATCH 6/9] yay should be able to fix up again --- demo.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/demo.py b/demo.py index 75ac078..f477d3c 100644 --- a/demo.py +++ b/demo.py @@ -47,17 +47,21 @@ def convert_config(old): } return new -CONFIG = convert_config(CONFIG) +# Uncomment this to change from an old config.json to a new one +#CONFIG = convert_config(CONFIG) conn = apns.APNSConnection( CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert") ) -#print(CONFIG.get("push", {}).get("token")) -print(b64decode(CONFIG.get("push", {}).get("token"))) -conn.connect(True, b64decode(CONFIG.get("push", {}).get("token"))) -print(conn.token) +def safe_b64decode(s): + try: + return b64decode(s) + except: + return None +conn.connect(token=safe_b64decode(CONFIG.get("push", {}).get("token"))) +#print(b64encode(conn.token).decode()) user = ids.IDSUser(conn) if CONFIG.get("auth", {}).get("cert") is not None: @@ -97,5 +101,5 @@ CONFIG["push"] = { "cert": user.push_connection.cert, } -with open("config.json.new", "w") as f: +with open("config.json", "w") as f: json.dump(CONFIG, f, indent=4) From 7aa42d2b1478e418e96d7d1e0904ac8761db0985 Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 9 May 2023 19:29:17 -0400 Subject: [PATCH 7/9] yay refactoring that works --- ids/__init__.py | 2 +- ids/query.py | 55 ++---- ids/signing.py | 9 +- olddemo.py | 191 --------------------- oldids.py | 436 ------------------------------------------------ 5 files changed, 19 insertions(+), 674 deletions(-) delete mode 100644 olddemo.py delete mode 100644 oldids.py diff --git a/ids/__init__.py b/ids/__init__.py index fa316a3..12ab2e3 100644 --- a/ids/__init__.py +++ b/ids/__init__.py @@ -68,5 +68,5 @@ class IDSUser: self._id_keypair = id_keypair def lookup(self, uris: list[str], topic: str = "com.apple.madrid") -> any: - return query.lookup(self.push_connection, self.handles[0], self._id_keypair, topic, uris) + return query.lookup(self.push_connection, self.handles[0], self._id_keypair, uris, topic) diff --git a/ids/query.py b/ids/query.py index a4197d0..b6005a0 100644 --- a/ids/query.py +++ b/ids/query.py @@ -10,30 +10,27 @@ from ._helpers import USER_AGENT, KeyPair from . import signing -def _send_request( +def lookup( conn: apns.APNSConnection, - bag_key: str, - topic: str, - body: bytes, - keypair: KeyPair, self_uri: str, + id_keypair: KeyPair, + query: list[str], + topic, ) -> bytes: + BAG_KEY = "id-query" + conn.filter([topic]) + body = {"uris": query} + body = plistlib.dumps(body) + body = gzip.compress(body, mtime=0) push_token = b64encode(conn.token).decode() - # Sign the request - # signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body) - headers = { "x-id-self-uri": self_uri, - "User-Agent": USER_AGENT, "x-protocol-version": "1630", } - print(headers) - signing.add_id_signature(headers, body, bag_key, keypair, push_token) - - # print(headers) + signing.add_id_signature(headers, body, BAG_KEY, id_keypair, push_token, None) msg_id = random.randbytes(16) @@ -41,16 +38,14 @@ def _send_request( "cT": "application/x-apple-plist", "U": msg_id, "c": 96, - "ua": USER_AGENT, - "u": bags.ids_bag()[bag_key], + "u": bags.ids_bag()[BAG_KEY], "h": headers, "v": 2, "b": body, } - print(req) + conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY)) - # resp = conn.wait_for_packet(0x0A) def check_response(x): if x[0] != 0x0A: @@ -62,33 +57,9 @@ def _send_request( return resp_body["U"] == msg_id # Lambda to check if the response is the one we want - # conn.incoming_queue.find(check_response) payload = conn.incoming_queue.wait_pop_find(check_response) - # conn._send_ack(apns._get_field(payload[1], 4)) resp = apns._get_field(payload[1], 3) - return plistlib.loads(resp) - - -# Performs an IDS lookup -# conn: an active APNs connection. must be connected and have a push token. will be filtered to the IDS topic -# self: the user's email address -# keypair: a KeyPair object containing the user's private key and certificate -# topic: the IDS topic to query -# query: a list of URIs to query -def lookup_n( - conn: apns.APNSConnection, self_uri: str, id_keypair: KeyPair, topic: str, query: list[str] -) -> any: - conn.filter([topic]) - query = {"uris": query} - resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), id_keypair, self_uri) - # resp = plistlib.loads(resp) - # print(resp) + resp = plistlib.loads(resp) resp = gzip.decompress(resp["b"]) resp = plistlib.loads(resp) return resp - -def lookup(conn: apns.APNSConnection, self_uri: str, id_keypair: KeyPair, topic: str, query: list[str]) -> any: - import oldids - #id_keypair = KeyPair("-----BEGIN RSA PRIVATE KEY-----\nMIIEpQIBAAKCAQEA6fNjLPobeiQEbeDzYResvK2oC9+MsGyog36jo1o7pm8AeIth\nSzZ7caM8ThM/37i9DGJyDsnl6yqg1SxhyW4Fm8Evq2Mm6eYh6YwzRvppQoFqXNQO\nrEgjpQAW+D31V5OvHRprwX6qDVRprNF8gtaGYYjTbQudzYwpzpCIwbUu+1IqfojF\ngzTR/bxfdTbJnlaqWbOjFF7WrSZrP18nGaVkbM3rBS7egRZH1WlG14gO31YNbNg3\ndzOz9hQJHehHfHrSyZam3h6nda8tA7LJzVpTGCo7PJMC4IVyvQf7S2N3BlMJ4cen\nemzaDIOW9b/FCuvENkY2LPuDIT1hQs3pOoWHSQIDAQABAoIBABPAmCLDwh6jnFkf\nmUTdEBlFCy9ncjQyFF83yb6gv3ELpa1HzVDhmnYLe2u3Hdk4eoOpaypa+wXKLVaa\nPu5YEvKl0q3EexRb+QiELQ8k1M7H6PBJ+iwrEhFcCtRuPMDmZ+5L3QWy+U4TTrHH\n5RyR2row6HLoPGxOlXgKhXVfZAZVbgsSG8dbbuoP+U9cCrSU5TH2yIa64Gm7XrvF\n0aEo+J6nMAzw4jUUYY/y8gCU89p5utNDpxXZNva8CO0GpkooZ0nDAOnUjytNpWow\nEXkta9xKBwPQ9FXk4tK0005U6s9lFbKm4HdeypX/teSmhaS3QshENL/zmMysOEpN\nxaIRPMECgYEA8O+h5POMutVufwnonzAq0yWxSDtx4Sj9kUNNflLV0T941TMKIZwp\nQmpBDgbt3ffATjRAdKwHEncHXWhPIf3oA0UgqZFdUEEboIXlNd+6unegGHfrrT/S\n5sOQgN9kyZ/z1IvRVxA9qj3shSFFw4p0gOShObc2NGCmJI7IXc6PumECgYEA+JPz\nCl0l0RCk+lL59YUOe9irhqwHeWo26vsPbnWn8mjN6RB6ZF3NeRFU8KaMf9Zb0eO7\nGnSku97AEgL/UkP1F9imrRI1Ci3jT/vGHyFpR0g8KfhAwZuBZBPavaZ52nW5tiDz\nILzxHJfg8xHXKPGl3T5r7ZzuIxmDPY7bFk6xBekCgYEAwviIQCg+l+qjcjZognmO\nDjQQVG2WaCitmWGnUjRiRuRgOdcFudEPKmmln15IGzmj6yUpi8CyMGUWFqaUcuNv\nX0YPemjh5FHrs2jm5UPZbY/khCh3FUnytz9GrqMYgnjn7fX/P78qx5s4zTrxo51l\nTfC172itepFDoY3R4ueHM8ECgYEAm3MqUhjeRVe7VC//0OJcpGZjHd0G747UuS44\nAEPju1x/KHj9kTZ4AHYuQDBnPKq40RExOOIpArPSOXFWagPFihwaX7E7Khp4RNSW\nmXEzfThXJ4fwNyMgT417BY7ONSfZ82O3p4mA3vi73EYT367+otUeeYHiCmEyCZUE\nvXaIjcECgYEAwYaoKAW8+dpUI8e40jg1FE4eWKo9HC/Gnn2rf0bTMz1qgtH6T9Fj\nvfcM9C8RM0ziXrU255fqqWGBNI3z8dq0mgH/CmU87vV4ldqd6Ej+37EC1drAtX4C\nxPIafLpiKa2aDPcw4FAG+nOGEfYIPbS9WT1Jmz/Qw3EUbNKtt6Ze1Ps=\n-----END RSA PRIVATE KEY-----", "-----BEGIN CERTIFICATE-----\nMIIHOTCCBiGgAwIBAgIQGaPYy+62Ee0Sd7oaf5gYAzANBgkqhkiG9w0BAQUFADBu\nMQswCQYDVQQGEwJVUzETMBEGA1UECgwKQXBwbGUgSW5jLjESMBAGA1UECwwJQXBw\nbGUgSURTMREwDwYDVQQPDAhpZGVudGl0eTEjMCEGA1UEAwwaQXBwbGUgSURTIElk\nZW50aXR5IENBIC0gUjEwHhcNMjMwNTA5MjIwODIzWhcNMjMwNjIzMjIwODU5WjCB\n2zELMAkGA1UEBhMCVVMxEzARBgNVBAoMCkFwcGxlIEluYy4xEjAQBgNVBAsMCU1l\nc3NlbmdlcjEOMAwGA1UEDwwFZHMtaWQxHTAbBgoJkiaJk/IsZAEBDA1EOjIwOTk0\nMzYwOTcxMXQwcgYDVQQFE2tiOjdDMDc4MjI2OTdGRDdGRTA5NDhGN0YzODAxOTJC\nNjZDQTkwNDJBNjEvQTI3QzFCMEM5MjE3OUFBQzk5N0U1Mzc0NUM5Q0JDNEZFMzhB\nNDFEMkQ4OUZFNkNCMzg1MThDREJDMTUwODZCNDCCASIwDQYJKoZIhvcNAQEBBQAD\nggEPADCCAQoCggEBAOnzYyz6G3okBG3g82EXrLytqAvfjLBsqIN+o6NaO6ZvAHiL\nYUs2e3GjPE4TP9+4vQxicg7J5esqoNUsYcluBZvBL6tjJunmIemMM0b6aUKBalzU\nDqxII6UAFvg99VeTrx0aa8F+qg1UaazRfILWhmGI020Lnc2MKc6QiMG1LvtSKn6I\nxYM00f28X3U2yZ5WqlmzoxRe1q0maz9fJxmlZGzN6wUu3oEWR9VpRteIDt9WDWzY\nN3czs/YUCR3oR3x60smWpt4ep3WvLQOyyc1aUxgqOzyTAuCFcr0H+0tjdwZTCeHH\np3ps2gyDlvW/xQrrxDZGNiz7gyE9YULN6TqFh0kCAwEAAaOCA2MwggNfMIIC2wYD\nVR0RBIIC0jCCAs6GHG1haWx0bzp1c2VyX3Rlc3QyQGljbG91ZC5jb22gLAYKKoZI\nhvdjZAYEBAMeAAMAAAACAAAABAAAAAEAAAABAAAAAAAABmgAAAAAoIICLAYKKoZI\nhvdjZAYEBwOCAhwARlVTUACPC3uexqw0O0//dpYLdkkocIFg/GhUJg5qX2F8IJ0Y\nqjx0LiR6qlqFCf1UHqVlqU3LtnTQnYYqG0kNje/DC9C2jC1J5+SGzit94eDfVM63\nUH+UpZQHX1J7NT2xjKQxjbvC9jnWHZMxTBvmwSZqHrrzql+rL840stJpopg335DQ\nsjUig9JgHwVYrxBUHGFMDFONZ4swNbjcOGKFT1KH1VaLAxFNrnL8U7m2h0PSG9Ur\nTXUrQFmLEOl5Jul2LAe0n84WAEwt/u3aZGY9SwQaHFz+64P7gWZpjC/q0ZvjbWiB\nxLc9L/qHm9282RA6e/ibn9C5a944GjNrmTy3FKEc7oL3Ru2XBZ5hlyAVBdTqgg8/\nLmT9SizbZ63Rt5Pct4slButdbecCq7phR46ATpgWLYjOx6NVw68G3cuC3hmXkTVW\ndVcJcikXC4c02YBiNqA2svViz32+QvCzQxvHEajC6+xOXEfFwq58S8/c+7HXJEIx\nnovNGWrcbzpCvSH1GankT1WjG5cQBPvUwnOQ58yvcma1FlQ7NU7JMDgPYqDUhhwZ\nhG9V+LRcGIGzLK9hsZQ39SQjAVqYJ23YPvNl3leaGJaiNgTgjccH6htTI5BBhDdM\nlBUooNEEmbrl0S6NB+OwI/5fWtic2T17J5HEM5mT3u9yC3reurv21hcG/R3rO04N\ni287i7848P039m0/cS0MFiOElmzAQgWgHwYKKoZIhvdjZAYECAMRABmj2MvuthHt\nEne6Gn+YGAOgLwYKKoZIhvdjZAYEBgMhAKJ8GwySF5qsmX5TdFycvE/jikHS2J/m\nyzhRjNvBUIa0MB8GA1UdIwQYMBaAFMZ7ab5JwEEOwMirMjI45D+RQIvaMB0GA1Ud\nDgQWBBSktWY28tqk62vLZOqMfXbkDszx6zAMBgNVHRMBAf8EAjAAMA4GA1UdDwEB\n/wQEAwID+DAgBgNVHSUBAf8EFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDQYJKoZI\nhvcNAQEFBQADggEBAApIEISe9G6kdCwcWphSpiN1yGUP9WVhJTgUTvUgWl/e7Z1q\n4uVGNb2LsBEHXvI/rGL3qqVqSlt8b+GKqqxCSuL2G5ROoYn9wL/BuNCQJaa/SMqW\nA0Gz3uIA+fd/G+iYH31SP62DH/o6u7ctdG+pi5gjSCiBQcc8jTuOvWhSea6SfVC3\nqW7BBaxTSal/RWNll7A3RBCZS9vK7FZihDomGGH37YDNONTTr41k6FIH65X3pzy0\nFk5Jn/N/Ymhy5zcNPG1TBoXX2ZRWvfxuqMYP3+lfL15STJGQ65fnQNSSS6GkCGVm\nn3R7QDyy73xSTtEiBg28PUw/s2t+OR4lFuQr+KI=\n-----END CERTIFICATE-----") - - return oldids.lookup(conn, self_uri, id_keypair, topic, query) \ No newline at end of file diff --git a/ids/signing.py b/ids/signing.py index 8b2cdc7..c5f24ce 100644 --- a/ids/signing.py +++ b/ids/signing.py @@ -70,14 +70,14 @@ def _create_payload( # Returns signature, nonce def _sign_payload( - private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes + private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes, nonce = None ) -> tuple[str, bytes]: # Load the private key key = serialization.load_pem_private_key( private_key.encode(), password=None, backend=default_backend() ) - payload, nonce = _create_payload(bag_key, query_string, push_token, payload) + payload, nonce = _create_payload(bag_key, query_string, push_token, payload, nonce) sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) # type: ignore @@ -116,9 +116,10 @@ def add_id_signature( bag_key: str, id_key: KeyPair, push_token: str, + nonce=None, ): - id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body) + id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body, nonce) headers["x-id-sig"] = id_sig - headers["x-id-nonce"] = b64encode(id_nonce) + headers["x-id-nonce"] = b64encode(id_nonce).decode() headers["x-id-cert"] = dearmour(id_key.cert) headers["x-push-token"] = push_token diff --git a/olddemo.py b/olddemo.py deleted file mode 100644 index 43888a7..0000000 --- a/olddemo.py +++ /dev/null @@ -1,191 +0,0 @@ -import getpass -import json - -import oldids as ids -from oldids import * -#from base64 import b64encode, b64decode - -# Open config -try: - with open("config.json", "r") as f: - CONFIG = json.load(f) -except FileNotFoundError: - CONFIG = {} - - -def input_multiline(prompt): - print(prompt) - lines = [] - while True: - line = input() - if line == "": - break - lines.append(line) - return "\n".join(lines) - - -def refresh_token(): - # If no username is set, prompt for it - if "username" not in CONFIG: - CONFIG["username"] = input("Enter iCloud username: ") - # If no password is set, prompt for it - if "password" not in CONFIG: - CONFIG["password"] = getpass.getpass("Enter iCloud password: ") - # If grandslam authentication is not set, prompt for it - if "use_gsa" not in CONFIG: - CONFIG["use_gsa"] = input("Use grandslam authentication? [y/N] ").lower() == "y" - - def factor_gen(): - return input("Enter iCloud 2FA code: ") - - CONFIG["user_id"], CONFIG["token"] = ids._get_auth_token( - CONFIG["username"], CONFIG["password"], factor_gen=factor_gen - ) - - -def refresh_cert(): - CONFIG["key"], CONFIG["auth_cert"] = ids._get_auth_cert( - CONFIG["user_id"], CONFIG["token"] - ) - - -def create_connection(): - conn = apns.APNSConnection() - token = conn.connect() - # conn.filter(['com.apple.madrid']) - CONFIG["push"] = { - "token": b64encode(token).decode(), - "cert": conn.cert, - "key": conn.private_key, - } - return conn - - -def restore_connection(): - conn = apns.APNSConnection(CONFIG["push"]["key"], CONFIG["push"]["cert"]) - conn.connect(True, b64decode(CONFIG["push"]["token"])) - # conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi']) - return conn - - -def refresh_ids_cert(): - info = { - "uri": "mailto:" + CONFIG["username"], - "user_id": CONFIG["user_id"], - } - - print( - ids._get_handles( - CONFIG["push"]["token"], - CONFIG["user_id"], - ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]), - ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]), - ) - ) - - resp = None - try: - if "validation_data" in CONFIG: - resp = ids._register_request( - CONFIG["push"]["token"], - info, - ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]), - ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]), - CONFIG["validation_data"], - ) - except Exception as e: - print(e) - resp = None - - if resp is None: - print( - "Note: Validation data can be obtained from @JJTech, or intercepted using a HTTP proxy." - ) - validation_data = ( - input_multiline("Enter validation data: ") - .replace("\n", "") - .replace(" ", "") - ) - resp = ids._register_request( - CONFIG["push"]["token"], - info, - ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]), - ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]), - validation_data, - ) - CONFIG["validation_data"] = validation_data - - print(resp) - ids_cert = x509.load_der_x509_certificate(resp["services"][0]["users"][0]["cert"]) - ids_cert = ids_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip() - - CONFIG["ids_cert"] = ids_cert - - -if not "push" in CONFIG: - print("No existing APNs credentials, creating new ones...") - # print("No push conn") - conn = create_connection() -else: - print("Restoring APNs credentials...") - conn = restore_connection() -print("Connected to APNs!") - -if not "ids_cert" in CONFIG: - print("No existing IDS certificate, creating new one...") - if not "key" in CONFIG: - print("No existing authentication certificate, creating new one...") - if not "token" in CONFIG: - print("No existing authentication token, creating new one...") - refresh_token() - print("Got authentication token!") - refresh_cert() - print("Got authentication certificate!") - refresh_ids_cert() -print("Got IDS certificate!") - -ids_keypair = ids.KeyPair(CONFIG["key"], CONFIG["ids_cert"]) - - -def lookup(topic: str, users: list[str]): - print(f"Looking up users {users} for topic {topic}...") - resp = ids.lookup(conn, CONFIG["username"], ids_keypair, topic, users) - - print(resp) - # r = list(resp['results'].values())[0] - for k, v in resp["results"].items(): - print(f"Result for user {k} topic {topic}:") - i = v["identities"] - print(f"IDENTITIES: {len(i)}") - for iden in i: - print("IDENTITY", end=" ") - print(f"Push Token: {b64encode(iden['push-token']).decode()}", end=" ") - if "client-data" in iden: - print(f"Client Data: {len(iden['client-data'])}") - - else: - print("No client data") - - -# Hack to make sure that the requests and responses match up -# This filter MUST contain all the topics you are looking up -# conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi', 'com.apple.private.alloy.multiplex1', 'com.apple.private.alloy.screensharing']) -# import time -# print("...waiting for queued messages... (this is a hack)") -# time.sleep(5) # Let the server send us any messages it was holding -# conn.sink() # Dump the messages - -lookup("com.apple.madrid", ["mailto:jjtech@jjtech.dev"]) -lookup("com.apple.private.alloy.facetime.multi", ["mailto:jjtech@jjtech.dev"]) - -lookup("com.apple.private.alloy.facetime.multi", ["mailto:user_test2@icloud.com"]) -lookup("com.apple.madrid", ["mailto:user_test2@icloud.com"]) - -lookup("com.apple.private.alloy.multiplex1", ["mailto:user_test2@icloud.com"]) - -lookup("com.apple.private.alloy.screensharing", ["mailto:user_test2@icloud.com"]) - -# time.sleep(4) -# Save config -with open("config.json", "w") as f: - json.dump(CONFIG, f, indent=4) \ No newline at end of file diff --git a/oldids.py b/oldids.py deleted file mode 100644 index 1d806e2..0000000 --- a/oldids.py +++ /dev/null @@ -1,436 +0,0 @@ -import gzip -import plistlib -import random -import uuid -from base64 import b64decode, b64encode -from collections import namedtuple -from datetime import datetime - -import requests -from cryptography import x509 -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import padding, rsa -from cryptography.x509.oid import NameOID - -import apns -import bags -import gsa - -USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]" -PROTOCOL_VERSION = "1640" - -KeyPair = namedtuple("KeyPair", ["key", "cert"]) - -# global_key, global_cert = load_keys() - - -def _send_request( - conn: apns.APNSConnection, - bag_key: str, - topic: str, - body: bytes, - keypair: KeyPair, - username: str, -) -> bytes: - #print(body) - print(bag_key, topic, body, keypair, username) - body = gzip.compress(body, mtime=0) - - push_token = b64encode(conn.token).decode() - - # Sign the request - signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body) - - headers = { - "x-id-cert": keypair.cert.replace("-----BEGIN CERTIFICATE-----", "") - .replace("-----END CERTIFICATE-----", "") - .replace("\n", ""), - "x-id-nonce": b64encode(nonce).decode(), - "x-id-sig": signature, - "x-push-token": push_token, - "x-id-self-uri": "mailto:user_test2@icloud.com", - "User-Agent": USER_AGENT, - "x-protocol-version": "1640", - } - - # print(headers) - - msg_id = random.randbytes(16) - - req = { - "cT": "application/x-apple-plist", - "U": msg_id, - "c": 96, - "ua": USER_AGENT, - "u": bags.ids_bag()[bag_key], - "h": headers, - "v": 2, - "b": body, - } - print(req) - - conn.send_message("com.apple.madrid", plistlib.dumps(req, fmt=plistlib.FMT_BINARY)) - # resp = conn.wait_for_packet(0x0A) - - def check_response(x): - if x[0] != 0x0A: - return False - resp_body = apns._get_field(x[1], 3) - if resp_body is None: - return False - resp_body = plistlib.loads(resp_body) - return resp_body["U"] == msg_id - - # Lambda to check if the response is the one we want - # conn.incoming_queue.find(check_response) - payload = conn.incoming_queue.wait_pop_find(check_response) - # conn._send_ack(apns._get_field(payload[1], 4)) - resp = apns._get_field(payload[1], 3) - return plistlib.loads(resp) - - -# Performs an IDS lookup -# conn: an active APNs connection. must be connected and have a push token. will be filtered to the IDS topic -# self: the user's email address -# keypair: a KeyPair object containing the user's private key and certificate -# topic: the IDS topic to query -# query: a list of URIs to query -def lookup( - conn: apns.APNSConnection, self: str, keypair: KeyPair, topic: str, query: list[str] -) -> any: - conn.filter(["com.apple.madrid"]) - query = {"uris": query} - resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), keypair, self) - # resp = plistlib.loads(resp) - # print(resp) - resp = gzip.decompress(resp["b"]) - resp = plistlib.loads(resp) - return resp - - -def _auth_token_request(username: str, password: str) -> any: - # Turn the PET into an auth token - data = { - "apple-id": username, - "client-id": str(uuid.uuid4()), - "delegates": {"com.apple.private.ids": {"protocol-version": "4"}}, - "password": password, - } - data = plistlib.dumps(data) - - r = requests.post( - "https://setup.icloud.com/setup/prefpane/loginDelegates", - auth=(username, password), - data=data, - verify=False, - ) - r = plistlib.loads(r.content) - return r - - -# Gets an IDS auth token for the given username and password -# Will use native Grand Slam on macOS -# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted -# Returns (realm user id, auth token) -def _get_auth_token( - username: str, password: str, factor_gen: callable = None -) -> tuple[str, str]: - from sys import platform - - # if use_gsa: - if platform == "darwin": - g = gsa.authenticate(username, password, gsa.Anisette()) - pet = g["t"]["com.apple.gs.idms.pet"]["token"] - else: - # Make the request without the 2FA code to make the prompt appear - _auth_token_request(username, password) - # TODO: Make sure we actually need the second request, some rare accounts don't have 2FA - # Now make the request with the 2FA code - if factor_gen is None: - pet = password + input("Enter 2FA code: ") - else: - pet = password + factor_gen() - r = _auth_token_request(username, pet) - # print(r) - if "description" in r: - raise Exception(f"Error: {r['description']}") - service_data = r["delegates"]["com.apple.private.ids"]["service-data"] - realm_user_id = service_data["realm-user-id"] - auth_token = service_data["auth-token"] - # print(f"Auth token for {realm_user_id}: {auth_token}") - return realm_user_id, auth_token - - -def _generate_csr(private_key: rsa.RSAPrivateKey) -> str: - csr = ( - x509.CertificateSigningRequestBuilder() - .subject_name( - x509.Name( - [ - x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()), - ] - ) - ) - .sign(private_key, hashes.SHA256()) - ) - - csr = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8") - return ( - csr.replace("-----BEGIN CERTIFICATE REQUEST-----", "") - .replace("-----END CERTIFICATE REQUEST-----", "") - .replace("\n", "") - ) - - -# Gets an IDS auth cert for the given user id and auth token -# Returns [private key PEM, certificate PEM] -def _get_auth_cert(user_id, token) -> KeyPair: - private_key = rsa.generate_private_key( - public_exponent=65537, key_size=2048, backend=default_backend() - ) - body = { - "authentication-data": {"auth-token": token}, - "csr": b64decode(_generate_csr(private_key)), - "realm-user-id": user_id, - } - - body = plistlib.dumps(body) - - r = requests.post( - "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS", - data=body, - headers={"x-protocol-version": "1630"}, - verify=False, - ) - r = plistlib.loads(r.content) - if r["status"] != 0: - raise (Exception(f"Failed to get auth cert: {r}")) - cert = x509.load_der_x509_certificate(r["cert"]) - return KeyPair( - private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption(), - ) - .decode("utf-8") - .strip(), - cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(), - ) - - -def _register_request( - push_token, info, auth_key: KeyPair, push_key: KeyPair, validation_data -): - body = { - "hardware-version": "MacBookPro18,3", - "language": "en-US", - "os-version": "macOS,13.2.1,22D68", - "software-version": "22D68", - "services": [ - { - "capabilities": [{"flags": 1, "name": "Messenger", "version": 1}], - "service": "com.apple.madrid", - "users": [ - { - # TODO: Pass ALL URIs from get handles - "uris": [{"uri": info["uri"]}], - "user-id": info["user_id"], - } - ], - } - ], - "validation-data": b64decode(validation_data), - } - - body = plistlib.dumps(body) - - headers = { - "x-protocol-version": PROTOCOL_VERSION, - "x-auth-user-id-0": info["user_id"], - } - _add_auth_push_signatures( - headers, body, "id-register", auth_key, push_key, push_token, 0 - ) - - r = requests.post( - "https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register", - headers=headers, - data=body, - verify=False, - ) - r = plistlib.loads(r.content) - print(f'Response code: {r["status"]}') - if "status" in r and r["status"] == 6004: - raise Exception("Validation data expired!") - # TODO: Do validation of nested statuses - return r - - -def mini_cert(cert: str): - return ( - cert.replace("\n", "") - .replace("-----BEGIN CERTIFICATE-----", "") - .replace("-----END CERTIFICATE-----", "") - ) - - -PROTOCOL_VERSION = "1640" - - -def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair): - headers = { - "x-protocol-version": PROTOCOL_VERSION, - "x-auth-user-id": user_id, - } - _add_auth_push_signatures( - headers, None, "id-get-handles", auth_key, push_key, push_token - ) - - r = requests.get( - "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles", - headers=headers, - verify=False, - ) - - r = plistlib.loads(r.content) - - if not "handles" in r: - raise Exception("No handles in response: " + str(r)) - - return [handle["uri"] for handle in r["handles"]] - - -class IDSUser: - def _authenticate_for_token( - self, username: str, password: str, factor_callback: callable = None - ): - self.user_id, self._auth_token = _get_auth_token( - username, password, factor_callback - ) - - def _authenticate_for_cert(self): - self._auth_keypair = _get_auth_cert(self.user_id, self._auth_token) - - # Factor callback will be called if a 2FA code is necessary - def __init__( - self, - push_connection: apns.APNSConnection, - username: str, - password: str, - factor_callback: callable = None, - ): - self.push_connection = push_connection - self._authenticate_for_token(username, password, factor_callback) - self._authenticate_for_cert() - self.handles = _get_handles( - b64encode(self.push_connection.token), - self.user_id, - self._auth_keypair, - KeyPair(self.push_connection.private_key, self.push_connection.cert), - ) - - def __str__(self): - return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})" - - -def test(): - import getpass - - conn = apns.APNSConnection() - conn.connect() - username = input("Enter username: ") - password = getpass.getpass("Enter password: ") - user = IDSUser(conn, username, password) - print(user) - - -# SIGNING STUFF - -# Nonce Format: -# 01000001876bd0a2c0e571093967fce3d7 -# 01 # version -# 000001876d008cc5 # unix time -# r1r2r3r4r5r6r7r8 # random bytes -def generate_nonce() -> bytes: - return ( - b"\x01" - + int(datetime.now().timestamp() * 1000).to_bytes(8, "big") - + random.randbytes(8) - ) - - -# Creates a payload from individual parts for signing -def _create_payload( - bag_key: str, - query_string: str, - push_token: str, - payload: bytes, - nonce: bytes = None, -) -> tuple[str, bytes]: - # Generate the nonce - if nonce is None: - nonce = generate_nonce() - print(push_token) - push_token = b64decode(push_token) - - if payload is None: - payload = b"" - - return ( - nonce - + len(bag_key).to_bytes(4, "big") - + bag_key.encode() - + len(query_string).to_bytes(4, "big") - + query_string.encode() - + len(payload).to_bytes(4, "big") - + payload - + len(push_token).to_bytes(4, "big") - + push_token, - nonce, - ) - - -# Returns signature, nonce -def _sign_payload( - private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes -) -> tuple[str, bytes]: - # Load the private key - key = serialization.load_pem_private_key( - private_key.encode(), password=None, backend=default_backend() - ) - - payload, nonce = _create_payload(bag_key, query_string, push_token, payload) - sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) - - sig = b"\x01\x01" + sig - sig = b64encode(sig).decode() - - return sig, nonce - - -# Add headers for x-push-sig and x-auth-sig stuff -def _add_auth_push_signatures( - headers: dict, - body: bytes, - bag_key: str, - auth_key: KeyPair, - push_key: KeyPair, - push_token: str, - auth_number=None, -): - push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body) - headers["x-push-sig"] = push_sig - headers["x-push-nonce"] = b64encode(push_nonce) - headers["x-push-cert"] = mini_cert(push_key.cert) - headers["x-push-token"] = push_token - - auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body) - auth_postfix = "-" + str(auth_number) if auth_number is not None else "" - headers["x-auth-sig" + auth_postfix] = auth_sig - headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce) - headers["x-auth-cert" + auth_postfix] = mini_cert(auth_key.cert) - - -if __name__ == "__main__": - test() From 9cec9d09655c53530abc3e01bfb1e9b2fb519fac Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 9 May 2023 19:31:09 -0400 Subject: [PATCH 8/9] slight cleanup --- ids/query.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/ids/query.py b/ids/query.py index b6005a0..99a3c71 100644 --- a/ids/query.py +++ b/ids/query.py @@ -6,7 +6,7 @@ from base64 import b64encode import apns import bags -from ._helpers import USER_AGENT, KeyPair +from ._helpers import KeyPair, PROTOCOL_VERSION from . import signing @@ -18,19 +18,19 @@ def lookup( topic, ) -> bytes: BAG_KEY = "id-query" - conn.filter([topic]) - body = {"uris": query} - body = plistlib.dumps(body) + conn.filter([topic]) + + body = plistlib.dumps({"uris": query}) body = gzip.compress(body, mtime=0) push_token = b64encode(conn.token).decode() headers = { "x-id-self-uri": self_uri, - "x-protocol-version": "1630", + "x-protocol-version": PROTOCOL_VERSION, } - signing.add_id_signature(headers, body, BAG_KEY, id_keypair, push_token, None) + signing.add_id_signature(headers, body, BAG_KEY, id_keypair, push_token) msg_id = random.randbytes(16) @@ -43,7 +43,6 @@ def lookup( "v": 2, "b": body, } - print(req) conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY)) From ce82d4b8da8d60c05498d38e62ad0b740a365ed1 Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 9 May 2023 20:01:22 -0400 Subject: [PATCH 9/9] use bags rather than hardcoded urls --- apns.py | 5 ++++- bags.py | 4 ++-- ids/__init__.py | 6 +++--- ids/profile.py | 19 +++++++++++++------ ids/query.py | 7 ++++++- 5 files changed, 28 insertions(+), 13 deletions(-) diff --git a/apns.py b/apns.py index 5b1f459..1f0ce8f 100644 --- a/apns.py +++ b/apns.py @@ -9,8 +9,11 @@ from hashlib import sha1 import tlslite import albert +import bags -COURIER_HOST = "windows.courier.push.apple.com" # TODO: Get this from config +#COURIER_HOST = "windows.courier.push.apple.com" # TODO: Get this from config +# Pick a random courier server from 01 to APNSCourierHostcount +COURIER_HOST = f"{random.randint(1, bags.apns_init_bag()['APNSCourierHostcount'])}-{bags.apns_init_bag()['APNSCourierHostname']}" COURIER_PORT = 5223 ALPN = [b"apns-security-v2"] diff --git a/bags.py b/bags.py index ca27903..3eb8524 100644 --- a/bags.py +++ b/bags.py @@ -3,7 +3,7 @@ import plistlib import requests -def apns_init_bag(): +def apns_init_bag_old(): r = requests.get("https://init.push.apple.com/bag", verify=False) if r.status_code != 200: raise Exception("Failed to get APNs init bag") @@ -15,7 +15,7 @@ def apns_init_bag(): # This is the same as the above, but the response has a signature which we unwrap -def apns_init_bag_2(): +def apns_init_bag(): r = requests.get("http://init-p01st.push.apple.com/bag", verify=False) if r.status_code != 200: raise Exception("Failed to get APNs init bag 2") diff --git a/ids/__init__.py b/ids/__init__.py index 12ab2e3..5f72816 100644 --- a/ids/__init__.py +++ b/ids/__init__.py @@ -10,13 +10,13 @@ class IDSUser: def _authenticate_for_token( self, username: str, password: str, factor_callback: callable = None ): - self.user_id, self._auth_token = profile._get_auth_token( + self.user_id, self._auth_token = profile.get_auth_token( username, password, factor_callback ) # Sets self._auth_keypair using self.user_id and self._auth_token def _authenticate_for_cert(self): - self._auth_keypair = profile._get_auth_cert(self.user_id, self._auth_token) + self._auth_keypair = profile.get_auth_cert(self.user_id, self._auth_token) # Factor callback will be called if a 2FA code is necessary def __init__( @@ -37,7 +37,7 @@ class IDSUser: ): self._authenticate_for_token(username, password, factor_callback) self._authenticate_for_cert() - self.handles = profile._get_handles( + self.handles = profile.get_handles( b64encode(self.push_connection.token), self.user_id, self._auth_keypair, diff --git a/ids/profile.py b/ids/profile.py index 7894cf7..21b63ea 100644 --- a/ids/profile.py +++ b/ids/profile.py @@ -11,6 +11,7 @@ from cryptography.hazmat.primitives.asymmetric import padding, rsa from cryptography.x509.oid import NameOID import gsa +import bags from . import signing from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair @@ -27,6 +28,7 @@ def _auth_token_request(username: str, password: str) -> any: data = plistlib.dumps(data) r = requests.post( + # TODO: Figure out which URL bag we can get this from "https://setup.icloud.com/setup/prefpane/loginDelegates", auth=(username, password), data=data, @@ -40,7 +42,7 @@ def _auth_token_request(username: str, password: str) -> any: # Will use native Grand Slam on macOS # If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted # Returns (realm user id, auth token) -def _get_auth_token( +def get_auth_token( username: str, password: str, factor_gen: callable = None ) -> tuple[str, str]: from sys import platform @@ -92,7 +94,9 @@ def _generate_csr(private_key: rsa.RSAPrivateKey) -> str: # Gets an IDS auth cert for the given user id and auth token # Returns [private key PEM, certificate PEM] -def _get_auth_cert(user_id, token) -> KeyPair: +def get_auth_cert(user_id, token) -> KeyPair: + BAG_KEY = "id-authenticate-ds-id" + private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) @@ -105,7 +109,8 @@ def _get_auth_cert(user_id, token) -> KeyPair: body = plistlib.dumps(body) r = requests.post( - "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS", + bags.ids_bag()[BAG_KEY], + #"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS", data=body, headers={"x-protocol-version": "1630"}, verify=False, @@ -126,17 +131,19 @@ def _get_auth_cert(user_id, token) -> KeyPair: ) -def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair): +def get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair): + BAG_KEY = "id-get-handles" + headers = { "x-protocol-version": PROTOCOL_VERSION, "x-auth-user-id": user_id, } signing.add_auth_signature( - headers, None, "id-get-handles", auth_key, push_key, push_token + headers, None, BAG_KEY, auth_key, push_key, push_token ) r = requests.get( - "https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles", + bags.ids_bag()[BAG_KEY], headers=headers, verify=False, ) diff --git a/ids/query.py b/ids/query.py index 99a3c71..059c97c 100644 --- a/ids/query.py +++ b/ids/query.py @@ -61,4 +61,9 @@ def lookup( resp = plistlib.loads(resp) resp = gzip.decompress(resp["b"]) resp = plistlib.loads(resp) - return resp + + if resp['status'] != 0: + raise Exception(f'Query failed: {resp}') + if not 'results' in resp: + raise Exception(f'No results in response: {resp}') + return resp['results']