mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2024-12-23 11:22:42 -06:00
134 lines
3.8 KiB
Python
134 lines
3.8 KiB
Python
import plistlib
|
|
import random
|
|
import zlib
|
|
from base64 import b64decode, b64encode
|
|
from datetime import datetime
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
|
|
import apns
|
|
|
|
USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"
|
|
# NOTE: The push token MUST be registered with the account for self-uri!
|
|
# This is an actual valid one for my account, since you can look it up anyway.
|
|
PUSH_TOKEN = "5V7AY+ikHr4DiSfq1W2UBa71G3FLGkpUSKTrOLg81yk="
|
|
SELF_URI = "mailto:jjtech@jjtech.dev"
|
|
|
|
|
|
# Nonce Format:
|
|
# 01000001876bd0a2c0e571093967fce3d7
|
|
# 01 # version
|
|
# 000001876d008cc5 # unix time
|
|
# r1r2r3r4r5r6r7r8 # random bytes
|
|
def generate_nonce() -> bytes:
|
|
return (
|
|
b"\x01"
|
|
+ int(datetime.now().timestamp() * 1000).to_bytes(8, "big")
|
|
+ random.randbytes(8)
|
|
)
|
|
|
|
|
|
def load_keys() -> tuple[str, str]:
|
|
# Load the private key and certificate from files
|
|
with open("ids.key", "r") as f:
|
|
ids_key = f.read()
|
|
with open("ids.crt", "r") as f:
|
|
ids_cert = f.read()
|
|
|
|
return ids_key, ids_cert
|
|
|
|
|
|
def _create_payload(
|
|
bag_key: str, query_string: str, push_token: str, payload: bytes
|
|
) -> tuple[str, bytes]:
|
|
# Generate the nonce
|
|
nonce = generate_nonce()
|
|
push_token = b64decode(push_token)
|
|
|
|
return (
|
|
nonce
|
|
+ len(bag_key).to_bytes(4)
|
|
+ bag_key.encode()
|
|
+ len(query_string).to_bytes(4)
|
|
+ query_string.encode()
|
|
+ len(payload).to_bytes(4)
|
|
+ payload
|
|
+ len(push_token).to_bytes(4)
|
|
+ push_token,
|
|
nonce,
|
|
)
|
|
|
|
|
|
def sign_payload(
|
|
private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes
|
|
) -> tuple[str, bytes]:
|
|
# Load the private key
|
|
key = serialization.load_pem_private_key(
|
|
private_key.encode(), password=None, backend=default_backend()
|
|
)
|
|
|
|
payload, nonce = _create_payload(bag_key, query_string, push_token, payload)
|
|
sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1())
|
|
|
|
sig = b"\x01\x01" + sig
|
|
sig = b64encode(sig).decode()
|
|
|
|
return sig, nonce
|
|
|
|
|
|
BAG_KEYS = {
|
|
"id-query": "https://query.ess.apple.com/WebObjects/QueryService.woa/wa/query"
|
|
}
|
|
|
|
global_key, global_cert = load_keys()
|
|
|
|
|
|
def _send_request(conn: apns.APNSConnection, type: str, body: bytes) -> bytes:
|
|
body = zlib.compress(body, wbits=16 + zlib.MAX_WBITS)
|
|
|
|
# Sign the request
|
|
signature, nonce = sign_payload(global_key, type, "", PUSH_TOKEN, body)
|
|
|
|
headers = {
|
|
"x-id-cert": global_cert.replace("-----BEGIN CERTIFICATE-----", "")
|
|
.replace("-----END CERTIFICATE-----", "")
|
|
.replace("\n", ""),
|
|
"x-id-nonce": b64encode(nonce).decode(),
|
|
"x-id-sig": signature,
|
|
"x-push-token": PUSH_TOKEN,
|
|
"x-id-self-uri": SELF_URI,
|
|
"User-Agent": USER_AGENT,
|
|
"x-protocol-version": "1630",
|
|
}
|
|
|
|
req = {
|
|
"cT": "application/x-apple-plist",
|
|
"U": b"\x16%D\xd5\xcd:D1\xa1\xa7z6\xa9\xe2\xbc\x8f", # Just random bytes?
|
|
"c": 96,
|
|
"ua": USER_AGENT,
|
|
"u": BAG_KEYS[type],
|
|
"h": headers,
|
|
"v": 2,
|
|
"b": body,
|
|
}
|
|
|
|
conn.send_message("com.apple.madrid", plistlib.dumps(req, fmt=plistlib.FMT_BINARY))
|
|
resp = conn.wait_for_packet(0x0A)
|
|
|
|
resp_body = apns._get_field(resp[1], 3)
|
|
|
|
if resp_body is None:
|
|
raise (Exception(f"Got invalid response: {resp}"))
|
|
|
|
return resp_body
|
|
|
|
|
|
def lookup(conn: apns.APNSConnection, query: list[str]) -> any:
|
|
query = {"uris": query}
|
|
resp = _send_request(conn, "id-query", plistlib.dumps(query))
|
|
resp = plistlib.loads(resp)
|
|
resp = zlib.decompress(resp["b"], 16 + zlib.MAX_WBITS)
|
|
resp = plistlib.loads(resp)
|
|
return resp
|