diff --git a/demo.py b/demo.py index 3bce2d1..3e0bfb0 100644 --- a/demo.py +++ b/demo.py @@ -93,6 +93,17 @@ else: user.authenticate(username, password) +# Generate a new RSA keypair for the identity +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.serialization import load_pem_private_key +# Load the old keypair if it exists +if CONFIG.get("encrypt") is not None: + priv_enc = load_pem_private_key(CONFIG["encrypt"].encode(), password=None) +else: + priv_enc = rsa.generate_private_key(public_exponent=65537, key_size=1280) +pub_enc = priv_enc.public_key() + if CONFIG.get("id", {}).get("cert") is not None: id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"]) user.restore_identity(id_keypair) @@ -101,32 +112,70 @@ else: import emulated.nac vd = emulated.nac.generate_validation_data() vd = b64encode(vd).decode() - user.register(vd) + + from ids.keydec import IdentityKeys + + published_keys = IdentityKeys(None, pub_enc) + user.register(vd, published_keys) #logging.info(f"Looked up textgpt@icloud.com, got response: {user.lookup(['mailto:textgpt@icloud.com'])}") -logging.info("Enter a username to look up, for example: mailto:textgpt@icloud.com") -while True: - # Read a line from stdin - line = input("> ") - if line == "": - break - # Look up the username - resp = user.lookup([line]) - #logging.info(f"Looked up {line}, got response: {user.lookup([line])}") - info = resp[line] - identities = info["identities"] - logging.info(f"Identities: {len(identities)}") - for identity in identities: - logging.info(f"Identity: [yellow]{b64encode(identity['push-token']).decode()}[/] ({len(identity)} properties)", extra={"markup": True}) - if len(identity) > 5: - logging.warning(identity) +# logging.info("Enter a username to look up, for example: mailto:textgpt@icloud.com") +# while True: +# # Read a line from stdin +# line = input("> ") +# if line == "": +# break +# # Look up the username +# resp = user.lookup([line]) +# #logging.info(f"Looked up {line}, got response: {user.lookup([line])}") +# info = resp[line] +# identities = info["identities"] +# logging.info(f"Identities: {len(identities)}") +# for identity in identities: +# logging.info(f"Identity: [yellow]{b64encode(identity['push-token']).decode()}[/] ({len(identity)} properties)", extra={"markup": True}) +# if len(identity) > 5: +# logging.warning(identity) + +logging.debug(user.lookup(["mailto:usert4@icloud.com", "mailto:jjtech@jjtech.dev"])) +# resp = user.lookup(["mailto:jjtech@jjtech.dev"]) +# info = resp["mailto:jjtech@jjtech.dev"] +# identities = info["identities"] +# for identity in identities: +# logging.info(f"Identity: [yellow]{b64encode(identity['push-token']).decode()}[/] ({len(identity)} properties)", extra={"markup": True}) +# if "client-data" in identity: +# logging.warning(identity["client-data"]) +logging.info("Waiting for incomming messages...") + +# Create a thread to send keepalive messages +import threading +import time +def keepalive(): + while True: + time.sleep(5) + conn.keep_alive() +threading.Thread(target=keepalive, daemon=True).start() + +# while True: +# # # Wait for a message +# # # def check_response(x): +# # # if x[0] != 0x0A: +# # # return False +# # # resp_body = apns._get_field(x[1], 3) +# # # if resp_body is None: +# # # return False +# # # resp_body = apns._get_field(x[1], 3) +# # # if resp_body is None: +# # # return False +# # # resp_body = plistlib.loads(resp_body) +# # # return resp_body.get('U') == msg_id +# pass # Write config.json -#CONFIG["id"] = { -# "key": user._id_keypair.key, -# "cert": user._id_keypair.cert, -#} +CONFIG["id"] = { + "key": user._id_keypair.key, + "cert": user._id_keypair.cert, +} CONFIG["auth"] = { "key": user._auth_keypair.key, "cert": user._auth_keypair.cert, @@ -138,6 +187,54 @@ CONFIG["push"] = { "key": user.push_connection.private_key, "cert": user.push_connection.cert, } +from cryptography.hazmat.primitives import serialization +CONFIG["encrypt"] = priv_enc.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption()).decode("utf-8").strip() with open("config.json", "w") as f: json.dump(CONFIG, f, indent=4) + +def decrypt(payload): + import gzip + #print(payload[1:3]) + length = int.from_bytes(payload[1:3], "big") + #print("Length", length) + payload = payload[3:length+3] + #print("Decrypting payload", payload) + from cryptography.hazmat.primitives.asymmetric import padding + from cryptography.hazmat.primitives import hashes + decrypted1 = priv_enc.decrypt(payload[:160], padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA1()), + algorithm=hashes.SHA1(), + label=None + )) + + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + cipher = Cipher(algorithms.AES(decrypted1[:16]), modes.CTR(b'\x00'*15 + b'\x01')) + decryptor = cipher.decryptor() + pt = decryptor.update(decrypted1[16:] + payload[160:]) + #print(pt) + pt = gzip.decompress(pt) + payload = plistlib.loads(pt) + logging.info(f"Got payload: {payload}") + + +import plistlib +while True: + def check_response(x): + if x[0] != 0x0A: + return False + resp_body = apns._get_field(x[1], 3) + if resp_body is None: + return False + resp_body = plistlib.loads(resp_body) + if "P" not in resp_body: + return False + return True + payload = conn.incoming_queue.wait_pop_find(check_response) + resp_body = apns._get_field(payload[1], 3) + resp_body = plistlib.loads(resp_body) + #logging.info(f"Got response: {resp_body}") + payload = resp_body["P"] + decrypt(payload) + + \ No newline at end of file diff --git a/ids/__init__.py b/ids/__init__.py index 5f72816..5099895 100644 --- a/ids/__init__.py +++ b/ids/__init__.py @@ -2,7 +2,7 @@ from base64 import b64encode import apns -from . import _helpers, identity, profile, query +from . import _helpers, identity, profile, query, keydec class IDSUser: @@ -53,13 +53,14 @@ class IDSUser: self.handles = handles # This is a separate call so that the user can make sure the first part succeeds before asking for validation data - def register(self, validation_data: str): + def register(self, validation_data: str, published_keys: keydec.IdentityKeys): cert = identity.register( b64encode(self.push_connection.token), self.handles, self.user_id, self._auth_keypair, self._push_keypair, + published_keys, validation_data, ) self._id_keypair = _helpers.KeyPair(self._auth_keypair.key, cert) diff --git a/ids/identity.py b/ids/identity.py index 9d79963..c90f943 100644 --- a/ids/identity.py +++ b/ids/identity.py @@ -5,13 +5,14 @@ import requests from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair from .signing import add_auth_signature, armour_cert +from .keydec import IdentityKeys import logging logger = logging.getLogger("ids") def register( - push_token, handles, user_id, auth_key: KeyPair, push_key: KeyPair, validation_data + push_token, handles, user_id, auth_key: KeyPair, push_key: KeyPair, published_keys: IdentityKeys, validation_data ): logger.debug(f"Registering IDS identity for {handles}") uris = [{"uri": handle} for handle in handles] @@ -30,39 +31,39 @@ def register( "client-data": { 'is-c2k-equipment': True, 'optionally-receive-typing-indicators': True, - 'public-message-identity-key': b64decode("""MIH2gUMAQQSYmvE+hYOWVGotZUCd - M6zoW/2clK8RIzUtE6JAmWSCwj7d - B213vxEBNAPHefEtlxkVKlQH6bsw - ja5qYyl3Fh28goGuAKwwgakCgaEA - 4lw3MrXOFIWWIi3TTUGksXVCIz92 - R3AG3ghBa1ZBoZ6rIJHeuxhD2vTV - hicpW7kvZ/+AFgE4vFFef/9TjG6C - rsBtWUUfPtYHqc7+uaghVW13qfYC - tdGsW8Apvf6MJqsRmITJjoYZ5kwl - scp5Xw/1KVQzKMfZrwZeLC/UZ6O1 - 41u4Xvm+u40e+Ky/wMCOwLGBG0Ag - ZBH91Xrq+S8izgSLmQIDAQAB"""), - - 'public-message-identity-version':2, - 'show-peer-errors': True, - 'supports-ack-v1': True, - 'supports-activity-sharing-v1': True, - 'supports-audio-messaging-v2': True, - "supports-autoloopvideo-v1": True, - 'supports-be-v1': True, - 'supports-ca-v1': True, - 'supports-fsm-v1': True, - 'supports-fsm-v2': True, - 'supports-fsm-v3': True, - 'supports-ii-v1': True, - 'supports-impact-v1': True, - 'supports-inline-attachments': True, - 'supports-keep-receipts': True, - "supports-location-sharing": True, - 'supports-media-v2': True, - 'supports-photos-extension-v1': True, - 'supports-st-v1': True, - 'supports-update-attachments-v1': True, + 'public-message-identity-key': published_keys.encode(), + # 'public-message-identity-key': b64decode("""MIH2gUMAQQSYmvE+hYOWVGotZUCd + # M6zoW/2clK8RIzUtE6JAmWSCwj7d + # B213vxEBNAPHefEtlxkVKlQH6bsw + # ja5qYyl3Fh28goGuAKwwgakCgaEA + # 4lw3MrXOFIWWIi3TTUGksXVCIz92 + # R3AG3ghBa1ZBoZ6rIJHeuxhD2vTV + # hicpW7kvZ/+AFgE4vFFef/9TjG6C + # rsBtWUUfPtYHqc7+uaghVW13qfYC + # tdGsW8Apvf6MJqsRmITJjoYZ5kwl + # scp5Xw/1KVQzKMfZrwZeLC/UZ6O1 + # 41u4Xvm+u40e+Ky/wMCOwLGBG0Ag + # ZBH91Xrq+S8izgSLmQIDAQAB""".replace("\n", "").replace(" ", "").replace("\t", "")), + 'public-message-identity-version':2, + 'show-peer-errors': True, + 'supports-ack-v1': True, + 'supports-activity-sharing-v1': True, + 'supports-audio-messaging-v2': True, + "supports-autoloopvideo-v1": True, + 'supports-be-v1': True, + 'supports-ca-v1': True, + 'supports-fsm-v1': True, + 'supports-fsm-v2': True, + 'supports-fsm-v3': True, + 'supports-ii-v1': True, + 'supports-impact-v1': True, + 'supports-inline-attachments': True, + 'supports-keep-receipts': True, + "supports-location-sharing": True, + 'supports-media-v2': True, + 'supports-photos-extension-v1': True, + 'supports-st-v1': True, + 'supports-update-attachments-v1': True, }, "uris": uris, "user-id": user_id, diff --git a/ids/keydec.py b/ids/keydec.py new file mode 100644 index 0000000..f8d8e94 --- /dev/null +++ b/ids/keydec.py @@ -0,0 +1,74 @@ +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric import rsa + +from base64 import b64decode, b64encode + +from io import BytesIO + +class IdentityKeys(): + def __init__(self, ecdsa_key: ec.EllipticCurvePublicKey, rsa_key: rsa.RSAPublicKey): + self.ecdsa_key = ecdsa_key + self.rsa_key = rsa_key + + def decode(input: bytes) -> 'IdentityKeys': + input = BytesIO(input) + + assert input.read(5) == b'\x30\x81\xF6\x81\x43' # DER header + raw_ecdsa = input.read(67) + print(b64encode(raw_ecdsa).decode()) + assert input.read(3) == b'\x82\x81\xAE' # DER header + raw_rsa = input.read(174) + + # Parse the RSA key + raw_rsa = BytesIO(raw_rsa) + assert raw_rsa.read(2) == b'\x00\xAC' # Not sure what this is + assert raw_rsa.read(3) == b'\x30\x81\xA9' # Inner DER header + assert raw_rsa.read(3) == b'\x02\x81\xA1' + rsa_modulus = raw_rsa.read(161) + rsa_modulus = int.from_bytes(rsa_modulus, "big") + assert raw_rsa.read(5) == b'\x02\x03\x01\x00\x01' # Exponent, should always be 65537 + + # TODO: Parse the ECDSA key + + # Construct a public key + rsa_key = rsa.RSAPublicNumbers(e=65537, n=rsa_modulus) + rsa_key = rsa_key.public_key() + + return IdentityKeys(None, rsa_key) + + def encode(self) -> bytes: + output = BytesIO() + + raw_rsa = BytesIO() + raw_rsa.write(b'\x00\xAC') + raw_rsa.write(b'\x30\x81\xA9') + raw_rsa.write(b'\x02\x81\xA1') + raw_rsa.write(self.rsa_key.public_numbers().n.to_bytes(161, "big")) + raw_rsa.write(b'\x02\x03\x01\x00\x01') + + output.write(b'\x30\x81\xF6\x81\x43') + output.write(b64decode("AEEEmJrxPoWDllRqLWVAnTOs6Fv9nJSvESM1LROiQJlkgsI+3Qdtd78RATQDx3nxLZcZFSpUB+m7MI2uamMpdxYdvA==")) + output.write(b'\x82\x81\xAE') + output.write(raw_rsa.getvalue()) + + return output.getvalue() + +if __name__ == "__main__": + input_key = """MIH2gUMAQQSYmvE+hYOWVGotZUCd + M6zoW/2clK8RIzUtE6JAmWSCwj7d + B213vxEBNAPHefEtlxkVKlQH6bsw + ja5qYyl3Fh28goGuAKwwgakCgaEA + 4lw3MrXOFIWWIi3TTUGksXVCIz92 + R3AG3ghBa1ZBoZ6rIJHeuxhD2vTV + hicpW7kvZ/+AFgE4vFFef/9TjG6C + rsBtWUUfPtYHqc7+uaghVW13qfYC + tdGsW8Apvf6MJqsRmITJjoYZ5kwl + scp5Xw/1KVQzKMfZrwZeLC/UZ6O1 + 41u4Xvm+u40e+Ky/wMCOwLGBG0Ag + ZBH91Xrq+S8izgSLmQIDAQAB""".replace("\n", "").replace(" ", "").replace("\t", "") + keys = IdentityKeys.decode(b64decode(input_key)) + print(b64encode(keys.encode()).decode()) + print(len(keys.encode())) + print(len(b64decode(input_key))) + print(keys.encode() == b64decode(input_key)) + print(keys.rsa_key.key_size) \ No newline at end of file