major refactor

This commit is contained in:
JJTech0130 2023-10-21 20:39:40 -04:00
parent 3b5c868936
commit e4a26ae1fa
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6
8 changed files with 228 additions and 316 deletions

1
.gitignore vendored
View file

@ -1,4 +1,5 @@
config.json config.json
config/
IMDAppleServices IMDAppleServices
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files

View file

@ -78,7 +78,7 @@ def grandslam_bag():
if GRANDSLAM_BAG is not None: if GRANDSLAM_BAG is not None:
return GRANDSLAM_BAG return GRANDSLAM_BAG
import gsa import icloud.gsa as gsa
r = requests.get( r = requests.get(
"https://gsa.apple.com/grandslam/GsService2/lookup", verify=False, "https://gsa.apple.com/grandslam/GsService2/lookup", verify=False,

View file

@ -8,7 +8,7 @@ import random
import bags import bags
import requests import requests
import plistlib import plistlib
import gsa import icloud.gsa as gsa
ANISETTE_SERVER = "wss://ani.sidestore.io/v3/provisioning_session" ANISETTE_SERVER = "wss://ani.sidestore.io/v3/provisioning_session"

80
examples/cloudkit.py Normal file
View file

@ -0,0 +1,80 @@
import sys
sys.path.append(".")
import requests
import uuid
import plistlib
from base64 import b64encode, b64decode
import json
import random
import icloud.gsa as gsa
import icloud.cloudkit as cloudkit
from rich.logging import RichHandler
import logging
logging.basicConfig(
level=logging.INFO, format="%(message)s", datefmt="[%X]", handlers=[RichHandler()]
)
def main():
CONFIG_PATH = "config/cloudkit.json"
# See if we have a search party token saved
import os
if os.path.exists(CONFIG_PATH):
print("Using saved config...")
#print("Found search party token!")
with open(CONFIG_PATH, "r") as f:
j = json.load(f)
cloudkit_token = j["cloudkit_token"]
ds_prs_id = j["ds_prs_id"]
else:
# Prompt for username and password
USERNAME = input("Username: ")
PASSWORD = input("Password: ")
r = icloud.login(USERNAME, PASSWORD, delegates=["com.apple.mobileme"])
cloudkit_token = r['delegates']['com.apple.mobileme']['service-data']['tokens']['cloudKitToken']
ds_prs_id = r['delegates']['com.apple.mobileme']['service-data']['appleAccountInfo']['dsPrsID'] # This can also be obtained from the grandslam response
print("Logged in!")
with open(CONFIG_PATH, "w") as f:
json.dump({
"cloudkit_token": cloudkit_token,
"ds_prs_id": ds_prs_id,
}, f, indent=4)
print("CloudKit token: ", cloudkit_token)
headers = {
"x-cloudkit-authtoken": cloudkit_token,
"x-cloudkit-userid": "_ec5fa262446ad56fb4bda84d00e981ff", # Hash of bundle id and icloud id
"x-cloudkit-containerid": "iCloud.dev.jjtech.experiments.cktest",
"x-cloudkit-bundleid": "dev.jjtech.experiments.cktest",
"x-cloudkit-bundleversion": "1",
"x-cloudkit-databasescope": "Public",
"x-cloudkit-environment": "Sandbox",
"accept": "application/x-protobuf",
"content-type": 'application/x-protobuf; desc="https://gateway.icloud.com:443/static/protobuf/CloudDB/CloudDBClient.desc"; messageType=RequestOperation; delimited=true',
"x-apple-operation-id": random.randbytes(8).hex(),
"x-apple-request-uuid": str(uuid.uuid4()).upper()
}
headers.update(gsa.generate_anisette_headers())
body = cloudkit.build_record_save_request(cloudkit.Record(uuid.uuid4(), "ToDoItem", {"title": "Test"}), "iCloud.dev.jjtech.experiments.cktest", sandbox=True)
r = requests.post(
"https://gateway.icloud.com/ckdatabase/api/client/record/save",
headers=headers,
data=body,
verify=False
)
print(r.content)
if __name__ == "__main__":
main()

View file

@ -1,16 +1,22 @@
# Add parent directory to path # Add parent directory to path
import sys import sys
sys.path.append("..")
sys.path.append(".") sys.path.append(".")
import gsa import icloud.gsa as gsa
import requests import requests
import uuid import uuid
import plistlib import plistlib
from base64 import b64encode, b64decode from base64 import b64encode, b64decode
import json import json
import icloud
CONFIG_PATH = "examples/openhaystack.json" from rich.logging import RichHandler
import logging
logging.basicConfig(
level=logging.INFO, format="%(message)s", datefmt="[%X]", handlers=[RichHandler()]
)
CONFIG_PATH = "config/openhaystack.json"
# See if we have a search party token saved # See if we have a search party token saved
import os import os
if os.path.exists(CONFIG_PATH): if os.path.exists(CONFIG_PATH):
@ -26,51 +32,7 @@ else:
USERNAME = input("Username: ") USERNAME = input("Username: ")
PASSWORD = input("Password: ") PASSWORD = input("Password: ")
print("Authenticating with Grand Slam...") r = icloud.login(USERNAME, PASSWORD, delegates=["com.apple.mobileme"])
g = gsa.authenticate(USERNAME, PASSWORD)
#print(g)
pet = g["t"]["com.apple.gs.idms.pet"]["token"]
print("Authenticated!")
#print(g)
data = {
"apple-id": USERNAME,
#"delegates": {"com.apple.private.ids": {"protocol-version": "4"}},
"delegates": {"com.apple.mobileme": {}},
"password": pet,
"client-id": str(uuid.uuid4()),
}
data = plistlib.dumps(data)
from emulated import nac
print("Generating validation data...")
v = nac.generate_validation_data()
print("Generated validation data!")
headers = {
"X-Apple-ADSID": g["adsid"],
"X-Mme-Nas-Qualify": b64encode(v).decode(),
"User-Agent": "com.apple.iCloudHelper/282 CFNetwork/1408.0.4 Darwin/22.5.0",
"X-Mme-Client-Info": gsa.build_client(emulated_app="accountsd") # Otherwise we get MOBILEME_TERMS_OF_SERVICE_UPDATE on some accounts
}
headers.update(gsa.generate_anisette_headers())
print(headers)
print("Logging in to iCloud...")
r = requests.post(
"https://setup.icloud.com/setup/prefpane/login",
auth=(USERNAME, pet),
data=data,
headers=headers,
verify=False,
)
print(r)
print(r.headers)
r = plistlib.loads(r.content)
print(r)
search_party_token = r['delegates']['com.apple.mobileme']['service-data']['tokens']['searchPartyToken'] search_party_token = r['delegates']['com.apple.mobileme']['service-data']['tokens']['searchPartyToken']
ds_prs_id = r['delegates']['com.apple.mobileme']['service-data']['appleAccountInfo']['dsPrsID'] # This can also be obtained from the grandslam response ds_prs_id = r['delegates']['com.apple.mobileme']['service-data']['appleAccountInfo']['dsPrsID'] # This can also be obtained from the grandslam response
@ -90,17 +52,17 @@ r = requests.post(
auth=(ds_prs_id, search_party_token), auth=(ds_prs_id, search_party_token),
headers=gsa.generate_anisette_headers(), headers=gsa.generate_anisette_headers(),
json={ json={
"search": [ "search": [
{ {
"startDate": 1697662550688, "startDate": 1697662550688,
"endDate": 1697673599999, "endDate": 1697673599999,
"ids": [ "ids": [
"/a8rQOW7Ucg2OOBo0D3i/7IZAbvRXcO+5y/1w0QVE4s=" "/a8rQOW7Ucg2OOBo0D3i/7IZAbvRXcO+5y/1w0QVE4s="
] ]
} }
] ]
} },
verify=False,
) )
#print(r.headers) #print(r.headers)

68
icloud/__init__.py Normal file
View file

@ -0,0 +1,68 @@
import uuid
import plistlib
from . import gsa
import logging
import requests
import base64
from emulated import nac
logger = logging.getLogger("icloud")
USER_AGENT = "com.apple.iCloudHelper/282 CFNetwork/1408.0.4 Darwin/22.5.0"
def login(username: str, password: str, delegates: set[str] = ["com.apple.private.ids"], grandslam: bool = True, anisette: str | bool = False):
"""
Logs into Apple services listed in `delegates` and returns a dictionary of responses.
Commonly used delegates are:
- `com.apple.private.ids`
- `com.apple.mobileme`
`grandslam` configures if the new GrandSlam authentication flow is used. This is required for some delegates, and improves the 2FA experience.
`anisette` configures which server to request anisette data from. If `False`, local anisette generation using AOSKit is attempted. This is not required if `grandslam` is `False`.
"""
if grandslam:
# TODO: Provide anisette preferences to gsa.authenticate
g = gsa.authenticate(username, password)
# Replace the password with the PET token
password = g["t"]["com.apple.gs.idms.pet"]["token"]
adsid = g["adsid"]
logger.debug("Authenticated with GrandSlam")
delegates = {delegate: {} for delegate in delegates}
if "com.apple.private.ids" in delegates:
delegates["com.apple.private.ids"]["protocol-version"] = "4"
data = {
"apple-id": username,
"delegates": delegates,
"password": password,
"client-id": str(uuid.uuid4()),
}
data = plistlib.dumps(data)
logger.debug("Generating validation data")
v = nac.generate_validation_data()
logger.debug("Generated validation data")
headers = {
"X-Apple-ADSID": adsid,
"X-Mme-Nas-Qualify": base64.b64encode(v).decode(),
"User-Agent": USER_AGENT,
"X-Mme-Client-Info": gsa.build_client(emulated_app="accountsd") # Otherwise we get MOBILEME_TERMS_OF_SERVICE_UPDATE on some accounts
}
headers.update(gsa.generate_anisette_headers())
logger.debug("Making login request")
r = requests.post(
"https://setup.icloud.com/setup/prefpane/login",
auth=(username, password),
data=data,
headers=headers,
verify=False,
)
# TODO: Error checking and parsing of this response
return plistlib.loads(r.content)

View file

@ -1,264 +1,65 @@
# Add parent directory to path
import sys
sys.path.append("..")
sys.path.append(".")
import gsa
import requests
import uuid
import plistlib
from base64 import b64encode, b64decode
import json
import random
CONFIG_PATH = "examples/cloudkit.json"
# See if we have a search party token saved
import os
if os.path.exists(CONFIG_PATH):
print("Using saved config...")
#print("Found search party token!")
with open(CONFIG_PATH, "r") as f:
j = json.load(f)
cloudkit_token = j["cloudkit_token"]
ds_prs_id = j["ds_prs_id"]
else:
# Prompt for username and password
USERNAME = input("Username: ")
PASSWORD = input("Password: ")
print("Authenticating with Grand Slam...")
g = gsa.authenticate(USERNAME, PASSWORD)
#print(g)
pet = g["t"]["com.apple.gs.idms.pet"]["token"]
print("Authenticated!")
#print(g)
data = {
"apple-id": USERNAME,
#"delegates": {"com.apple.private.ids": {"protocol-version": "4"}},
"delegates": {"com.apple.mobileme": {}},
"password": pet,
"client-id": str(uuid.uuid4()),
}
data = plistlib.dumps(data)
from emulated import nac
print("Generating validation data...")
v = nac.generate_validation_data()
print("Generated validation data!")
headers = {
"X-Apple-ADSID": g["adsid"],
"X-Mme-Nas-Qualify": b64encode(v).decode(),
"User-Agent": "com.apple.iCloudHelper/282 CFNetwork/1408.0.4 Darwin/22.5.0",
"X-Mme-Client-Info": gsa.build_client(emulated_app="accountsd") # Otherwise we get MOBILEME_TERMS_OF_SERVICE_UPDATE on some accounts
}
headers.update(gsa.generate_anisette_headers())
print(headers)
print("Logging in to iCloud...")
r = requests.post(
"https://setup.icloud.com/setup/prefpane/login",
auth=(USERNAME, pet),
data=data,
headers=headers,
verify=False,
)
print(r)
print(r.headers)
r = plistlib.loads(r.content)
print(r)
cloudkit_token = r['delegates']['com.apple.mobileme']['service-data']['tokens']['cloudKitToken']
ds_prs_id = r['delegates']['com.apple.mobileme']['service-data']['appleAccountInfo']['dsPrsID'] # This can also be obtained from the grandslam response
print("Logged in!")
with open(CONFIG_PATH, "w") as f:
json.dump({
"cloudkit_token": cloudkit_token,
"ds_prs_id": ds_prs_id,
}, f, indent=4)
print("CloudKit token: ", cloudkit_token)
headers = {
"x-cloudkit-authtoken": cloudkit_token,
"x-cloudkit-userid": "_ec5fa262446ad56fb4bda84d00e981ff", # Hash of bundle id and icloud id
"x-cloudkit-containerid": "iCloud.dev.jjtech.experiments.cktest",
"x-cloudkit-bundleid": "dev.jjtech.experiments.cktest",
"x-cloudkit-bundleversion": "1",
"x-cloudkit-databasescope": "Public",
"x-cloudkit-environment": "Sandbox",
"accept": "application/x-protobuf",
"content-type": 'application/x-protobuf; desc="https://gateway.icloud.com:443/static/protobuf/CloudDB/CloudDBClient.desc"; messageType=RequestOperation; delimited=true',
"x-apple-operation-id": random.randbytes(8).hex(),
"x-apple-request-uuid": str(uuid.uuid4()).upper()
}
headers.update(gsa.generate_anisette_headers())
import cloudkit_pb2
# header {
# applicationContainer: "iCloud.dev.jjtech.experiments.cktest"
# applicationBundle: "dev.jjtech.experiments.cktest"
# applicationVersion: "1"
# deviceIdentifier {
# name: "776D147D-DAF3-495F-A834-12526DAECA5C"
# type: DEVICE
# }
# deviceSoftwareVersion: "13.4.1"
# deviceHardwareVersion: "MacBookPro18,3"
# deviceLibraryName: "com.apple.cloudkit.CloudKitDaemon"
# deviceLibraryVersion: "2060.11"
# locale {
# languageCode: "en-US"
# regionCode: "US"
# }
# mmcsProtocolVersion: "5.0"
# applicationContainerEnvironment: SANDBOX
# deviceAssignedName: "James\342\200\231s Laptop"
# deviceHardwareID: "776D147D-DAF3-495F-A834-12526DAECA5C"
# targetDatabase: PUBLIC_DB
# isolationLevel: ZONE
# unk1: 0
# unk2: "7B40B37D-2503-5161-9B4E-84D20478694C"
# deviceSerial: "X5T0QFNHXP"
# unk3: 0
# unk4: 1
# }
# request {
# operationUUID: "B1FC75B3-D69E-4368-BD0A-93170C7A3017"
# type: RECORD_SAVE_TYPE
# last: true
# }
# recordSaveRequest {
# record {
# recordIdentifier {
# value {
# name: "699F278B-1381-4480-8297-7751B88B8F06"
# type: RECORD
# }
# zoneIdentifier {
# value {
# name: "_defaultZone"
# type: RECORD_ZONE
# }
# ownerIdentifier {
# name: "_defaultOwner"
# type: USER
# }
# }
# }
# type {
# name: "ToDoItem"
# }
# recordField {
# identifier {
# name: "name"
# }
# value {
# type: STRING_TYPE
# stringValue: "Test item"
# }
# }
# }
# unk1: 1
# unk2: 2
# }
from typing import Literal from typing import Literal
from . import cloudkit_pb2
import uuid
import dataclasses
import typing
#def build_cloudkit_record_save_request(container: str, sandbox: bool, database: Literal["PUBLIC"] | Literal["PRIVATE"] | Literal["SHARED"], zone: str, ): @dataclasses.dataclass
class Record:
name: uuid.UUID
type: str
fields: dict[str, typing.Any]
request = cloudkit_pb2.RequestOperation() def build_record_save_request(
request.header.applicationContainer = "iCloud.dev.jjtech.experiments.cktest" record: Record,
#request.header.applicationBundle = "dev.jjtech.experiments.cktest" container: str,
#request.header.applicationVersion = "1" sandbox: bool = False,
#request.header.deviceIdentifier.name = "776D147D-DAF3-495F-A834-12526DAECA5C" database: Literal["PUBLIC"] | Literal["PRIVATE"] | Literal["SHARED"] = "PUBLIC",
#request.header.deviceIdentifier.type = cloudkit_pb2.Identifier.Type.DEVICE zone: str = "_defaultZone",
#request.header.deviceSoftwareVersion = "13.4.1" owner: str = "_defaultOwner",
#request.header.deviceHardwareVersion = "MacBookPro18,3" ):
#request.header.deviceLibraryName = "com.apple.cloudkit.CloudKitDaemon" MAGIC_BYTES = b"\xfe\x03"
#request.header.deviceLibraryVersion = "2060.11"
#request.header.locale.languageCode = "en-US" hardware_id = uuid.uuid4() # Generate a new hardware ID for each request?
#request.header.locale.regionCode = "US" operation_uuid = uuid.uuid4() # Generate a new operation UUID for each request?
#request.header.mmcsProtocolVersion = "5.0" record_id = uuid.uuid4() # Generate a new record ID for each request?
request.header.applicationContainerEnvironment = cloudkit_pb2.RequestOperation.Header.ContainerEnvironment.SANDBOX
#request.header.deviceAssignedName = "Jamess Laptop" request = cloudkit_pb2.RequestOperation()
request.header.deviceHardwareID = str(uuid.uuid4()).upper() request.header.applicationContainer = container
request.header.targetDatabase = cloudkit_pb2.RequestOperation.Header.Database.PUBLIC_DB request.header.applicationContainerEnvironment = cloudkit_pb2.RequestOperation.Header.ContainerEnvironment.SANDBOX if sandbox else cloudkit_pb2.RequestOperation.Header.ContainerEnvironment.PRODUCTION
request.header.isolationLevel = cloudkit_pb2.RequestOperation.Header.IsolationLevel.ZONE
#request.header.unk1 = 0 request.header.deviceHardwareID = str(hardware_id).upper()
#request.header.unk2 = "7B40B37D-2503-5161-9B4E-84D20478694C"
#request.header.deviceSerial = "X5T0QFNHXP" if database == "PUBLIC":
#request.header.unk3 = 0 request.header.targetDatabase = cloudkit_pb2.RequestOperation.Header.Database.PUBLIC_DB
#request.header.unk4 = 1 elif database == "PRIVATE":
request.request.operationUUID = str(uuid.uuid4()).upper() request.header.targetDatabase = cloudkit_pb2.RequestOperation.Header.Database.PRIVATE_DB
request.request.type = cloudkit_pb2.Operation.Type.RECORD_SAVE_TYPE elif database == "SHARED":
request.request.last = True request.header.targetDatabase = cloudkit_pb2.RequestOperation.Header.Database.SHARED_DB
request.recordSaveRequest.record.recordIdentifier.value.name = str(uuid.uuid4()).upper()
request.recordSaveRequest.record.recordIdentifier.value.type = cloudkit_pb2.Identifier.Type.RECORD request.header.isolationLevel = cloudkit_pb2.RequestOperation.Header.IsolationLevel.ZONE
request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.value.name = "_defaultZone"
request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.value.type = cloudkit_pb2.Identifier.Type.RECORD_ZONE
request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.ownerIdentifier.name = "_defaultOwner"
request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.ownerIdentifier.type = cloudkit_pb2.Identifier.Type.USER
request.recordSaveRequest.record.type.name = "ToDoItem"
# RecordField is a repeated field, so we have to append to it
request.recordSaveRequest.record.recordField.append(cloudkit_pb2.Record.Field())
request.recordSaveRequest.record.recordField[0].identifier.name = "name"
request.recordSaveRequest.record.recordField[0].value.type = cloudkit_pb2.Record.Field.Value.Type.STRING_TYPE
request.recordSaveRequest.record.recordField[0].value.stringValue = "Test pypush 5"
#request.recordSaveRequest.record.recordField.identifier.name = "name"
#request.recordSaveRequest.record.recordField.value.type = cloudkit_pb2.Record.Field.Value.Type.STRING_TYPE
#request.recordSaveRequest.record.recordField.value.stringValue = "Test item"
#request.recordSaveRequest.unk1 = 1
#request.recordSaveRequest.unk2 = 2
request.request.operationUUID = str(operation_uuid).upper()
request.request.type = cloudkit_pb2.Operation.Type.RECORD_SAVE_TYPE
request.request.last = True
# WHAT ARE THESE BYTES???
body = b"\xfe\x03" + request.SerializeToString()
r =requests.post(
"https://gateway.icloud.com/ckdatabase/api/client/record/save",
headers=headers,
data=body,
verify=False
)
print(r.content) request.recordSaveRequest.record.recordIdentifier.value.name = str(record_id).upper()
# import time request.recordSaveRequest.record.recordIdentifier.value.type = cloudkit_pb2.Identifier.Type.RECORD
# r = requests.post( request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.value.name = zone
# "https://gateway.icloud.com/acsnservice/fetch", request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.value.type = cloudkit_pb2.Identifier.Type.RECORD_ZONE
# auth=(ds_prs_id, search_party_token),
# headers=gsa.generate_anisette_headers(),
# json={
# "search": [
# {
# "startDate": 1697662550688,
# "endDate": 1697673599999,
# "ids": [
# "/a8rQOW7Ucg2OOBo0D3i/7IZAbvRXcO+5y/1w0QVE4s="
# ]
# }
# ]
# }
# )
# #print(r.headers) request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.ownerIdentifier.name = owner
# if r.status_code != 200 or len(r.content) == 0: request.recordSaveRequest.record.recordIdentifier.zoneIdentifier.ownerIdentifier.type = cloudkit_pb2.Identifier.Type.USER
# print("Error fetching locations (ratelimit?): ", r.status_code, r.headers)
# exit(1) request.recordSaveRequest.record.type.name = record.type
# r = r.content.decode()
# print(json.dumps(json.loads(r), indent=4)) for key, value in record.fields.items():
request.recordSaveRequest.record.recordField.append(cloudkit_pb2.Record.Field())
request.recordSaveRequest.record.recordField[-1].identifier.name = key
request.recordSaveRequest.record.recordField[-1].value.type = cloudkit_pb2.Record.Field.Value.Type.STRING_TYPE
request.recordSaveRequest.record.recordField[-1].value.stringValue = value
return MAGIC_BYTES + request.SerializeToString()