import dataclasses
import logging
import random
import typing
import uuid
from typing import Literal

import requests

from . import cloudkit_pb2, gsa, _utils

logger = logging.getLogger("cloudkit")

# Database scopes accepted by the classes and helpers in this module.
_DatabaseScope = Literal["PUBLIC", "PRIVATE", "SHARED"]


@dataclasses.dataclass
class Record:
    """
    A record to be saved to a CloudKit container.
    """

    # Sent (upper-cased) as the record identifier in the save request.
    name: uuid.UUID
    # Sent as the record type name.
    type: str
    # Field name -> value. Every value is currently sent as a protobuf string
    # field, so non-`str` values will be rejected when the request is built.
    fields: dict[str, typing.Any]


class CloudKit:
    def __init__(self, dsid: str, cloudkit_token: str, mme_token: str, sandbox: bool = False):
        """
        Represents a CloudKit user.

        `dsid`: The user's DSID.
        `cloudkit_token`: `cloudKitToken` from the `com.apple.mobileme` delegate.
        `mme_token`: `mmeAuthToken` from the `com.apple.mobileme` delegate.
        `sandbox`: Whether to use the CloudKit sandbox environment.
        """
        self.dsid = dsid
        self.cloudkit_token = cloudkit_token
        self.mme_token = mme_token
        self.sandbox = sandbox

    def container(self, container: str, scope: _DatabaseScope = "PUBLIC") -> "CloudKitContainer":
        """
        Convenience method for creating a CloudKitContainer object.
        """
        return CloudKitContainer(container, self, scope)


class CloudKitContainer:
    def __init__(self, container: str, user: CloudKit, scope: _DatabaseScope = "PUBLIC"):
        """
        Represents a CloudKit container.

        container: The CloudKit container ID. (e.g. "iCloud.dev.jjtech.experiments.cktest")
        user: The CloudKit user to use for authentication.
        scope: The CloudKit database scope to use.

        Note: constructing a container performs a network request to look up
        the CloudKit user ID for `user` in this container.
        """
        self.container = container
        self.user = user
        self.scope = scope
        self.user_id = self._fetch_user_id()

    def _common_headers(self) -> dict[str, str]:
        """
        Builds the headers shared by every request made for this container.

        A fresh operation ID and request UUID are generated on each call.
        """
        return {
            "x-cloudkit-containerid": self.container,
            # Remove the "iCloud." prefix (drops the first dot-separated component).
            "x-cloudkit-bundleid": ".".join(self.container.split(".")[1:]),
            "x-cloudkit-databasescope": self.scope,
            "x-cloudkit-environment": "Sandbox" if self.user.sandbox else "Production",
            "accept": "application/x-protobuf",
            "x-apple-operation-id": random.randbytes(8).hex(),
            "x-apple-request-uuid": str(uuid.uuid4()).upper(),
        }

    def _fetch_user_id(self):
        """
        Calls the `ckAppInit` endpoint and returns the `cloudKitUserId` value
        from its JSON response.
        """
        headers = self._common_headers()
        headers.update(gsa.generate_anisette_headers())

        # NOTE(review): verify=False disables TLS certificate verification while
        # sending the DSID and mmeAuthToken; kept as in the original, but this
        # should be re-enabled unless it is deliberately needed for debugging.
        r = requests.post(
            "https://gateway.icloud.com/setup/setup/ck/v1/ckAppInit",
            params={"container": self.container},
            headers=headers,
            auth=(self.user.dsid, self.user.mme_token),
            verify=False,
        )

        # The payload must be passed through a %s placeholder; without one the
        # logging module reports a formatting error instead of the message.
        logger.debug("Got app init response: %s", r.content)

        return r.json()["cloudKitUserId"]

    def save_record(self, record: Record, zone: str = "_defaultZone", owner: str = "_defaultOwner") -> None:
        """
        Saves a record to the container.

        record: The record to save.
        zone: Name of the record zone to save into.
        owner: Name of the owner of that zone.

        The raw server response is logged at DEBUG level; the HTTP status is
        not checked.
        """
        logger.info("Saving record %s to %s", record.name, self.container)

        headers = {
            "x-cloudkit-authtoken": self.user.cloudkit_token,
            "x-cloudkit-userid": self.user_id,
            **self._common_headers(),
            "content-type": 'application/x-protobuf; desc="https://gateway.icloud.com:443/static/protobuf/CloudDB/CloudDBClient.desc"; messageType=RequestOperation; delimited=true',
            "user-agent": "CloudKit/2060.11 (22F82)",
        }
        headers.update(gsa.generate_anisette_headers())

        body = _build_record_save_request(record, self.container, self.user.sandbox, self.scope, zone, owner)

        # NOTE(review): verify=False disables TLS certificate verification while
        # sending the CloudKit auth token; kept as in the original.
        r = requests.post(
            "https://gateway.icloud.com/ckdatabase/api/client/record/save",
            headers=headers,
            data=body,
            verify=False,
        )

        # Library code should not write to stdout; route the response through
        # the module logger instead.
        logger.debug("Got record save response: %s", r.content)


