pypush-plus-plus/oldids.py

435 lines
13 KiB
Python
Raw Normal View History

2023-05-02 19:53:18 -05:00
import gzip
import plistlib
import random
import uuid
2023-04-11 11:17:53 -05:00
from base64 import b64decode, b64encode
2023-05-02 19:53:18 -05:00
from collections import namedtuple
2023-04-11 11:17:53 -05:00
from datetime import datetime
import requests
2023-05-02 07:39:11 -05:00
from cryptography import x509
2023-04-11 11:17:53 -05:00
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
2023-05-02 07:39:11 -05:00
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509.oid import NameOID
2023-04-11 11:17:53 -05:00
import apns
2023-04-12 12:13:47 -05:00
import bags
import gsa
2023-05-02 13:10:13 -05:00
USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"
2023-05-09 12:34:26 -05:00
PROTOCOL_VERSION = "1640"
KeyPair = namedtuple("KeyPair", ["key", "cert"])
# global_key, global_cert = load_keys()
2023-05-02 19:53:18 -05:00
def _send_request(
conn: apns.APNSConnection,
bag_key: str,
topic: str,
body: bytes,
keypair: KeyPair,
username: str,
) -> bytes:
2023-05-02 16:48:47 -05:00
body = gzip.compress(body, mtime=0)
2023-04-11 11:17:53 -05:00
2023-05-02 13:10:13 -05:00
push_token = b64encode(conn.token).decode()
2023-04-11 11:17:53 -05:00
# Sign the request
2023-05-09 12:34:26 -05:00
signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body)
2023-04-11 11:17:53 -05:00
headers = {
2023-05-02 19:53:18 -05:00
"x-id-self-uri": "mailto:" + username,
2023-04-11 11:17:53 -05:00
"User-Agent": USER_AGENT,
"x-protocol-version": "1630",
}
2023-05-09 14:36:33 -05:00
_add_id_sig(headers, body, bag_key, keypair, push_token)
2023-05-02 19:53:18 -05:00
# print(headers)
2023-05-02 13:10:13 -05:00
2023-05-02 19:51:02 -05:00
msg_id = random.randbytes(16)
2023-04-11 11:17:53 -05:00
req = {
"cT": "application/x-apple-plist",
2023-05-02 19:51:02 -05:00
"U": msg_id,
2023-04-11 11:17:53 -05:00
"c": 96,
"ua": USER_AGENT,
2023-04-12 12:13:47 -05:00
"u": bags.ids_bag()[bag_key],
2023-04-11 11:17:53 -05:00
"h": headers,
"v": 2,
"b": body,
}
conn.send_message(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY))
2023-05-02 19:53:18 -05:00
# resp = conn.wait_for_packet(0x0A)
2023-05-02 19:51:02 -05:00
def check_response(x):
if x[0] != 0x0A:
return False
resp_body = apns._get_field(x[1], 3)
if resp_body is None:
return False
resp_body = plistlib.loads(resp_body)
2023-05-02 19:53:18 -05:00
return resp_body["U"] == msg_id
2023-05-02 19:51:02 -05:00
# Lambda to check if the response is the one we want
2023-05-02 19:53:18 -05:00
# conn.incoming_queue.find(check_response)
2023-05-02 19:51:02 -05:00
payload = conn.incoming_queue.wait_pop_find(check_response)
2023-05-02 19:53:18 -05:00
# conn._send_ack(apns._get_field(payload[1], 4))
2023-05-02 19:51:02 -05:00
resp = apns._get_field(payload[1], 3)
return plistlib.loads(resp)
# Performs an IDS lookup
# conn: an active APNs connection. must be connected and have a push token. will be filtered to the IDS topic
# self: the user's email address
# keypair: a KeyPair object containing the user's private key and certificate
# topic: the IDS topic to query
# query: a list of URIs to query
2023-05-02 19:53:18 -05:00
def lookup(
conn: apns.APNSConnection, self: str, keypair: KeyPair, topic: str, query: list[str]
) -> any:
conn.filter([topic])
2023-04-11 11:17:53 -05:00
query = {"uris": query}
resp = _send_request(conn, "id-query", topic, plistlib.dumps(query), keypair, self)
2023-05-02 19:53:18 -05:00
# resp = plistlib.loads(resp)
# print(resp)
2023-05-02 16:48:47 -05:00
resp = gzip.decompress(resp["b"])
2023-04-11 11:17:53 -05:00
resp = plistlib.loads(resp)
return resp
def _auth_token_request(username: str, password: str) -> any:
# Turn the PET into an auth token
data = {
"apple-id": username,
"client-id": str(uuid.uuid4()),
"delegates": {"com.apple.private.ids": {"protocol-version": "4"}},
"password": password,
}
data = plistlib.dumps(data)
r = requests.post(
"https://setup.icloud.com/setup/prefpane/loginDelegates",
auth=(username, password),
data=data,
verify=False,
)
r = plistlib.loads(r.content)
return r
2023-04-23 12:42:43 -05:00
# Gets an IDS auth token for the given username and password
2023-05-09 11:21:56 -05:00
# Will use native Grand Slam on macOS
# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted
# Returns (realm user id, auth token)
2023-04-23 12:42:43 -05:00
def _get_auth_token(
2023-05-09 11:21:56 -05:00
username: str, password: str, factor_gen: callable = None
2023-04-23 12:42:43 -05:00
) -> tuple[str, str]:
2023-05-09 11:21:56 -05:00
from sys import platform
2023-05-09 11:47:14 -05:00
# if use_gsa:
2023-05-09 11:21:56 -05:00
if platform == "darwin":
g = gsa.authenticate(username, password, gsa.Anisette())
pet = g["t"]["com.apple.gs.idms.pet"]["token"]
else:
# Make the request without the 2FA code to make the prompt appear
_auth_token_request(username, password)
2023-05-09 11:21:56 -05:00
# TODO: Make sure we actually need the second request, some rare accounts don't have 2FA
# Now make the request with the 2FA code
if factor_gen is None:
pet = password + input("Enter 2FA code: ")
else:
pet = password + factor_gen()
r = _auth_token_request(username, pet)
2023-04-23 12:42:43 -05:00
# print(r)
if "description" in r:
raise Exception(f"Error: {r['description']}")
service_data = r["delegates"]["com.apple.private.ids"]["service-data"]
realm_user_id = service_data["realm-user-id"]
auth_token = service_data["auth-token"]
# print(f"Auth token for {realm_user_id}: {auth_token}")
return realm_user_id, auth_token
2023-04-23 12:42:43 -05:00
def _generate_csr(private_key: rsa.RSAPrivateKey) -> str:
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(
x509.Name(
[
2023-04-23 12:42:43 -05:00
x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()),
]
)
)
.sign(private_key, hashes.SHA256())
)
csr = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8")
return (
csr.replace("-----BEGIN CERTIFICATE REQUEST-----", "")
.replace("-----END CERTIFICATE REQUEST-----", "")
.replace("\n", "")
)
# Gets an IDS auth cert for the given user id and auth token
# Returns [private key PEM, certificate PEM]
2023-05-09 11:21:56 -05:00
def _get_auth_cert(user_id, token) -> KeyPair:
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
body = {
"authentication-data": {"auth-token": token},
"csr": b64decode(_generate_csr(private_key)),
"realm-user-id": user_id,
}
body = plistlib.dumps(body)
r = requests.post(
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS",
data=body,
headers={"x-protocol-version": "1630"},
verify=False,
)
r = plistlib.loads(r.content)
if r["status"] != 0:
raise (Exception(f"Failed to get auth cert: {r}"))
cert = x509.load_der_x509_certificate(r["cert"])
2023-05-09 11:21:56 -05:00
return KeyPair(
private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
.decode("utf-8")
.strip(),
cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip(),
)
2023-04-24 10:35:48 -05:00
2023-05-02 07:39:11 -05:00
def _register_request(
2023-05-09 12:34:26 -05:00
push_token, info, auth_key: KeyPair, push_key: KeyPair, validation_data
2023-05-02 07:39:11 -05:00
):
2023-04-24 10:35:48 -05:00
body = {
"hardware-version": "MacBookPro18,3",
"language": "en-US",
"os-version": "macOS,13.2.1,22D68",
2023-05-02 07:39:11 -05:00
"software-version": "22D68",
2023-04-24 10:35:48 -05:00
"services": [
{
"capabilities": [{"flags": 1, "name": "Messenger", "version": 1}],
"service": "com.apple.madrid",
"users": [
2023-05-02 07:39:11 -05:00
{
2023-05-09 11:21:56 -05:00
# TODO: Pass ALL URIs from get handles
2023-05-02 07:39:11 -05:00
"uris": [{"uri": info["uri"]}],
"user-id": info["user_id"],
}
2023-04-24 10:35:48 -05:00
],
}
],
2023-05-02 07:39:11 -05:00
"validation-data": b64decode(validation_data),
2023-04-24 10:35:48 -05:00
}
2023-04-23 20:04:25 -05:00
body = plistlib.dumps(body)
headers = {
2023-05-09 12:34:26 -05:00
"x-protocol-version": PROTOCOL_VERSION,
2023-05-02 07:39:11 -05:00
"x-auth-user-id-0": info["user_id"],
2023-04-23 20:04:25 -05:00
}
2023-05-09 14:36:33 -05:00
_add_auth_push_sig(
2023-05-09 12:34:26 -05:00
headers, body, "id-register", auth_key, push_key, push_token, 0
)
2023-04-23 20:04:25 -05:00
2023-04-24 10:35:48 -05:00
r = requests.post(
"https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register",
headers=headers,
data=body,
verify=False,
)
2023-05-02 07:39:11 -05:00
r = plistlib.loads(r.content)
print(f'Response code: {r["status"]}')
if "status" in r and r["status"] == 6004:
raise Exception("Validation data expired!")
2023-05-09 11:21:56 -05:00
# TODO: Do validation of nested statuses
2023-05-02 07:39:11 -05:00
return r
2023-05-09 11:21:56 -05:00
2023-05-09 11:47:14 -05:00
def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair):
2023-05-09 11:21:56 -05:00
headers = {
2023-05-09 11:47:14 -05:00
"x-protocol-version": PROTOCOL_VERSION,
2023-05-09 11:21:56 -05:00
"x-auth-user-id": user_id,
}
2023-05-09 14:36:33 -05:00
_add_auth_push_sig(
2023-05-09 12:34:26 -05:00
headers, None, "id-get-handles", auth_key, push_key, push_token
)
2023-05-09 11:21:56 -05:00
2023-05-09 11:47:14 -05:00
r = requests.get(
2023-05-09 11:21:56 -05:00
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles",
headers=headers,
verify=False,
)
r = plistlib.loads(r.content)
2023-05-09 11:47:14 -05:00
if not "handles" in r:
raise Exception("No handles in response: " + str(r))
2023-05-09 11:21:56 -05:00
2023-05-09 11:47:14 -05:00
return [handle["uri"] for handle in r["handles"]]
2023-05-09 11:21:56 -05:00
class IDSUser:
2023-05-09 11:47:14 -05:00
def _authenticate_for_token(
self, username: str, password: str, factor_callback: callable = None
):
self.user_id, self._auth_token = _get_auth_token(
username, password, factor_callback
)
2023-05-09 11:21:56 -05:00
def _authenticate_for_cert(self):
self._auth_keypair = _get_auth_cert(self.user_id, self._auth_token)
2023-05-09 11:47:14 -05:00
2023-05-09 11:21:56 -05:00
# Factor callback will be called if a 2FA code is necessary
2023-05-09 11:47:14 -05:00
def __init__(
self,
push_connection: apns.APNSConnection,
username: str,
password: str,
factor_callback: callable = None,
):
2023-05-09 11:21:56 -05:00
self.push_connection = push_connection
self._authenticate_for_token(username, password, factor_callback)
self._authenticate_for_cert()
2023-05-09 11:47:14 -05:00
self.handles = _get_handles(
b64encode(self.push_connection.token),
self.user_id,
self._auth_keypair,
KeyPair(self.push_connection.private_key, self.push_connection.cert),
)
2023-05-09 11:55:44 -05:00
def __str__(self):
return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})"
2023-05-09 11:47:14 -05:00
2023-05-09 11:21:56 -05:00
def test():
2023-05-09 11:55:44 -05:00
import getpass
2023-05-09 12:34:26 -05:00
2023-05-09 11:21:56 -05:00
conn = apns.APNSConnection()
conn.connect()
2023-05-09 11:55:44 -05:00
username = input("Enter username: ")
password = getpass.getpass("Enter password: ")
user = IDSUser(conn, username, password)
print(user)
2023-05-09 12:34:26 -05:00
# SIGNING STUFF
# Nonce Format:
# 01000001876bd0a2c0e571093967fce3d7
# 01 # version
# 000001876d008cc5 # unix time
# r1r2r3r4r5r6r7r8 # random bytes
def generate_nonce() -> bytes:
return (
b"\x01"
+ int(datetime.now().timestamp() * 1000).to_bytes(8, "big")
+ random.randbytes(8)
)
# Creates a payload from individual parts for signing
def _create_payload(
bag_key: str,
query_string: str,
push_token: str,
payload: bytes,
nonce: bytes = None,
) -> tuple[str, bytes]:
# Generate the nonce
if nonce is None:
nonce = generate_nonce()
push_token = b64decode(push_token)
if payload is None:
payload = b""
return (
nonce
+ len(bag_key).to_bytes(4, "big")
+ bag_key.encode()
+ len(query_string).to_bytes(4, "big")
+ query_string.encode()
+ len(payload).to_bytes(4, "big")
+ payload
+ len(push_token).to_bytes(4, "big")
+ push_token,
nonce,
)
# Returns signature, nonce
def _sign_payload(
private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes
) -> tuple[str, bytes]:
# Load the private key
key = serialization.load_pem_private_key(
private_key.encode(), password=None, backend=default_backend()
)
payload, nonce = _create_payload(bag_key, query_string, push_token, payload)
sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1())
sig = b"\x01\x01" + sig
sig = b64encode(sig).decode()
return sig, nonce
2023-05-09 14:36:33 -05:00
def dearmour(cert: str):
return (
cert.replace("\n", "")
.replace("-----BEGIN CERTIFICATE-----", "")
.replace("-----END CERTIFICATE-----", "")
)
2023-05-09 12:34:26 -05:00
# Add headers for x-push-sig and x-auth-sig stuff
2023-05-09 14:36:33 -05:00
def _add_auth_push_sig(
2023-05-09 12:34:26 -05:00
headers: dict,
body: bytes,
bag_key: str,
auth_key: KeyPair,
push_key: KeyPair,
push_token: str,
auth_number=None,
):
push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body)
headers["x-push-sig"] = push_sig
headers["x-push-nonce"] = b64encode(push_nonce)
2023-05-09 14:36:33 -05:00
headers["x-push-cert"] = dearmour(push_key.cert)
2023-05-09 12:34:26 -05:00
headers["x-push-token"] = push_token
auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body)
auth_postfix = "-" + str(auth_number) if auth_number is not None else ""
headers["x-auth-sig" + auth_postfix] = auth_sig
headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce)
2023-05-09 14:36:33 -05:00
headers["x-auth-cert" + auth_postfix] = dearmour(auth_key.cert)
def _add_id_sig(
headers: dict,
body: bytes,
bag_key: str,
id_key: KeyPair,
push_token: str,
):
id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body)
headers["x-id-sig"] = id_sig
headers["x-id-nonce"] = b64encode(id_nonce)
headers["x-id-cert"] = dearmour(id_key.cert)
headers["x-push-token"] = push_token
2023-05-09 11:47:14 -05:00
2023-05-09 11:21:56 -05:00
2023-05-09 11:47:14 -05:00
if __name__ == "__main__":
2023-05-09 11:21:56 -05:00
test()