From 7594eabf7d04e1b3dadf173b46b29d7333037d0c Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Sun, 22 Oct 2023 17:25:32 -0400 Subject: [PATCH] more refactoring, make cloudkit more robust --- examples/cloudkit.py | 39 ++++------------ icloud/_utils.py | 73 +++++++++++++++++++++++++++++ icloud/cloudkit.py | 106 +++++++++++++++++++++++++++++++++++++++++-- icloud/gsa.py | 21 +++++---- 4 files changed, 195 insertions(+), 44 deletions(-) create mode 100644 icloud/_utils.py diff --git a/examples/cloudkit.py b/examples/cloudkit.py index cee9649..39c4036 100644 --- a/examples/cloudkit.py +++ b/examples/cloudkit.py @@ -9,6 +9,7 @@ import json import random import icloud.gsa as gsa import icloud.cloudkit as cloudkit +import icloud from rich.logging import RichHandler import logging @@ -21,12 +22,13 @@ def main(): # See if we have a search party token saved import os if os.path.exists(CONFIG_PATH): - print("Using saved config...") + logging.info("Using saved config...") #print("Found search party token!") with open(CONFIG_PATH, "r") as f: j = json.load(f) cloudkit_token = j["cloudkit_token"] ds_prs_id = j["ds_prs_id"] + mme_token = j["mme_token"] else: # Prompt for username and password @@ -36,45 +38,22 @@ def main(): r = icloud.login(USERNAME, PASSWORD, delegates=["com.apple.mobileme"]) cloudkit_token = r['delegates']['com.apple.mobileme']['service-data']['tokens']['cloudKitToken'] + mme_token = r['delegates']['com.apple.mobileme']['service-data']['tokens']['mmeAuthToken'] ds_prs_id = r['delegates']['com.apple.mobileme']['service-data']['appleAccountInfo']['dsPrsID'] # This can also be obtained from the grandslam response - print("Logged in!") + logging.info("Logged in!") with open(CONFIG_PATH, "w") as f: json.dump({ "cloudkit_token": cloudkit_token, "ds_prs_id": ds_prs_id, + "mme_token": mme_token, }, f, indent=4) - print("CloudKit token: ", cloudkit_token) + logging.debug("CloudKit token: ", cloudkit_token) - headers = { - "x-cloudkit-authtoken": cloudkit_token, - "x-cloudkit-userid": "_ec5fa262446ad56fb4bda84d00e981ff", # Hash of bundle id and icloud id - "x-cloudkit-containerid": "iCloud.dev.jjtech.experiments.cktest", - "x-cloudkit-bundleid": "dev.jjtech.experiments.cktest", - "x-cloudkit-bundleversion": "1", - "x-cloudkit-databasescope": "Public", - "x-cloudkit-environment": "Sandbox", - - "accept": "application/x-protobuf", - "content-type": 'application/x-protobuf; desc="https://gateway.icloud.com:443/static/protobuf/CloudDB/CloudDBClient.desc"; messageType=RequestOperation; delimited=true', - - "x-apple-operation-id": random.randbytes(8).hex(), - "x-apple-request-uuid": str(uuid.uuid4()).upper() - } - - headers.update(gsa.generate_anisette_headers()) - - body = cloudkit.build_record_save_request(cloudkit.Record(uuid.uuid4(), "ToDoItem", {"title": "Test"}), "iCloud.dev.jjtech.experiments.cktest", sandbox=True) - r = requests.post( - "https://gateway.icloud.com/ckdatabase/api/client/record/save", - headers=headers, - data=body, - verify=False - ) - - print(r.content) + ck = cloudkit.CloudKit(ds_prs_id, cloudkit_token, mme_token, sandbox=True) + ck.container("iCloud.dev.jjtech.experiments.cktest").save_record(cloudkit.Record(uuid.uuid4(), "ToDoItem", {"title": "Test"})) if __name__ == "__main__": main() \ No newline at end of file diff --git a/icloud/_utils.py b/icloud/_utils.py new file mode 100644 index 0000000..5b076fe --- /dev/null +++ b/icloud/_utils.py @@ -0,0 +1,73 @@ +# https://en.wikipedia.org/wiki/LEB128 +# +# LEB128 or Little Endian Base 128 is a form of variable-length code +# compression used to store an arbitrarily large integer in a small number of +# bytes. LEB128 is used in the DWARF debug file format and the WebAssembly +# binary encoding for all integer literals. + +# Taken from https://github.com/mohanson/pywasm/blob/master/pywasm/leb128.py under the MIT license + +import typing + + +class ULEB128: + @staticmethod + def encode(i: int) -> bytearray: + assert i >= 0 + r = [] + while True: + byte = i & 0x7f + i = i >> 7 + if i == 0: + r.append(byte) + return bytearray(r) + r.append(0x80 | byte) + + @staticmethod + def decode(b: bytearray) -> int: + r = 0 + for i, e in enumerate(b): + r = r + ((e & 0x7f) << (i * 7)) + return r + + @staticmethod + def decode_reader(r: typing.BinaryIO) -> (int, int): + a = bytearray() + while True: + b = ord(r.read(1)) + a.append(b) + if (b & 0x80) == 0: + break + return ULEB128.decode(a), len(a) + + +class ILEB128: + @staticmethod + def encode(i: int) -> bytearray: + r = [] + while True: + byte = i & 0x7f + i = i >> 7 + if (i == 0 and byte & 0x40 == 0) or (i == -1 and byte & 0x40 != 0): + r.append(byte) + return bytearray(r) + r.append(0x80 | byte) + + @staticmethod + def decode(b: bytearray) -> int: + r = 0 + for i, e in enumerate(b): + r = r + ((e & 0x7f) << (i * 7)) + if e & 0x40 != 0: + r |= - (1 << (i * 7) + 7) + return r + + @staticmethod + def decode_reader(r: typing.BinaryIO) -> (int, int): + a = bytearray() + while True: + b = ord(r.read(1)) + a.append(b) + if (b & 0x80) == 0: + break + return ILEB128.decode(a), len(a) \ No newline at end of file diff --git a/icloud/cloudkit.py b/icloud/cloudkit.py index 06fb77a..c340407 100644 --- a/icloud/cloudkit.py +++ b/icloud/cloudkit.py @@ -1,8 +1,13 @@ from typing import Literal -from . import cloudkit_pb2 +from . import cloudkit_pb2, gsa, _utils import uuid import dataclasses import typing +import random +import requests +import logging + +logger = logging.getLogger("cloudkit") @dataclasses.dataclass class Record: @@ -10,7 +15,98 @@ class Record: type: str fields: dict[str, typing.Any] -def build_record_save_request( +class CloudKit: + def __init__(self, dsid: str, cloudkit_token: str, mme_token: str, sandbox: bool = False): + """ + Represents a CloudKit user. + `dsid`: The user's DSID. + `cloudkit_token`: `cloudKitToken` from the `com.apple.mobileme` delegate. + `mme_token`: `mmeAuthToken` from the `com.apple.mobileme` delegate. + `sandbox`: Whether to use the CloudKit sandbox environment. + """ + self.dsid = dsid + self.cloudkit_token = cloudkit_token + self.mme_token = mme_token + self.sandbox = sandbox + + def container(self, container: str, scope: Literal["PUBLIC"] | Literal["PRIVATE"] | Literal["SHARED"] = "PUBLIC") -> "CloudKitContainer": + """ + Convenience method for creating a CloudKitContainer object. + """ + return CloudKitContainer(container, self, scope) + +class CloudKitContainer: + def __init__(self, container: str, user: CloudKit, scope: Literal["PUBLIC"] | Literal["PRIVATE"] | Literal["SHARED"] = "PUBLIC"): + """ + Represents a CloudKit container. + container: The CloudKit container ID. (e.g. "iCloud.dev.jjtech.experiments.cktest") + user: The CloudKit user to use for authentication. + scope: The CloudKit database scope to use. + """ + self.container = container + self.user = user + self.scope = scope + self.user_id = self._fetch_user_id() + + def _fetch_user_id(self): + headers = { + "x-cloudkit-containerid": self.container, + "x-cloudkit-bundleid": ".".join(self.container.split(".")[1:]), # Remove the "iCloud." prefix + "x-cloudkit-databasescope": self.scope, + "x-cloudkit-environment": "Sandbox" if self.user.sandbox else "Production", + + "accept": "application/x-protobuf", + + "x-apple-operation-id": random.randbytes(8).hex(), + "x-apple-request-uuid": str(uuid.uuid4()).upper() + } + + headers.update(gsa.generate_anisette_headers()) + + r = requests.post("https://gateway.icloud.com/setup/setup/ck/v1/ckAppInit", params={"container": self.container}, headers=headers, auth=(self.user.dsid, self.user.mme_token), verify=False) + + logger.debug("Got app init response: ", r.content) + return r.json()["cloudKitUserId"] + + def save_record(self, record: Record, zone: str = "_defaultZone", owner: str = "_defaultOwner") -> None: + """ + Saves a record to the container. + """ + logger.info(f"Saving record {record.name} to {self.container}") + + headers = { + "x-cloudkit-authtoken": self.user.cloudkit_token, + "x-cloudkit-userid": self.user_id, + "x-cloudkit-containerid": self.container, + "x-cloudkit-bundleid": ".".join(self.container.split(".")[1:]), # Remove the "iCloud." prefix + "x-cloudkit-databasescope": self.scope, + "x-cloudkit-environment": "Sandbox" if self.user.sandbox else "Production", + + "accept": "application/x-protobuf", + "content-type": 'application/x-protobuf; desc="https://gateway.icloud.com:443/static/protobuf/CloudDB/CloudDBClient.desc"; messageType=RequestOperation; delimited=true', + + "x-apple-operation-id": random.randbytes(8).hex(), + "x-apple-request-uuid": str(uuid.uuid4()).upper(), + + "user-agent": "CloudKit/2060.11 (22F82)" + } + + headers.update(gsa.generate_anisette_headers()) + + body = _build_record_save_request(record, self.container, self.user.sandbox, self.scope, zone, owner) + r = requests.post( + "https://gateway.icloud.com/ckdatabase/api/client/record/save", + headers=headers, + data=body, + verify=False + ) + + print(r.content) + + + + +def _build_record_save_request( record: Record, container: str, sandbox: bool = False, @@ -18,8 +114,6 @@ def build_record_save_request( zone: str = "_defaultZone", owner: str = "_defaultOwner", ): - MAGIC_BYTES = b"\xfe\x03" - hardware_id = uuid.uuid4() # Generate a new hardware ID for each request? operation_uuid = uuid.uuid4() # Generate a new operation UUID for each request? record_id = uuid.uuid4() # Generate a new record ID for each request? @@ -62,4 +156,6 @@ def build_record_save_request( request.recordSaveRequest.record.recordField[-1].value.type = cloudkit_pb2.Record.Field.Value.Type.STRING_TYPE request.recordSaveRequest.record.recordField[-1].value.stringValue = value - return MAGIC_BYTES + request.SerializeToString() \ No newline at end of file + len_bytes = _utils.ULEB128.encode(len(request.SerializeToString())) + + return len_bytes + request.SerializeToString() \ No newline at end of file diff --git a/icloud/gsa.py b/icloud/gsa.py index 7697689..240043c 100644 --- a/icloud/gsa.py +++ b/icloud/gsa.py @@ -17,6 +17,9 @@ import srp._pysrp as srp from cryptography.hazmat.primitives import padding from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +import logging +logger = logging.getLogger("gsa") + # Server to use for anisette generation ANISETTE = False # Use local generation with AOSKit (macOS only) @@ -126,7 +129,7 @@ def _generate_meta_headers(serial: str = "0", user_id: uuid = uuid.uuid4(), devi } def _generate_local_anisette() -> dict: - print("Using local anisette generation") + logger.debug("Using local anisette generation") """Generates anisette data using AOSKit locally""" import objc @@ -146,7 +149,7 @@ def _generate_local_anisette() -> dict: } def _generate_remote_anisette(url: str) -> dict: - print("Using remote anisette generation: " + url) + logger.debug("Using remote anisette generation: " + url) h = json.loads(requests.get(url, timeout=5).text) return { "X-Apple-I-MD": h["X-Apple-I-MD"], @@ -276,7 +279,7 @@ def trusted_second_factor(dsid, idms_token): if check_error(r): return - print("2FA successful") + logger.info("2FA successful") def sms_second_factor(dsid, idms_token): @@ -356,7 +359,7 @@ def authenticate(username, password): return if r["sp"] != "s2k": - print(f"This implementation only supports s2k. Server returned {r['sp']}") + logger.error(f"This implementation only supports s2k. Server returned {r['sp']}") return # Change the password out from under the SRP library, as we couldn't calculate it without the salt. @@ -366,7 +369,7 @@ def authenticate(username, password): # Make sure we processed the challenge correctly if M is None: - print("Failed to process challenge") + logger.critical("Failed to process challenge") return r = authenticated_request( @@ -384,7 +387,7 @@ def authenticate(username, password): # Make sure that the server's session key matches our session key (and thus that they are not an imposter) usr.verify_session(r["M2"]) if not usr.authenticated(): - print("Failed to verify session") + logger.critical("Failed to verify session") return spd = decrypt_cbc(usr, r["spd"]) @@ -396,7 +399,7 @@ def authenticate(username, password): spd = plist.loads(PLISTHEADER + spd) if "au" in r["Status"] and r["Status"]["au"] == "trustedDeviceSecondaryAuth": - print("Trusted device authentication required") + logger.info("Trusted device authentication required") # Replace bytes with strings for k, v in spd.items(): if isinstance(v, bytes): @@ -404,10 +407,10 @@ def authenticate(username, password): trusted_second_factor(spd["adsid"], spd["GsIdmsToken"]) return authenticate(username, password) elif "au" in r["Status"] and r["Status"]["au"] == "secondaryAuth": - print("SMS authentication required") + logger.info("SMS authentication required") sms_second_factor(spd["adsid"], spd["GsIdmsToken"]) elif "au" in r["Status"]: - print(f"Unknown auth value {r['Status']['au']}") + logger.info(f"Unknown auth value {r['Status']['au']}") return else: # print("Assuming 2FA is not required")