pypush-plus-plus/demo.py
2023-07-26 20:23:45 -04:00

229 lines
6.4 KiB
Python

import gzip
import json
import logging
import plistlib
import threading
import time
from base64 import b64decode, b64encode
from getpass import getpass
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from rich.logging import RichHandler
import apns
import ids
from ids.keydec import IdentityKeys
# Send root-logger output through RichHandler. The root level is NOTSET, so
# per-logger levels below decide what is actually shown.
logging.basicConfig(
    level=logging.NOTSET,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler()],
)

# Set sane log levels
for _logger_name, _logger_level in (
    ("urllib3", logging.WARNING),
    ("jelly", logging.INFO),
    ("nac", logging.INFO),
    ("apns", logging.INFO),
    ("albert", logging.INFO),
    ("ids", logging.DEBUG),
    ("bags", logging.DEBUG),
):
    logging.getLogger(_logger_name).setLevel(_logger_level)
# Load state saved by a previous run; a missing config.json simply means
# we start from an empty configuration.
try:
    with open("config.json", "r") as f:
        CONFIG = json.load(f)
except FileNotFoundError:
    CONFIG = {}

# Build the APNs connection from the saved push key/cert. Both are None
# when nothing has been saved yet.
_push_config = CONFIG.get("push", {})
conn = apns.APNSConnection(_push_config.get("key"), _push_config.get("cert"))
def safe_b64decode(s):
    """Decode base64 input, returning ``None`` when it cannot be decoded.

    Args:
        s: Base64 text or bytes. May be ``None`` (e.g. a config value that
            has not been saved yet).

    Returns:
        The decoded bytes, or ``None`` if ``s`` is missing or is not valid
        base64.
    """
    try:
        return b64decode(s)
    except (TypeError, ValueError):
        # TypeError: s is None or some other non-str/bytes value.
        # ValueError: malformed input (binascii.Error is a ValueError
        # subclass, and non-ASCII text raises ValueError directly).
        # Anything else (KeyboardInterrupt, unrelated bugs) must propagate.
        return None
# Reuse the push token stored in config.json when it decodes cleanly;
# otherwise connect with token=None.
_saved_token = safe_b64decode(CONFIG.get("push", {}).get("token"))
conn.connect(token=_saved_token)
conn.set_state(1)
conn.filter(["com.apple.madrid"])

user = ids.IDSUser(conn)
_auth_config = CONFIG.get("auth", {})
if _auth_config.get("cert") is None:
    # No saved auth certificate: ask for credentials interactively
    username = input("Username: ")
    password = getpass("Password: ")
    user.authenticate(username, password)
else:
    # Saved auth certificate present: restore the session without prompting
    auth_keypair = ids._helpers.KeyPair(_auth_config["key"], _auth_config["cert"])
    user_id = _auth_config["user_id"]
    handles = _auth_config["handles"]
    user.restore_authentication(auth_keypair, user_id, handles)
_encryption_config = CONFIG.get("encryption", {})
user.ec_key = _encryption_config.get("ec_key")
user.rsa_key = _encryption_config.get("rsa_key")

# A saved identity is only usable when its certificate AND both encryption
# keys are present; otherwise a fresh identity is registered.
_has_saved_identity = (
    CONFIG.get("id", {}).get("cert") is not None
    and user.ec_key is not None
    and user.rsa_key is not None
)
if not _has_saved_identity:
    logging.info("Registering new identity...")
    # Imported only on this path, as in the original script
    import emulated.nac

    _validation_data = emulated.nac.generate_validation_data()
    user.register(b64encode(_validation_data).decode())
else:
    id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"])
    user.restore_identity(id_keypair)
logging.info("Waiting for incoming messages...")


def keepalive():
    """Call ``conn.keep_alive()`` every 5 seconds, forever."""
    while True:
        time.sleep(5)
        conn.keep_alive()


# Run the keepalive loop in the background; as a daemon thread it does not
# keep the interpreter alive on exit.
_keepalive_thread = threading.Thread(target=keepalive, daemon=True)
_keepalive_thread.start()
# Write config.json so the next run can restore keys, certificates and the
# push token instead of registering again.
CONFIG["encryption"] = {
    "ec_key": user.ec_key,
    "rsa_key": user.rsa_key,
}
CONFIG["id"] = {
    "key": user._id_keypair.key,
    "cert": user._id_keypair.cert,
}
CONFIG["auth"] = {
    "key": user._auth_keypair.key,
    "cert": user._auth_keypair.cert,
    "user_id": user.user_id,
    "handles": user.handles,
}
CONFIG["push"] = {
    # The token is raw bytes; store it as base64 text so it is JSON-safe
    "token": b64encode(user.push_connection.token).decode(),
    "key": user.push_connection.private_key,
    "cert": user.push_connection.cert,
}

# Serialize BEFORE opening the file: open(..., "w") truncates config.json
# immediately, so a serialization error raised inside json.dump would wipe
# the previously saved keys. Failing here leaves the old file intact.
_config_text = json.dumps(CONFIG, indent=4)
with open("config.json", "w") as f:
    f.write(_config_text)