def _build_record_save_request(
    record: Record,
    container: str,
    sandbox: bool = False,
    database: _DatabaseScope = "PUBLIC",
    zone: str = "_defaultZone",
    owner: str = "_defaultOwner",
) -> bytes:
    """
    Builds the length-delimited protobuf body for a record save request.

    record: The record to save; its name, type and fields are copied into the request.
    container: The CloudKit container ID.
    sandbox: Whether to target the sandbox environment instead of production.
    database: The database scope to target. An unrecognised value leaves the
        target database field unset.
    zone: Name of the record zone.
    owner: Name of the zone owner.

    Returns the serialized `RequestOperation` prefixed with its ULEB128-encoded length.
    """
    hardware_id = uuid.uuid4()  # Generate a new hardware ID for each request?
    operation_uuid = uuid.uuid4()  # Generate a new operation UUID for each request?

    request = cloudkit_pb2.RequestOperation()

    header = request.header
    header_cls = cloudkit_pb2.RequestOperation.Header
    header.applicationContainer = container
    header.applicationContainerEnvironment = (
        header_cls.ContainerEnvironment.SANDBOX if sandbox else header_cls.ContainerEnvironment.PRODUCTION
    )
    header.deviceHardwareID = str(hardware_id).upper()

    target_database = {
        "PUBLIC": header_cls.Database.PUBLIC_DB,
        "PRIVATE": header_cls.Database.PRIVATE_DB,
        "SHARED": header_cls.Database.SHARED_DB,
    }.get(database)
    if target_database is not None:
        header.targetDatabase = target_database

    header.isolationLevel = header_cls.IsolationLevel.ZONE

    request.request.operationUUID = str(operation_uuid).upper()
    request.request.type = cloudkit_pb2.Operation.Type.RECORD_SAVE_TYPE
    request.request.last = True

    record_pb = request.recordSaveRequest.record

    # Use the caller-supplied record name rather than a throwaway random UUID,
    # so the saved record matches the name that save_record() logs.
    record_identifier = record_pb.recordIdentifier
    record_identifier.value.name = str(record.name).upper()
    record_identifier.value.type = cloudkit_pb2.Identifier.Type.RECORD

    zone_identifier = record_identifier.zoneIdentifier
    zone_identifier.value.name = zone
    zone_identifier.value.type = cloudkit_pb2.Identifier.Type.RECORD_ZONE
    zone_identifier.ownerIdentifier.name = owner
    zone_identifier.ownerIdentifier.type = cloudkit_pb2.Identifier.Type.USER

    record_pb.type.name = record.type

    for key, value in record.fields.items():
        # add() appends a new Field to the repeated field and returns it.
        field = record_pb.recordField.add()
        field.identifier.name = key
        # Only string values are supported for now.
        field.value.type = cloudkit_pb2.Record.Field.Value.Type.STRING_TYPE
        field.value.stringValue = value

    # Serialize once and reuse the bytes for both the length prefix and the payload.
    payload = request.SerializeToString()
    return _utils.ULEB128.encode(len(payload)) + payload