diff --git a/icloud/__init__.py b/icloud/__init__.py index 322be59..437d616 100644 --- a/icloud/__init__.py +++ b/icloud/__init__.py @@ -1,17 +1,26 @@ -import uuid -import plistlib -from . import gsa -import logging -import requests import base64 +import logging +import plistlib +import uuid + +import requests from emulated import nac +from . import gsa + logger = logging.getLogger("icloud") USER_AGENT = "com.apple.iCloudHelper/282 CFNetwork/1408.0.4 Darwin/22.5.0" -def login(username: str, password: str, delegates: set[str] = ["com.apple.private.ids"], grandslam: bool = True, anisette: str | bool = False): + +def login( + username: str, + password: str, + delegates: set[str] = ["com.apple.private.ids"], + grandslam: bool = True, + anisette: str | bool = False, +): """ Logs into Apple services listed in `delegates` and returns a dictionary of responses. Commonly used delegates are: @@ -50,11 +59,12 @@ def login(username: str, password: str, delegates: set[str] = ["com.apple.privat "X-Apple-ADSID": adsid, "X-Mme-Nas-Qualify": base64.b64encode(v).decode(), "User-Agent": USER_AGENT, - "X-Mme-Client-Info": gsa.build_client(emulated_app="accountsd") # Otherwise we get MOBILEME_TERMS_OF_SERVICE_UPDATE on some accounts + "X-Mme-Client-Info": gsa.build_client( + emulated_app="accountsd" + ), # Otherwise we get MOBILEME_TERMS_OF_SERVICE_UPDATE on some accounts } headers.update(gsa.generate_anisette_headers()) - logger.debug("Making login request") r = requests.post( "https://setup.icloud.com/setup/prefpane/login", @@ -63,6 +73,6 @@ def login(username: str, password: str, delegates: set[str] = ["com.apple.privat headers=headers, verify=False, ) - + # TODO: Error checking and parsing of this response - return plistlib.loads(r.content) \ No newline at end of file + return plistlib.loads(r.content) diff --git a/icloud/_utils.py b/icloud/_utils.py index 5b076fe..8c97239 100644 --- a/icloud/_utils.py +++ b/icloud/_utils.py @@ -16,7 +16,7 @@ class ULEB128: assert i >= 0 r = [] while True: - byte = i & 0x7f + byte = i & 0x7F i = i >> 7 if i == 0: r.append(byte) @@ -27,7 +27,7 @@ class ULEB128: def decode(b: bytearray) -> int: r = 0 for i, e in enumerate(b): - r = r + ((e & 0x7f) << (i * 7)) + r = r + ((e & 0x7F) << (i * 7)) return r @staticmethod @@ -46,7 +46,7 @@ class ILEB128: def encode(i: int) -> bytearray: r = [] while True: - byte = i & 0x7f + byte = i & 0x7F i = i >> 7 if (i == 0 and byte & 0x40 == 0) or (i == -1 and byte & 0x40 != 0): r.append(byte) @@ -57,9 +57,9 @@ class ILEB128: def decode(b: bytearray) -> int: r = 0 for i, e in enumerate(b): - r = r + ((e & 0x7f) << (i * 7)) + r = r + ((e & 0x7F) << (i * 7)) if e & 0x40 != 0: - r |= - (1 << (i * 7) + 7) + r |= -(1 << (i * 7) + 7) return r @staticmethod @@ -70,4 +70,4 @@ class ILEB128: a.append(b) if (b & 0x80) == 0: break - return ILEB128.decode(a), len(a) \ No newline at end of file + return ILEB128.decode(a), len(a) diff --git a/icloud/cloudkit.py b/icloud/cloudkit.py index c340407..c2603ca 100644 --- a/icloud/cloudkit.py +++ b/icloud/cloudkit.py @@ -1,22 +1,28 @@ -from typing import Literal -from . import cloudkit_pb2, gsa, _utils -import uuid import dataclasses -import typing -import random -import requests import logging +import random +import typing +import uuid +from typing import Literal + +import requests + +from . import _utils, cloudkit_pb2, gsa logger = logging.getLogger("cloudkit") + @dataclasses.dataclass class Record: name: uuid.UUID type: str fields: dict[str, typing.Any] + class CloudKit: - def __init__(self, dsid: str, cloudkit_token: str, mme_token: str, sandbox: bool = False): + def __init__( + self, dsid: str, cloudkit_token: str, mme_token: str, sandbox: bool = False + ): """ Represents a CloudKit user. `dsid`: The user's DSID. @@ -28,15 +34,25 @@ class CloudKit: self.cloudkit_token = cloudkit_token self.mme_token = mme_token self.sandbox = sandbox - - def container(self, container: str, scope: Literal["PUBLIC"] | Literal["PRIVATE"] | Literal["SHARED"] = "PUBLIC") -> "CloudKitContainer": + + def container( + self, + container: str, + scope: Literal["PUBLIC"] | Literal["PRIVATE"] | Literal["SHARED"] = "PUBLIC", + ) -> "CloudKitContainer": """ Convenience method for creating a CloudKitContainer object. """ return CloudKitContainer(container, self, scope) - + + class CloudKitContainer: - def __init__(self, container: str, user: CloudKit, scope: Literal["PUBLIC"] | Literal["PRIVATE"] | Literal["SHARED"] = "PUBLIC"): + def __init__( + self, + container: str, + user: CloudKit, + scope: Literal["PUBLIC"] | Literal["PRIVATE"] | Literal["SHARED"] = "PUBLIC", + ): """ Represents a CloudKit container. container: The CloudKit container ID. (e.g. "iCloud.dev.jjtech.experiments.cktest") @@ -51,24 +67,32 @@ class CloudKitContainer: def _fetch_user_id(self): headers = { "x-cloudkit-containerid": self.container, - "x-cloudkit-bundleid": ".".join(self.container.split(".")[1:]), # Remove the "iCloud." prefix + "x-cloudkit-bundleid": ".".join( + self.container.split(".")[1:] + ), # Remove the "iCloud." prefix "x-cloudkit-databasescope": self.scope, "x-cloudkit-environment": "Sandbox" if self.user.sandbox else "Production", - "accept": "application/x-protobuf", - "x-apple-operation-id": random.randbytes(8).hex(), - "x-apple-request-uuid": str(uuid.uuid4()).upper() + "x-apple-request-uuid": str(uuid.uuid4()).upper(), } headers.update(gsa.generate_anisette_headers()) - r = requests.post("https://gateway.icloud.com/setup/setup/ck/v1/ckAppInit", params={"container": self.container}, headers=headers, auth=(self.user.dsid, self.user.mme_token), verify=False) + r = requests.post( + "https://gateway.icloud.com/setup/setup/ck/v1/ckAppInit", + params={"container": self.container}, + headers=headers, + auth=(self.user.dsid, self.user.mme_token), + verify=False, + ) logger.debug("Got app init response: ", r.content) return r.json()["cloudKitUserId"] - def save_record(self, record: Record, zone: str = "_defaultZone", owner: str = "_defaultOwner") -> None: + def save_record( + self, record: Record, zone: str = "_defaultZone", owner: str = "_defaultOwner" + ) -> None: """ Saves a record to the container. """ @@ -78,33 +102,32 @@ class CloudKitContainer: "x-cloudkit-authtoken": self.user.cloudkit_token, "x-cloudkit-userid": self.user_id, "x-cloudkit-containerid": self.container, - "x-cloudkit-bundleid": ".".join(self.container.split(".")[1:]), # Remove the "iCloud." prefix + "x-cloudkit-bundleid": ".".join( + self.container.split(".")[1:] + ), # Remove the "iCloud." prefix "x-cloudkit-databasescope": self.scope, "x-cloudkit-environment": "Sandbox" if self.user.sandbox else "Production", - "accept": "application/x-protobuf", "content-type": 'application/x-protobuf; desc="https://gateway.icloud.com:443/static/protobuf/CloudDB/CloudDBClient.desc"; messageType=RequestOperation; delimited=true', - "x-apple-operation-id": random.randbytes(8).hex(), "x-apple-request-uuid": str(uuid.uuid4()).upper(), - - "user-agent": "CloudKit/2060.11 (22F82)" + "user-agent": "CloudKit/2060.11 (22F82)", } headers.update(gsa.generate_anisette_headers()) - body = _build_record_save_request(record, self.container, self.user.sandbox, self.scope, zone, owner) + body = _build_record_save_request( + record, self.container, self.user.sandbox, self.scope, zone, owner + ) r = requests.post( "https://gateway.icloud.com/ckdatabase/api/client/record/save", headers=headers, data=body, - verify=False + verify=False, ) print(r.content) - - def _build_record_save_request( record: Record, @@ -114,48 +137,70 @@ def _build_record_save_request( zone: str = "_defaultZone", owner: str = "_defaultOwner", ): - hardware_id = uuid.uuid4() # Generate a new hardware ID for each request? - operation_uuid = uuid.uuid4() # Generate a new operation UUID for each request? - record_id = uuid.uuid4() # Generate a new record ID for each request? + hardware_id = uuid.uuid4() # Generate a new hardware ID for each request? + operation_uuid = uuid.uuid4() # Generate a new operation UUID for each request? + record_id = uuid.uuid4() # Generate a new record ID for each request? request = cloudkit_pb2.RequestOperation() request.header.applicationContainer = container - request.header.applicationContainerEnvironment = cloudkit_pb2.RequestOperation.Header.ContainerEnvironment.SANDBOX if sandbox else cloudkit_pb2.RequestOperation.Header.ContainerEnvironment.PRODUCTION + request.header.applicationContainerEnvironment = ( + cloudkit_pb2.RequestOperation.Header.ContainerEnvironment.SANDBOX + if sandbox + else cloudkit_pb2.RequestOperation.Header.ContainerEnvironment.PRODUCTION + ) request.header.deviceHardwareID = str(hardware_id).upper() if database == "PUBLIC": - request.header.targetDatabase = cloudkit_pb2.RequestOperation.Header.Database.PUBLIC_DB + request.header.targetDatabase = ( + cloudkit_pb2.RequestOperation.Header.Database.PUBLIC_DB + ) elif database == "PRIVATE": - request.header.targetDatabase = cloudkit_pb2.RequestOperation.Header.Database.PRIVATE_DB + request.header.targetDatabase = ( + cloudkit_pb2.RequestOperation.Header.Database.PRIVATE_DB + ) elif database == "SHARED": - request.header.targetDatabase = cloudkit_pb2.RequestOperation.Header.Database.SHARED_DB - - request.header.isolationLevel = cloudkit_pb2.RequestOperation.Header.IsolationLevel.ZONE + request.header.targetDatabase = ( + cloudkit_pb2.RequestOperation.Header.Database.SHARED_DB + ) + request.header.isolationLevel = ( + cloudkit_pb2.RequestOperation.Header.IsolationLevel.ZONE + ) request.request.operationUUID = str(operation_uuid).upper() request.request.type = cloudkit_pb2.Operation.Type.RECORD_SAVE_TYPE request.request.last = True - - request.recordSaveRequest.record.recordIdentifier.value.name = str(record_id).upper() - request.recordSaveRequest.record.recordIdentifier.value.type = cloudkit_pb2.Identifier.Type.RECORD + request.recordSaveRequest.record.recordIdentifier.value.name = str( + record_id + ).upper() + request.recordSaveRequest.record.recordIdentifier.value.type = ( + cloudkit_pb2.Identifier.Type.RECORD + ) request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.value.name = zone - request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.value.type = cloudkit_pb2.Identifier.Type.RECORD_ZONE + request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.value.type = ( + cloudkit_pb2.Identifier.Type.RECORD_ZONE + ) - request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.ownerIdentifier.name = owner - request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.ownerIdentifier.type = cloudkit_pb2.Identifier.Type.USER + request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.ownerIdentifier.name = ( + owner + ) + request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.ownerIdentifier.type = ( + cloudkit_pb2.Identifier.Type.USER + ) request.recordSaveRequest.record.type.name = record.type for key, value in record.fields.items(): request.recordSaveRequest.record.recordField.append(cloudkit_pb2.Record.Field()) request.recordSaveRequest.record.recordField[-1].identifier.name = key - request.recordSaveRequest.record.recordField[-1].value.type = cloudkit_pb2.Record.Field.Value.Type.STRING_TYPE + request.recordSaveRequest.record.recordField[ + -1 + ].value.type = cloudkit_pb2.Record.Field.Value.Type.STRING_TYPE request.recordSaveRequest.record.recordField[-1].value.stringValue = value len_bytes = _utils.ULEB128.encode(len(request.SerializeToString())) - return len_bytes + request.SerializeToString() \ No newline at end of file + return len_bytes + request.SerializeToString() diff --git a/icloud/gsa.py b/icloud/gsa.py index 240043c..46ffd14 100644 --- a/icloud/gsa.py +++ b/icloud/gsa.py @@ -5,6 +5,7 @@ import hashlib import hmac import json import locale +import logging import plistlib as plist import uuid from base64 import b64decode, b64encode @@ -17,19 +18,18 @@ import srp._pysrp as srp from cryptography.hazmat.primitives import padding from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -import logging logger = logging.getLogger("gsa") # Server to use for anisette generation -ANISETTE = False # Use local generation with AOSKit (macOS only) +ANISETTE = False # Use local generation with AOSKit (macOS only) # ANISETTE = "https://sign.rheaa.xyz/" # ANISETTE = 'http://45.132.246.138:6969/' -#ANISETTE = "https://ani.sidestore.io/" +# ANISETTE = "https://ani.sidestore.io/" # ANISETTE = 'https://sideloadly.io/anisette/irGb3Quww8zrhgqnzmrx' # ANISETTE = "http://jkcoxson.com:2052/" -#USER_AGENT = "com.apple.iCloudHelper/282 CFNetwork/1408.0.4 Darwin/22.5.0" +# USER_AGENT = "com.apple.iCloudHelper/282 CFNetwork/1408.0.4 Darwin/22.5.0" USER_AGENT = "akd/1.0 CFNetwork/978.0.7 Darwin/18.7.0" # Created here so that it is consistent @@ -45,53 +45,57 @@ import urllib3 urllib3.disable_warnings() -def build_client(emulated_device: str = "MacBookPro18,3", emulated_app: str = "accountsd") -> str: - """'Client Information' - String in the following format: - <%MODEL%> <%OS%;%MAJOR%.%MINOR%(%SPMAJOR%,%SPMINOR%);%BUILD%> <%AUTHKIT_BUNDLE_ID%/%AUTHKIT_VERSION% (%APP_BUNDLE_ID%/%APP_VERSION%)> - Where: - MODEL: The model of the device (e.g. MacBookPro15,1 or 'PC' - OS: The OS of the device (e.g. Mac OS X or Windows) - MAJOR: The major version of the OS (e.g. 10) - MINOR: The minor version of the OS (e.g. 15) - SPMAJOR: The major version of the service pack (e.g. 0) (Windows only) - SPMINOR: The minor version of the service pack (e.g. 0) (Windows only) - BUILD: The build number of the OS (e.g. 19C57) - AUTHKIT_BUNDLE_ID: The bundle ID of the AuthKit framework (e.g. com.apple.AuthKit) - AUTHKIT_VERSION: The version of the AuthKit framework (e.g. 1) - APP_BUNDLE_ID: The bundle ID of the app (e.g. com.apple.dt.Xcode) - APP_VERSION: The version of the app (e.g. 3594.4.19) - """ - model = emulated_device - if emulated_device == "PC": - # We're emulating a PC, so we run Windows (Vista?) - os = "Windows" - os_version = "6.2(0,0);9200" - else: - # We're emulating a Mac, so we run macOS Ventura - os = "Mac OS X" - os_version = "13.4.1;22F8" +def build_client( + emulated_device: str = "MacBookPro18,3", emulated_app: str = "accountsd" +) -> str: + """'Client Information' + String in the following format: + <%MODEL%> <%OS%;%MAJOR%.%MINOR%(%SPMAJOR%,%SPMINOR%);%BUILD%> <%AUTHKIT_BUNDLE_ID%/%AUTHKIT_VERSION% (%APP_BUNDLE_ID%/%APP_VERSION%)> + Where: + MODEL: The model of the device (e.g. MacBookPro15,1 or 'PC' + OS: The OS of the device (e.g. Mac OS X or Windows) + MAJOR: The major version of the OS (e.g. 10) + MINOR: The minor version of the OS (e.g. 15) + SPMAJOR: The major version of the service pack (e.g. 0) (Windows only) + SPMINOR: The minor version of the service pack (e.g. 0) (Windows only) + BUILD: The build number of the OS (e.g. 19C57) + AUTHKIT_BUNDLE_ID: The bundle ID of the AuthKit framework (e.g. com.apple.AuthKit) + AUTHKIT_VERSION: The version of the AuthKit framework (e.g. 1) + APP_BUNDLE_ID: The bundle ID of the app (e.g. com.apple.dt.Xcode) + APP_VERSION: The version of the app (e.g. 3594.4.19) + """ - if emulated_app == "Xcode": - app_bundle = "com.apple.dt.Xcode" - app_version = "3594.4.19" - elif emulated_app == "accountsd": - app_bundle = "com.apple.accountsd" - app_version = "113" - else: - app_bundle = "com.apple.iCloud" - app_version = "7.21" + model = emulated_device + if emulated_device == "PC": + # We're emulating a PC, so we run Windows (Vista?) + os = "Windows" + os_version = "6.2(0,0);9200" + else: + # We're emulating a Mac, so we run macOS Ventura + os = "Mac OS X" + os_version = "13.4.1;22F8" + + if emulated_app == "Xcode": + app_bundle = "com.apple.dt.Xcode" + app_version = "3594.4.19" + elif emulated_app == "accountsd": + app_bundle = "com.apple.accountsd" + app_version = "113" + else: + app_bundle = "com.apple.iCloud" + app_version = "7.21" + + if os == "Windows": + authkit_bundle = "com.apple.AuthKitWin" + authkit_version = "1" + else: + authkit_bundle = "com.apple.AOSKit" + authkit_version = "282" + + return f"<{model}> <{os};{os_version}> <{authkit_bundle}/{authkit_version} ({app_bundle}/{app_version})>" - if os == "Windows": - authkit_bundle = "com.apple.AuthKitWin" - authkit_version = "1" - else: - authkit_bundle = "com.apple.AOSKit" - authkit_version = "282" - return f"<{model}> <{os};{os_version}> <{authkit_bundle}/{authkit_version} ({app_bundle}/{app_version})>" - def _generate_cpd() -> dict: cpd = { # Many of these values are not strictly necessary, but may be tracked by Apple @@ -112,22 +116,30 @@ def _generate_cpd() -> dict: cpd.update(generate_anisette_headers()) return cpd -def _generate_meta_headers(serial: str = "0", user_id: uuid = uuid.uuid4(), device_id: uuid = uuid.uuid4()) -> dict: - return { - "X-Apple-I-Client-Time": datetime.utcnow().replace(microsecond=0).isoformat() + "Z", # Current timestamp in ISO 8601 format - "X-Apple-I-TimeZone": str(datetime.utcnow().astimezone().tzinfo), # Abbreviation of the timezone of the device (e.g. EST) +def _generate_meta_headers( + serial: str = "0", user_id: uuid = uuid.uuid4(), device_id: uuid = uuid.uuid4() +) -> dict: + return { + "X-Apple-I-Client-Time": datetime.utcnow().replace(microsecond=0).isoformat() + + "Z", # Current timestamp in ISO 8601 format + "X-Apple-I-TimeZone": str( + datetime.utcnow().astimezone().tzinfo + ), # Abbreviation of the timezone of the device (e.g. EST) # Locale of the device (e.g. en_US) "loc": locale.getdefaultlocale()[0] or "en_US", "X-Apple-Locale": locale.getdefaultlocale()[0] or "en_US", - - "X-Apple-I-MD-RINFO": "17106176", # either 17106176 or 50660608 - - "X-Apple-I-MD-LU": b64encode(str(user_id).upper().encode()).decode(), # 'Local User ID': Base64 encoding of an uppercase UUID - "X-Mme-Device-Id": str(device_id).upper(), # 'Device Unique Identifier', uppercase UUID - "X-Apple-I-SRL-NO": serial, # Serial number + "X-Apple-I-MD-RINFO": "17106176", # either 17106176 or 50660608 + "X-Apple-I-MD-LU": b64encode( + str(user_id).upper().encode() + ).decode(), # 'Local User ID': Base64 encoding of an uppercase UUID + "X-Mme-Device-Id": str( + device_id + ).upper(), # 'Device Unique Identifier', uppercase UUID + "X-Apple-I-SRL-NO": serial, # Serial number } + def _generate_local_anisette() -> dict: logger.debug("Using local anisette generation") """Generates anisette data using AOSKit locally""" @@ -148,6 +160,7 @@ def _generate_local_anisette() -> dict: "X-Apple-I-MD-M": str(h["X-Apple-MD-M"]), } + def _generate_remote_anisette(url: str) -> dict: logger.debug("Using remote anisette generation: " + url) h = json.loads(requests.get(url, timeout=5).text) @@ -156,15 +169,16 @@ def _generate_remote_anisette(url: str) -> dict: "X-Apple-I-MD-M": h["X-Apple-I-MD-M"], } + def generate_anisette_headers() -> dict: if isinstance(ANISETTE, str) and ANISETTE.startswith("http"): a = _generate_remote_anisette(ANISETTE) else: - a =_generate_local_anisette() - + a = _generate_local_anisette() + a.update(_generate_meta_headers(user_id=USER_ID, device_id=DEVICE_ID)) return a - + def authenticated_request(parameters) -> dict: body = { @@ -205,8 +219,8 @@ def check_error(r): if status["ec"] != 0: raise Exception(f"Error {status['ec']}: {status['em']}") - #print(f"Error {status['ec']}: {status['em']}") - #return True + # print(f"Error {status['ec']}: {status['em']}") + # return True return False @@ -248,11 +262,11 @@ def trusted_second_factor(dsid, idms_token): "X-Apple-Identity-Token": identity_token, "X-Apple-App-Info": "com.apple.gs.xcode.auth", "X-Xcode-Version": "11.2 (11B41)", - "X-Mme-Client-Info": build_client(emulated_app="Xcode") + "X-Mme-Client-Info": build_client(emulated_app="Xcode"), } headers.update(generate_anisette_headers()) - + # This will trigger the 2FA prompt on trusted devices # We don't care about the response, it's just some HTML with a form for entering the code # Easier to just use a text prompt @@ -296,7 +310,7 @@ def sms_second_factor(dsid, idms_token): "X-Apple-Identity-Token": identity_token, "X-Apple-App-Info": "com.apple.gs.xcode.auth", "X-Xcode-Version": "11.2 (11B41)", - "X-Mme-Client-Info": build_client(emulated_app="Xcode") + "X-Mme-Client-Info": build_client(emulated_app="Xcode"), } headers.update(generate_anisette_headers()) @@ -359,7 +373,9 @@ def authenticate(username, password): return if r["sp"] != "s2k": - logger.error(f"This implementation only supports s2k. Server returned {r['sp']}") + logger.error( + f"This implementation only supports s2k. Server returned {r['sp']}" + ) return # Change the password out from under the SRP library, as we couldn't calculate it without the salt. @@ -414,4 +430,4 @@ def authenticate(username, password): return else: # print("Assuming 2FA is not required") - return spd \ No newline at end of file + return spd