diff --git a/.gitignore b/.gitignore index 2e400b4..0d5249b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ config.json +config/ IMDAppleServices # Byte-compiled / optimized / DLL files diff --git a/bags.py b/bags.py index ae5099d..9b9d641 100644 --- a/bags.py +++ b/bags.py @@ -78,7 +78,7 @@ def grandslam_bag(): if GRANDSLAM_BAG is not None: return GRANDSLAM_BAG - import gsa + import icloud.gsa as gsa r = requests.get( "https://gsa.apple.com/grandslam/GsService2/lookup", verify=False, diff --git a/emulated/anisette.py b/emulated/anisette.py index fd52d31..54a45ca 100644 --- a/emulated/anisette.py +++ b/emulated/anisette.py @@ -8,7 +8,7 @@ import random import bags import requests import plistlib -import gsa +import icloud.gsa as gsa ANISETTE_SERVER = "wss://ani.sidestore.io/v3/provisioning_session" diff --git a/examples/cloudkit.py b/examples/cloudkit.py new file mode 100644 index 0000000..cee9649 --- /dev/null +++ b/examples/cloudkit.py @@ -0,0 +1,80 @@ +import sys +sys.path.append(".") + +import requests +import uuid +import plistlib +from base64 import b64encode, b64decode +import json +import random +import icloud.gsa as gsa +import icloud.cloudkit as cloudkit + +from rich.logging import RichHandler +import logging +logging.basicConfig( + level=logging.INFO, format="%(message)s", datefmt="[%X]", handlers=[RichHandler()] +) + +def main(): + CONFIG_PATH = "config/cloudkit.json" + # See if we have a search party token saved + import os + if os.path.exists(CONFIG_PATH): + print("Using saved config...") + #print("Found search party token!") + with open(CONFIG_PATH, "r") as f: + j = json.load(f) + cloudkit_token = j["cloudkit_token"] + ds_prs_id = j["ds_prs_id"] + + else: + # Prompt for username and password + USERNAME = input("Username: ") + PASSWORD = input("Password: ") + + r = icloud.login(USERNAME, PASSWORD, delegates=["com.apple.mobileme"]) + + cloudkit_token = r['delegates']['com.apple.mobileme']['service-data']['tokens']['cloudKitToken'] + ds_prs_id = r['delegates']['com.apple.mobileme']['service-data']['appleAccountInfo']['dsPrsID'] # This can also be obtained from the grandslam response + + print("Logged in!") + + with open(CONFIG_PATH, "w") as f: + json.dump({ + "cloudkit_token": cloudkit_token, + "ds_prs_id": ds_prs_id, + }, f, indent=4) + + print("CloudKit token: ", cloudkit_token) + + headers = { + "x-cloudkit-authtoken": cloudkit_token, + "x-cloudkit-userid": "_ec5fa262446ad56fb4bda84d00e981ff", # Hash of bundle id and icloud id + "x-cloudkit-containerid": "iCloud.dev.jjtech.experiments.cktest", + "x-cloudkit-bundleid": "dev.jjtech.experiments.cktest", + "x-cloudkit-bundleversion": "1", + "x-cloudkit-databasescope": "Public", + "x-cloudkit-environment": "Sandbox", + + "accept": "application/x-protobuf", + "content-type": 'application/x-protobuf; desc="https://gateway.icloud.com:443/static/protobuf/CloudDB/CloudDBClient.desc"; messageType=RequestOperation; delimited=true', + + "x-apple-operation-id": random.randbytes(8).hex(), + "x-apple-request-uuid": str(uuid.uuid4()).upper() + } + + headers.update(gsa.generate_anisette_headers()) + + body = cloudkit.build_record_save_request(cloudkit.Record(uuid.uuid4(), "ToDoItem", {"title": "Test"}), "iCloud.dev.jjtech.experiments.cktest", sandbox=True) + r = requests.post( + "https://gateway.icloud.com/ckdatabase/api/client/record/save", + headers=headers, + data=body, + verify=False + ) + + print(r.content) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/openhaystack.py b/examples/openhaystack.py index e5630c0..196bf31 100644 --- a/examples/openhaystack.py +++ b/examples/openhaystack.py @@ -1,16 +1,22 @@ # Add parent directory to path import sys -sys.path.append("..") sys.path.append(".") -import gsa +import icloud.gsa as gsa import requests import uuid import plistlib from base64 import b64encode, b64decode import json +import icloud -CONFIG_PATH = "examples/openhaystack.json" +from rich.logging import RichHandler +import logging +logging.basicConfig( + level=logging.INFO, format="%(message)s", datefmt="[%X]", handlers=[RichHandler()] +) + +CONFIG_PATH = "config/openhaystack.json" # See if we have a search party token saved import os if os.path.exists(CONFIG_PATH): @@ -26,51 +32,7 @@ else: USERNAME = input("Username: ") PASSWORD = input("Password: ") - print("Authenticating with Grand Slam...") - g = gsa.authenticate(USERNAME, PASSWORD) - #print(g) - pet = g["t"]["com.apple.gs.idms.pet"]["token"] - print("Authenticated!") - #print(g) - - data = { - "apple-id": USERNAME, - #"delegates": {"com.apple.private.ids": {"protocol-version": "4"}}, - "delegates": {"com.apple.mobileme": {}}, - "password": pet, - "client-id": str(uuid.uuid4()), - - } - data = plistlib.dumps(data) - from emulated import nac - - print("Generating validation data...") - v = nac.generate_validation_data() - print("Generated validation data!") - - headers = { - "X-Apple-ADSID": g["adsid"], - "X-Mme-Nas-Qualify": b64encode(v).decode(), - "User-Agent": "com.apple.iCloudHelper/282 CFNetwork/1408.0.4 Darwin/22.5.0", - "X-Mme-Client-Info": gsa.build_client(emulated_app="accountsd") # Otherwise we get MOBILEME_TERMS_OF_SERVICE_UPDATE on some accounts - } - headers.update(gsa.generate_anisette_headers()) - - print(headers) - - print("Logging in to iCloud...") - r = requests.post( - "https://setup.icloud.com/setup/prefpane/login", - auth=(USERNAME, pet), - data=data, - headers=headers, - verify=False, - ) - - print(r) - print(r.headers) - r = plistlib.loads(r.content) - print(r) + r = icloud.login(USERNAME, PASSWORD, delegates=["com.apple.mobileme"]) search_party_token = r['delegates']['com.apple.mobileme']['service-data']['tokens']['searchPartyToken'] ds_prs_id = r['delegates']['com.apple.mobileme']['service-data']['appleAccountInfo']['dsPrsID'] # This can also be obtained from the grandslam response @@ -90,17 +52,17 @@ r = requests.post( auth=(ds_prs_id, search_party_token), headers=gsa.generate_anisette_headers(), json={ - "search": [ - { - "startDate": 1697662550688, - "endDate": 1697673599999, - "ids": [ - "/a8rQOW7Ucg2OOBo0D3i/7IZAbvRXcO+5y/1w0QVE4s=" - ] - } - ] -} - + "search": [ + { + "startDate": 1697662550688, + "endDate": 1697673599999, + "ids": [ + "/a8rQOW7Ucg2OOBo0D3i/7IZAbvRXcO+5y/1w0QVE4s=" + ] + } + ] + }, + verify=False, ) #print(r.headers) diff --git a/icloud/__init__.py b/icloud/__init__.py new file mode 100644 index 0000000..322be59 --- /dev/null +++ b/icloud/__init__.py @@ -0,0 +1,68 @@ +import uuid +import plistlib +from . import gsa +import logging +import requests +import base64 + +from emulated import nac + +logger = logging.getLogger("icloud") + +USER_AGENT = "com.apple.iCloudHelper/282 CFNetwork/1408.0.4 Darwin/22.5.0" + +def login(username: str, password: str, delegates: set[str] = ["com.apple.private.ids"], grandslam: bool = True, anisette: str | bool = False): + """ + Logs into Apple services listed in `delegates` and returns a dictionary of responses. + Commonly used delegates are: + - `com.apple.private.ids` + - `com.apple.mobileme` + + `grandslam` configures if the new GrandSlam authentication flow is used. This is required for some delegates, and improves the 2FA experience. + `anisette` configures which server to request anisette data from. If `False`, local anisette generation using AOSKit is attempted. This is not required if `grandslam` is `False`. + """ + + if grandslam: + # TODO: Provide anisette preferences to gsa.authenticate + g = gsa.authenticate(username, password) + # Replace the password with the PET token + password = g["t"]["com.apple.gs.idms.pet"]["token"] + adsid = g["adsid"] + logger.debug("Authenticated with GrandSlam") + + delegates = {delegate: {} for delegate in delegates} + if "com.apple.private.ids" in delegates: + delegates["com.apple.private.ids"]["protocol-version"] = "4" + + data = { + "apple-id": username, + "delegates": delegates, + "password": password, + "client-id": str(uuid.uuid4()), + } + data = plistlib.dumps(data) + + logger.debug("Generating validation data") + v = nac.generate_validation_data() + logger.debug("Generated validation data") + + headers = { + "X-Apple-ADSID": adsid, + "X-Mme-Nas-Qualify": base64.b64encode(v).decode(), + "User-Agent": USER_AGENT, + "X-Mme-Client-Info": gsa.build_client(emulated_app="accountsd") # Otherwise we get MOBILEME_TERMS_OF_SERVICE_UPDATE on some accounts + } + headers.update(gsa.generate_anisette_headers()) + + + logger.debug("Making login request") + r = requests.post( + "https://setup.icloud.com/setup/prefpane/login", + auth=(username, password), + data=data, + headers=headers, + verify=False, + ) + + # TODO: Error checking and parsing of this response + return plistlib.loads(r.content) \ No newline at end of file diff --git a/icloud/cloudkit.py b/icloud/cloudkit.py index b1e8b3a..06fb77a 100644 --- a/icloud/cloudkit.py +++ b/icloud/cloudkit.py @@ -1,264 +1,65 @@ -# Add parent directory to path -import sys -sys.path.append("..") -sys.path.append(".") - -import gsa -import requests -import uuid -import plistlib -from base64 import b64encode, b64decode -import json -import random - -CONFIG_PATH = "examples/cloudkit.json" -# See if we have a search party token saved -import os -if os.path.exists(CONFIG_PATH): - print("Using saved config...") - #print("Found search party token!") - with open(CONFIG_PATH, "r") as f: - j = json.load(f) - cloudkit_token = j["cloudkit_token"] - ds_prs_id = j["ds_prs_id"] - -else: - # Prompt for username and password - USERNAME = input("Username: ") - PASSWORD = input("Password: ") - - print("Authenticating with Grand Slam...") - g = gsa.authenticate(USERNAME, PASSWORD) - #print(g) - pet = g["t"]["com.apple.gs.idms.pet"]["token"] - print("Authenticated!") - #print(g) - - data = { - "apple-id": USERNAME, - #"delegates": {"com.apple.private.ids": {"protocol-version": "4"}}, - "delegates": {"com.apple.mobileme": {}}, - "password": pet, - "client-id": str(uuid.uuid4()), - - } - data = plistlib.dumps(data) - from emulated import nac - - print("Generating validation data...") - v = nac.generate_validation_data() - print("Generated validation data!") - - headers = { - "X-Apple-ADSID": g["adsid"], - "X-Mme-Nas-Qualify": b64encode(v).decode(), - "User-Agent": "com.apple.iCloudHelper/282 CFNetwork/1408.0.4 Darwin/22.5.0", - "X-Mme-Client-Info": gsa.build_client(emulated_app="accountsd") # Otherwise we get MOBILEME_TERMS_OF_SERVICE_UPDATE on some accounts - } - headers.update(gsa.generate_anisette_headers()) - - print(headers) - - print("Logging in to iCloud...") - r = requests.post( - "https://setup.icloud.com/setup/prefpane/login", - auth=(USERNAME, pet), - data=data, - headers=headers, - verify=False, - ) - - print(r) - print(r.headers) - r = plistlib.loads(r.content) - print(r) - - cloudkit_token = r['delegates']['com.apple.mobileme']['service-data']['tokens']['cloudKitToken'] - ds_prs_id = r['delegates']['com.apple.mobileme']['service-data']['appleAccountInfo']['dsPrsID'] # This can also be obtained from the grandslam response - - print("Logged in!") - - with open(CONFIG_PATH, "w") as f: - json.dump({ - "cloudkit_token": cloudkit_token, - "ds_prs_id": ds_prs_id, - }, f, indent=4) - -print("CloudKit token: ", cloudkit_token) - -headers = { - "x-cloudkit-authtoken": cloudkit_token, - "x-cloudkit-userid": "_ec5fa262446ad56fb4bda84d00e981ff", # Hash of bundle id and icloud id - "x-cloudkit-containerid": "iCloud.dev.jjtech.experiments.cktest", - "x-cloudkit-bundleid": "dev.jjtech.experiments.cktest", - "x-cloudkit-bundleversion": "1", - "x-cloudkit-databasescope": "Public", - "x-cloudkit-environment": "Sandbox", - - "accept": "application/x-protobuf", - "content-type": 'application/x-protobuf; desc="https://gateway.icloud.com:443/static/protobuf/CloudDB/CloudDBClient.desc"; messageType=RequestOperation; delimited=true', - - "x-apple-operation-id": random.randbytes(8).hex(), - "x-apple-request-uuid": str(uuid.uuid4()).upper() - - -} - -headers.update(gsa.generate_anisette_headers()) - -import cloudkit_pb2 - -# header { -# applicationContainer: "iCloud.dev.jjtech.experiments.cktest" -# applicationBundle: "dev.jjtech.experiments.cktest" -# applicationVersion: "1" -# deviceIdentifier { -# name: "776D147D-DAF3-495F-A834-12526DAECA5C" -# type: DEVICE -# } -# deviceSoftwareVersion: "13.4.1" -# deviceHardwareVersion: "MacBookPro18,3" -# deviceLibraryName: "com.apple.cloudkit.CloudKitDaemon" -# deviceLibraryVersion: "2060.11" -# locale { -# languageCode: "en-US" -# regionCode: "US" -# } -# mmcsProtocolVersion: "5.0" -# applicationContainerEnvironment: SANDBOX -# deviceAssignedName: "James\342\200\231s Laptop" -# deviceHardwareID: "776D147D-DAF3-495F-A834-12526DAECA5C" -# targetDatabase: PUBLIC_DB -# isolationLevel: ZONE -# unk1: 0 -# unk2: "7B40B37D-2503-5161-9B4E-84D20478694C" -# deviceSerial: "X5T0QFNHXP" -# unk3: 0 -# unk4: 1 -# } -# request { -# operationUUID: "B1FC75B3-D69E-4368-BD0A-93170C7A3017" -# type: RECORD_SAVE_TYPE -# last: true -# } -# recordSaveRequest { -# record { -# recordIdentifier { -# value { -# name: "699F278B-1381-4480-8297-7751B88B8F06" -# type: RECORD -# } -# zoneIdentifier { -# value { -# name: "_defaultZone" -# type: RECORD_ZONE -# } -# ownerIdentifier { -# name: "_defaultOwner" -# type: USER -# } -# } -# } -# type { -# name: "ToDoItem" -# } -# recordField { -# identifier { -# name: "name" -# } -# value { -# type: STRING_TYPE -# stringValue: "Test item" -# } -# } -# } -# unk1: 1 -# unk2: 2 -# } - from typing import Literal +from . import cloudkit_pb2 +import uuid +import dataclasses +import typing -#def build_cloudkit_record_save_request(container: str, sandbox: bool, database: Literal["PUBLIC"] | Literal["PRIVATE"] | Literal["SHARED"], zone: str, ): +@dataclasses.dataclass +class Record: + name: uuid.UUID + type: str + fields: dict[str, typing.Any] -request = cloudkit_pb2.RequestOperation() -request.header.applicationContainer = "iCloud.dev.jjtech.experiments.cktest" -#request.header.applicationBundle = "dev.jjtech.experiments.cktest" -#request.header.applicationVersion = "1" -#request.header.deviceIdentifier.name = "776D147D-DAF3-495F-A834-12526DAECA5C" -#request.header.deviceIdentifier.type = cloudkit_pb2.Identifier.Type.DEVICE -#request.header.deviceSoftwareVersion = "13.4.1" -#request.header.deviceHardwareVersion = "MacBookPro18,3" -#request.header.deviceLibraryName = "com.apple.cloudkit.CloudKitDaemon" -#request.header.deviceLibraryVersion = "2060.11" -#request.header.locale.languageCode = "en-US" -#request.header.locale.regionCode = "US" -#request.header.mmcsProtocolVersion = "5.0" -request.header.applicationContainerEnvironment = cloudkit_pb2.RequestOperation.Header.ContainerEnvironment.SANDBOX -#request.header.deviceAssignedName = "James’s Laptop" -request.header.deviceHardwareID = str(uuid.uuid4()).upper() -request.header.targetDatabase = cloudkit_pb2.RequestOperation.Header.Database.PUBLIC_DB -request.header.isolationLevel = cloudkit_pb2.RequestOperation.Header.IsolationLevel.ZONE -#request.header.unk1 = 0 -#request.header.unk2 = "7B40B37D-2503-5161-9B4E-84D20478694C" -#request.header.deviceSerial = "X5T0QFNHXP" -#request.header.unk3 = 0 -#request.header.unk4 = 1 -request.request.operationUUID = str(uuid.uuid4()).upper() -request.request.type = cloudkit_pb2.Operation.Type.RECORD_SAVE_TYPE -request.request.last = True -request.recordSaveRequest.record.recordIdentifier.value.name = str(uuid.uuid4()).upper() -request.recordSaveRequest.record.recordIdentifier.value.type = cloudkit_pb2.Identifier.Type.RECORD -request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.value.name = "_defaultZone" -request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.value.type = cloudkit_pb2.Identifier.Type.RECORD_ZONE -request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.ownerIdentifier.name = "_defaultOwner" -request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.ownerIdentifier.type = cloudkit_pb2.Identifier.Type.USER -request.recordSaveRequest.record.type.name = "ToDoItem" -# RecordField is a repeated field, so we have to append to it -request.recordSaveRequest.record.recordField.append(cloudkit_pb2.Record.Field()) -request.recordSaveRequest.record.recordField[0].identifier.name = "name" -request.recordSaveRequest.record.recordField[0].value.type = cloudkit_pb2.Record.Field.Value.Type.STRING_TYPE -request.recordSaveRequest.record.recordField[0].value.stringValue = "Test pypush 5" -#request.recordSaveRequest.record.recordField.identifier.name = "name" -#request.recordSaveRequest.record.recordField.value.type = cloudkit_pb2.Record.Field.Value.Type.STRING_TYPE -#request.recordSaveRequest.record.recordField.value.stringValue = "Test item" -#request.recordSaveRequest.unk1 = 1 -#request.recordSaveRequest.unk2 = 2 +def build_record_save_request( + record: Record, + container: str, + sandbox: bool = False, + database: Literal["PUBLIC"] | Literal["PRIVATE"] | Literal["SHARED"] = "PUBLIC", + zone: str = "_defaultZone", + owner: str = "_defaultOwner", +): + MAGIC_BYTES = b"\xfe\x03" + + hardware_id = uuid.uuid4() # Generate a new hardware ID for each request? + operation_uuid = uuid.uuid4() # Generate a new operation UUID for each request? + record_id = uuid.uuid4() # Generate a new record ID for each request? + + request = cloudkit_pb2.RequestOperation() + request.header.applicationContainer = container + request.header.applicationContainerEnvironment = cloudkit_pb2.RequestOperation.Header.ContainerEnvironment.SANDBOX if sandbox else cloudkit_pb2.RequestOperation.Header.ContainerEnvironment.PRODUCTION + + request.header.deviceHardwareID = str(hardware_id).upper() + + if database == "PUBLIC": + request.header.targetDatabase = cloudkit_pb2.RequestOperation.Header.Database.PUBLIC_DB + elif database == "PRIVATE": + request.header.targetDatabase = cloudkit_pb2.RequestOperation.Header.Database.PRIVATE_DB + elif database == "SHARED": + request.header.targetDatabase = cloudkit_pb2.RequestOperation.Header.Database.SHARED_DB + + request.header.isolationLevel = cloudkit_pb2.RequestOperation.Header.IsolationLevel.ZONE + request.request.operationUUID = str(operation_uuid).upper() + request.request.type = cloudkit_pb2.Operation.Type.RECORD_SAVE_TYPE + request.request.last = True -# WHAT ARE THESE BYTES??? -body = b"\xfe\x03" + request.SerializeToString() -r =requests.post( - "https://gateway.icloud.com/ckdatabase/api/client/record/save", - headers=headers, - data=body, - verify=False -) -print(r.content) -# import time + request.recordSaveRequest.record.recordIdentifier.value.name = str(record_id).upper() + request.recordSaveRequest.record.recordIdentifier.value.type = cloudkit_pb2.Identifier.Type.RECORD -# r = requests.post( -# "https://gateway.icloud.com/acsnservice/fetch", -# auth=(ds_prs_id, search_party_token), -# headers=gsa.generate_anisette_headers(), -# json={ -# "search": [ -# { -# "startDate": 1697662550688, -# "endDate": 1697673599999, -# "ids": [ -# "/a8rQOW7Ucg2OOBo0D3i/7IZAbvRXcO+5y/1w0QVE4s=" -# ] -# } -# ] -# } - -# ) + request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.value.name = zone + request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.value.type = cloudkit_pb2.Identifier.Type.RECORD_ZONE -# #print(r.headers) -# if r.status_code != 200 or len(r.content) == 0: -# print("Error fetching locations (ratelimit?): ", r.status_code, r.headers) -# exit(1) -# r = r.content.decode() -# print(json.dumps(json.loads(r), indent=4)) \ No newline at end of file + request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.ownerIdentifier.name = owner + request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.ownerIdentifier.type = cloudkit_pb2.Identifier.Type.USER + + request.recordSaveRequest.record.type.name = record.type + + for key, value in record.fields.items(): + request.recordSaveRequest.record.recordField.append(cloudkit_pb2.Record.Field()) + request.recordSaveRequest.record.recordField[-1].identifier.name = key + request.recordSaveRequest.record.recordField[-1].value.type = cloudkit_pb2.Record.Field.Value.Type.STRING_TYPE + request.recordSaveRequest.record.recordField[-1].value.stringValue = value + + return MAGIC_BYTES + request.SerializeToString() \ No newline at end of file diff --git a/gsa.py b/icloud/gsa.py similarity index 100% rename from gsa.py rename to icloud/gsa.py