# user.rsa_key is PEM text; parse it once into a key object that decrypt()
# takes as its default private key.
_rsa_pem_bytes = user.rsa_key.encode()
user_rsa_key = load_pem_private_key(_rsa_pem_bytes, password=None)

# 16-byte value handed to AES-CTR in decrypt(): fifteen zero bytes, then 0x01
NORMAL_NONCE = bytes(15) + b"\x01"
def decrypt(payload, sender_token, rsa_key: rsa.RSAPrivateKey = user_rsa_key):
    """
    Decrypt an incoming iMessage payload and verify the sender's signature.

    Payload layout, as parsed below:
    1 byte           - tag (meaning unknown, skipped)
    2 bytes          - big-endian length of the body
    <length> bytes   - body:
        first 160 bytes - RSA-OAEP (SHA-1) encrypted portion; it decrypts to
                          a 16-byte AES key followed by the first part of
                          the AES-CTR ciphertext
        remainder       - rest of the AES-CTR ciphertext
    1 byte           - length of signature
    <siglen> bytes   - ECDSA (SHA-1) signature over the body

    The AES plaintext is a plist, optionally gzip-compressed.

    payload:      raw bytes of the encrypted message
    sender_token: push token of the sending device; used to pick the
                  matching identity out of the lookup result
    rsa_key:      private key for the RSA portion (defaults to the key
                  loaded from user.rsa_key at import time)

    Returns the decoded plist.

    Raises LookupError if none of the sender's identities has
    `sender_token` as its push token. Any exception from the signature
    `verify` call propagates (it throws when verification fails).
    """
    import zlib
    from io import BytesIO

    from cryptography.hazmat.primitives.asymmetric import ec

    stream = BytesIO(payload)
    stream.read(1)  # tag byte: meaning unknown, only skipped
    length = int.from_bytes(stream.read(2), "big")
    body = stream.read(length)

    body_io = BytesIO(body)
    rsa_body = rsa_key.decrypt(
        body_io.read(160),
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA1()),
            algorithm=hashes.SHA1(),
            label=None,
        ),
    )

    # The RSA plaintext carries the AES key plus the start of the ciphertext;
    # the remainder of the body continues that same ciphertext.
    cipher = Cipher(algorithms.AES(rsa_body[:16]), modes.CTR(NORMAL_NONCE))
    decrypted = cipher.decryptor().update(rsa_body[16:] + body_io.read())

    # Try to gzip decompress the payload; data that is not gzip is used as-is
    try:
        decrypted = gzip.decompress(decrypted)
    except (OSError, EOFError, zlib.error):
        # OSError covers gzip.BadGzipFile (not gzip data); EOFError and
        # zlib.error cover truncated or corrupt streams.
        logging.debug("Failed to decompress payload")

    decrypted = plistlib.loads(decrypted)

    signature_len = stream.read(1)[0]
    signature = stream.read(signature_len)

    # The last participant entry is the handle used for the key lookup
    sender_handle = decrypted["p"][-1]
    # Lookup the public key for the sender
    lookup = user.lookup([sender_handle])[sender_handle]

    # A handle can have several identities; pick the one for this push token
    sender_identity = next(
        (
            identity
            for identity in lookup["identities"]
            if identity["push-token"] == sender_token
        ),
        None,
    )
    if sender_identity is None:
        # Previously this only logged and then crashed with a TypeError on
        # None[...]; fail with an explicit, descriptive error instead.
        logging.error(f"Failed to find identity for {sender_token}")
        raise LookupError(f"Failed to find identity for {sender_token}")

    identity_keys = IdentityKeys.decode(
        sender_identity["client-data"]["public-message-identity-key"]
    )
    sender_ec_key = identity_keys.ecdsa_key

    # Verify the signature (will throw an exception if it fails)
    sender_ec_key.verify(
        signature,
        body,
        ec.ECDSA(hashes.SHA1()),
    )

    return decrypted
def check_response(x):
    """Return True if queue item `x` is a 0x0A command whose field 3 is a plist with a "P" key.

    `x` is indexed as (command id, fields); field 3 is read with
    apns._get_field and parsed as a plist.
    """
    if x[0] != 0x0A:
        return False
    resp_body = apns._get_field(x[1], 3)
    if resp_body is None:
        return False
    return "P" in plistlib.loads(resp_body)


# Receive loop: block until a matching message arrives, acknowledge it,
# then decrypt and log it. The predicate is defined once above instead of
# being redefined on every iteration.
while True:
    incoming = conn.incoming_queue.wait_pop_find(check_response)
    resp_body = apns._get_field(incoming[1], 3)
    # Field 4 is passed back to the server in the ack. Named message_id
    # rather than `id` to avoid shadowing the builtin.
    message_id = apns._get_field(incoming[1], 4)
    conn._send_ack(message_id)

    resp_body = plistlib.loads(resp_body)
    logging.debug(f"Got message: {resp_body}")

    token = resp_body['t']
    payload = decrypt(resp_body["P"], token)
    logging.info(f"Got message: {payload['t']} from {payload['p'][1]}")