diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..7e561c4 --- /dev/null +++ b/__init__.py @@ -0,0 +1,86 @@ +from base64 import b64encode + +import apns + +from . import _helpers, identity, profile, query + + +class IDSUser: + # Sets self.user_id and self._auth_token + def _authenticate_for_token( + self, username: str, password: str, factor_callback: callable = None + ): + self.user_id, self._auth_token = profile.get_auth_token( + username, password, factor_callback + ) + + # Sets self._auth_keypair using self.user_id and self._auth_token + def _authenticate_for_cert(self): + self._auth_keypair = profile.get_auth_cert(self.user_id, self._auth_token) + + # Factor callback will be called if a 2FA code is necessary + def __init__( + self, + push_connection: apns.APNSConnection, + ): + self.push_connection = push_connection + self._push_keypair = _helpers.KeyPair( + self.push_connection.private_key, self.push_connection.cert + ) + + self.ec_key = self.rsa_key = None + + def __str__(self): + return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})" + + # Authenticates with a username and password, to create a brand new authentication keypair + def authenticate( + self, username: str, password: str, factor_callback: callable = None + ): + self._authenticate_for_token(username, password, factor_callback) + self._authenticate_for_cert() + self.handles = profile.get_handles( + b64encode(self.push_connection.token), + self.user_id, + self._auth_keypair, + self._push_keypair, + ) + self.current_handle = self.handles[0] + + + # Uses an existing authentication keypair + def restore_authentication( + self, auth_keypair: _helpers.KeyPair, user_id: str, handles: dict + ): + self._auth_keypair = auth_keypair + self.user_id = user_id + self.handles = handles + self.current_handle = self.handles[0] + + # This is a separate call so that the user can make sure the first part succeeds before asking for validation data + def register(self, validation_data: str): + """ + self.ec_key, self.rsa_key will be set to a randomly gnenerated EC and RSA keypair + if they are not already set + """ + if self.encryption_identity is None: + self.encryption_identity = identity.IDSIdentity() + + + cert = identity.register( + b64encode(self.push_connection.token), + self.handles, + self.user_id, + self._auth_keypair, + self._push_keypair, + self.encryption_identity, + validation_data, + ) + self._id_keypair = _helpers.KeyPair(self._auth_keypair.key, cert) + + def restore_identity(self, id_keypair: _helpers.KeyPair): + self._id_keypair = id_keypair + + def lookup(self, uris: list[str], topic: str = "com.apple.madrid") -> any: + return query.lookup(self.push_connection, self.current_handle, self._id_keypair, uris, topic) + diff --git a/_helpers.py b/_helpers.py new file mode 100644 index 0000000..184723d --- /dev/null +++ b/_helpers.py @@ -0,0 +1,40 @@ +from collections import namedtuple + +USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]" +PROTOCOL_VERSION = "1640" + +# KeyPair is a named tuple that holds a private key and a certificate (public key) in PEM form, as well as a x509 +KeyPair = namedtuple("KeyPair", ["key", "cert"]) +Helperx509 = "" + + +def dearmour(armoured: str) -> str: + import re + + # Use a regex to remove the header and footer (generic so it work on more than just certificates) + return re.sub(r"-----BEGIN .*-----|-----END .*-----", "", armoured).replace( + "\n", "" + ) + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec, rsa +def parse_key(key: str): + # Check if it is a public or private key + if "PUBLIC" in key: + return serialization.load_pem_public_key(key.encode()) + else: + return serialization.load_pem_private_key(key.encode(), None) + +def serialize_key(key): + if isinstance(key, ec.EllipticCurvePrivateKey) or isinstance(key, rsa.RSAPrivateKey): + return key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ).decode("utf-8").strip() + else: + return key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("utf-8").strip() + \ No newline at end of file diff --git a/profile.py b/profile.py new file mode 100644 index 0000000..ade4e2d --- /dev/null +++ b/profile.py @@ -0,0 +1,165 @@ +import plistlib +import random +import uuid +from base64 import b64decode + +import requests +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa +from cryptography.x509.oid import NameOID +from sys import platform + +import bags + +from . import signing +from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair + +import logging +logger = logging.getLogger("ids") + + +auth_token = "" # function should return a rsa private key +realm_user_id = "" # function should return a x509 public key + +def _auth_token_request(username: str, password: str) -> any: + # Turn the PET into an auth token + data = { + "username": username, + #"client-id": str(uuid.uuid4()), + #"delegates": {"com.apple.private.ids": {"protocol-version": "4"}}, + "password": password, + } + data = plistlib.dumps(data) + + r = requests.post( + # TODO: Figure out which URL bag we can get this from + #"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateUser", + "https://setup.icloud.com/setup/prefpane/loginDelegates", + #auth=(username, password), + data=data, + verify=False, + ) + r = plistlib.loads(r.content) + return r + + +# Gets an IDS auth token for the given username and password +# Will use native Grand Slam on macOS +# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted +# Returns (realm user id, auth token) +def get_auth_token( + username: str, password: str, factor_gen: callable = None +) -> tuple[str, str]: + result = _auth_token_request(username, password) + + auth_token = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) # rsa private key + realm_user_id = _generate_csr(auth_token) # x509 public key + # else: + # logger.debug("Using old-style authentication") + # # Make the request without the 2FA code to make the prompt appear + # _auth_token_request(username, password) + # # TODO: Make sure we actually need the second request, some rare accounts don't have 2FA + # # Now make the request with the 2FA code + # if factor_gen is None: + # pet = password + input("Enter 2FA code: ") + # else: + # pet = password + factor_gen() + # r = _auth_token_request(username, pet) + # # print(r) + # if "description" in r: + # raise Exception(f"Error: {r['description']}") + # service_data = r["delegates"]["com.apple.private.ids"]["service-data"] + # realm_user_id = service_data["realm-user-id"] + # auth_token = service_data["auth-token"] + # print(f"Auth token for {realm_user_id}: {auth_token}") + logger.debug(f"Got auth token for IDS: {auth_token}") + return realm_user_id, auth_token + + +def _generate_csr(private_key: rsa.RSAPrivateKey) -> str: + csr = ( + x509.CertificateSigningRequestBuilder() + .subject_name( + x509.Name( + [ + x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()), + ] + ) + ) + .sign(private_key, hashes.SHA256()) + ) + + csr = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8") + return ( + csr.replace("-----BEGIN CERTIFICATE REQUEST-----", "") + .replace("-----END CERTIFICATE REQUEST-----", "") + .replace("\n", "") + ) + + +# Gets an IDS auth cert for the given user id and auth token +# Returns [private key PEM, certificate PEM] +def get_auth_cert(user_id, token) -> KeyPair: + BAG_KEY = "id-authenticate-ds-id" + + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + body = { + "authentication-data": {"auth-token": token}, + "csr": realm_user_id, + "realm-user-id": user_id, + } + + #body = plistlib.dumps(body) + + priv = auth_token + pub = realm_user_id + x509cert = realm_user_id + + r = {"cert": x509cert} + + cert = r["cert"] + logger.debug("Got auth cert from token") + return KeyPair( + private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + .decode("utf-8") + .strip(), + cert.strip(), + ) + + +def get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair): + BAG_KEY = "id-get-handles" + + headers = { + "x-protocol-version": PROTOCOL_VERSION, + "x-auth-user-id": user_id, + } + signing.add_auth_signature( + headers, None, BAG_KEY, auth_key, push_key, push_token + ) + + #r = requests.get( + #bags.ids_bag()[BAG_KEY], + #headers=headers, + #verify=False, + #) + + #r = plistlib.loads(r[cert]) + + #if not "handles" in r: + #raise Exception("No handles in response: " + str(r)) + + #logger.debug(f"User {user_id} has handles {r['handles']}") + #return [handle["uri"] for handle in r["handles"]] + return [realm_user_id] diff --git a/signing.py b/signing.py new file mode 100644 index 0000000..eb28b47 --- /dev/null +++ b/signing.py @@ -0,0 +1,124 @@ +import random +from base64 import b64decode, b64encode +from datetime import datetime + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa +from cryptography.x509.oid import NameOID + +from ._helpers import KeyPair, dearmour, Helperx509 + + +# TODO: Move this helper somewhere else +def armour_cert(cert: bytes) -> str: + return Helperx509 + + +""" +Generates a nonce in this format: +01000001876bd0a2c0e571093967fce3d7 +01 # version + 000001876d008cc5 # unix time + r1r2r3r4r5r6r7r8 # random bytes +""" + + +def generate_nonce() -> bytes: + return ( + b"\x01" + + int(datetime.now().timestamp() * 1000).to_bytes(8, "big") + + random.randbytes(8) + ) + + +import typing + + +# Creates a payload from individual parts for signing +def _create_payload( + bag_key: str, + query_string: str, + push_token: typing.Union[str, bytes], + payload: bytes, + nonce: typing.Union[bytes, None] = None, +) -> tuple[bytes, bytes]: + # Generate the nonce + if nonce is None: + nonce = generate_nonce() + + push_token = b64decode(push_token) + + if payload is None: + payload = b"" + + return ( + nonce + + len(bag_key).to_bytes(4, "big") + + bag_key.encode() + + len(query_string).to_bytes(4, "big") + + query_string.encode() + + len(payload).to_bytes(4, "big") + + payload + + len(push_token).to_bytes(4, "big") + + push_token, + nonce, + ) + + +# Returns signature, nonce +def _sign_payload( + private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes, nonce = None +) -> tuple[str, bytes]: + # Load the private key + key = serialization.load_pem_private_key( + private_key.encode(), password=None, backend=default_backend() + ) + + payload, nonce = _create_payload(bag_key, query_string, push_token, payload, nonce) + + sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) # type: ignore + + sig = b"\x01\x01" + sig + sig = b64encode(sig).decode() + + return sig, nonce + + +# Add headers for x-push-sig and x-auth-sig stuff +def add_auth_signature( + headers: dict, + body: bytes, + bag_key: str, + auth_key: KeyPair, + push_key: KeyPair, + push_token: str, + auth_number=None, +): + push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body) + headers["x-push-sig"] = push_sig + headers["x-push-nonce"] = b64encode(push_nonce) + headers["x-push-cert"] = dearmour(push_key.cert) + headers["x-push-token"] = push_token + + auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body) + auth_postfix = "-" + str(auth_number) if auth_number is not None else "" + headers["x-auth-sig" + auth_postfix] = auth_sig + headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce) + headers["x-auth-cert" + auth_postfix] = dearmour(auth_key.cert) + + +def add_id_signature( + headers: dict, + body: bytes, + bag_key: str, + id_key: KeyPair, + push_token: str, + nonce=None, +): + id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body, nonce) + headers["x-id-sig"] = id_sig + headers["x-id-nonce"] = b64encode(id_nonce).decode() + headers["x-id-cert"] = dearmour(id_key.cert) + headers["x-push-token"] = push_token