Implement lookups (again) 🎉

This commit is contained in:
JJTech0130 2023-05-02 14:10:13 -04:00
parent 949a34f73f
commit 5a3f60610e
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6
2 changed files with 33 additions and 37 deletions

26
demo.py
View file

@ -10,6 +10,16 @@ try:
except FileNotFoundError: except FileNotFoundError:
CONFIG = {} CONFIG = {}
def input_multiline(prompt):
print(prompt)
lines = []
while True:
line = input()
if line == "":
break
lines.append(line)
return "\n".join(lines)
def refresh_token(): def refresh_token():
# If no username is set, prompt for it # If no username is set, prompt for it
if "username" not in CONFIG: if "username" not in CONFIG:
@ -102,6 +112,13 @@ def refresh_madrid_cert():
CONFIG["madrid_cert"] = madrid_cert CONFIG["madrid_cert"] = madrid_cert
if not 'push' in CONFIG:
print("No push conn")
conn = create_connection()
else:
print("restoring push conn")
conn = restore_connection()
if not 'madrid_cert' in CONFIG: if not 'madrid_cert' in CONFIG:
print("No madrid cert") print("No madrid cert")
if not 'key' in CONFIG: if not 'key' in CONFIG:
@ -110,14 +127,11 @@ if not 'madrid_cert' in CONFIG:
print("No auth token") print("No auth token")
refresh_token() refresh_token()
refresh_cert() refresh_cert()
if not 'push' in CONFIG:
print("No push conn")
conn = create_connection()
else:
print("restoring push conn")
conn = restore_connection()
refresh_madrid_cert() refresh_madrid_cert()
print("Got new madrid cert") print("Got new madrid cert")
print("Doing lookup")
print(ids.lookup(conn, ['mailto:jjtech@jjtech.dev'], (CONFIG['key'], CONFIG['madrid_cert']), CONFIG['username']))
print("Done") print("Done")

44
ids.py
View file

@ -16,7 +16,7 @@ import apns
import bags import bags
import gsa import gsa
#USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]" USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"
# NOTE: The push token MUST be registered with the account for self-uri! # NOTE: The push token MUST be registered with the account for self-uri!
# This is an actual valid one for my account, since you can look it up anyway. # This is an actual valid one for my account, since you can look it up anyway.
#PUSH_TOKEN = "5V7AY+ikHr4DiSfq1W2UBa71G3FLGkpUSKTrOLg81yk=" #PUSH_TOKEN = "5V7AY+ikHr4DiSfq1W2UBa71G3FLGkpUSKTrOLg81yk="
@ -35,17 +35,6 @@ def generate_nonce() -> bytes:
+ random.randbytes(8) + random.randbytes(8)
) )
def load_keys() -> tuple[str, str]:
# Load the private key and certificate from files
with open("ids.key", "r") as f:
ids_key = f.read()
with open("ids.crt", "r") as f:
ids_cert = f.read()
return ids_key, ids_cert
def _create_payload( def _create_payload(
bag_key: str, bag_key: str,
query_string: str, query_string: str,
@ -92,27 +81,31 @@ def sign_payload(
# global_key, global_cert = load_keys() # global_key, global_cert = load_keys()
def _send_request(conn: apns.APNSConnection, bag_key: str, body: bytes) -> bytes: def _send_request(conn: apns.APNSConnection, bag_key: str, body: bytes, id_key: str, id_cert, username: str) -> bytes:
body = zlib.compress(body, wbits=16 + zlib.MAX_WBITS) body = zlib.compress(body, wbits=16 + zlib.MAX_WBITS)
push_token = b64encode(conn.token).decode()
# Sign the request # Sign the request
signature, nonce = sign_payload(global_key, bag_key, "", PUSH_TOKEN, body) signature, nonce = sign_payload(id_key, bag_key, "", push_token, body)
headers = { headers = {
"x-id-cert": global_cert.replace("-----BEGIN CERTIFICATE-----", "") "x-id-cert": id_cert.replace("-----BEGIN CERTIFICATE-----", "")
.replace("-----END CERTIFICATE-----", "") .replace("-----END CERTIFICATE-----", "")
.replace("\n", ""), .replace("\n", ""),
"x-id-nonce": b64encode(nonce).decode(), "x-id-nonce": b64encode(nonce).decode(),
"x-id-sig": signature, "x-id-sig": signature,
"x-push-token": PUSH_TOKEN, "x-push-token": push_token,
"x-id-self-uri": SELF_URI, "x-id-self-uri": 'mailto:' + username,
"User-Agent": USER_AGENT, "User-Agent": USER_AGENT,
"x-protocol-version": "1630", "x-protocol-version": "1630",
} }
print(headers)
req = { req = {
"cT": "application/x-apple-plist", "cT": "application/x-apple-plist",
"U": b"\x16%D\xd5\xcd:D1\xa1\xa7z6\xa9\xe2\xbc\x8f", # Just random bytes? "U": b"\x16%C\xd5\xcd:D1\xa1\xa7z6\xa9\xe2\xbc\x8f", # Just random bytes?
"c": 96, "c": 96,
"ua": USER_AGENT, "ua": USER_AGENT,
"u": bags.ids_bag()[bag_key], "u": bags.ids_bag()[bag_key],
@ -132,9 +125,9 @@ def _send_request(conn: apns.APNSConnection, bag_key: str, body: bytes) -> bytes
return resp_body return resp_body
def lookup(conn: apns.APNSConnection, query: list[str]) -> any: def lookup(conn: apns.APNSConnection, query: list[str], id_pair: tuple[str, str], self_user: str) -> any:
query = {"uris": query} query = {"uris": query}
resp = _send_request(conn, "id-query", plistlib.dumps(query)) resp = _send_request(conn, "id-query", plistlib.dumps(query), id_pair[0], id_pair[1], self_user)
resp = plistlib.loads(resp) resp = plistlib.loads(resp)
resp = zlib.decompress(resp["b"], 16 + zlib.MAX_WBITS) resp = zlib.decompress(resp["b"], 16 + zlib.MAX_WBITS)
resp = plistlib.loads(resp) resp = plistlib.loads(resp)
@ -307,14 +300,3 @@ def _register_request(
if "status" in r and r["status"] == 6004: if "status" in r and r["status"] == 6004:
raise Exception("Validation data expired!") raise Exception("Validation data expired!")
return r return r
def input_multiline(prompt):
print(prompt)
lines = []
while True:
line = input()
if line == "":
break
lines.append(line)
return "\n".join(lines)