diff --git a/demo.py b/demo.py index aba0901..2699b50 100644 --- a/demo.py +++ b/demo.py @@ -6,6 +6,7 @@ import time from base64 import b64decode, b64encode from getpass import getpass from cryptography import x509 +import datetime from rich.logging import RichHandler @@ -30,7 +31,7 @@ logging.getLogger("jelly").setLevel(logging.INFO) logging.getLogger("nac").setLevel(logging.INFO) logging.getLogger("apns").setLevel(logging.INFO) logging.getLogger("albert").setLevel(logging.INFO) -logging.getLogger("ids").setLevel(logging.DEBUG) +logging.getLogger("ids").setLevel(logging.INFO) logging.getLogger("bags").setLevel(logging.INFO) logging.getLogger("imessage").setLevel(logging.INFO) @@ -60,7 +61,12 @@ def get_not_valid_after_timestamp(cert_data): except Exception as e: return None # Return None in case of an error +expiration = None + async def main(args: argparse.Namespace): + + global expiration + # Load any existing push credentials token = CONFIG.get("push", {}).get("token") token = b64decode(token) if token is not None else b"" @@ -74,6 +80,59 @@ async def main(args: argparse.Namespace): vd = b64encode(vd).decode() users = ids.register(conn, users, vd, args.client_data or args.reg_notify) return users + + async def reregister(conn: apns.APNSConnection, users: list[ids.IDSUser]) -> datetime.datetime: + register(conn, users) + + CONFIG["users"] = [] + + expiration = None + # Format time as HH:MM:SS PM/AM EST/EDT (X minutes from now) + expire_msg = lambda expiration: f"Number registration is valid until {str(expiration.astimezone().strftime('%I:%M:%S %p %Z'))} ({str(int((expiration - datetime.datetime.now(datetime.timezone.utc)).total_seconds()/60))} minutes from now)" + email_user = None + email_addr = None # For HACK below + + for user in users: + # Clear the config and re-save everything to match the new registration + CONFIG["users"].append({ + "id": user.user_id, + "auth_key": user.auth_keypair.key, + "auth_cert": user.auth_keypair.cert, + "encryption_key": user.encryption_identity.encryption_key if user.encryption_identity is not None else None, + "signing_key": user.encryption_identity.signing_key if user.encryption_identity is not None else None, + "id_cert": user.id_cert, + "handles": user.handles, + }) + + # If this is a phone number user, then it's got the be the one we just linked + # so pull out the expiration date from the certificate + if "P:" in str(user.user_id): + # There is not really a good reason to try/catch here: If we couldn't reregister, just crash (very unlikely we can recover) + cert = x509.load_pem_x509_certificate(user.id_cert.encode('utf-8')) + expiration = cert.not_valid_after + # Make it a UTC aware timezone, for reasons + expiration = expiration.replace(tzinfo=datetime.timezone.utc) + logging.info(expire_msg(expiration)) + + # This is an Apple ID user, so we can use it to send the notification iMessage (if enabled) + if "U:" in str(user.user_id): + email_user = user + for n in range(len(user.handles)): + # HACK: Just pick the first email address they have to avoid picking the linked phone number + # TODO: Properly fix this, so that the linked phone number is not in the Apple ID user's list of handles + if "mailto:" in str(user.handles[n]): + email_addr = user.handles[n] + + # Save the config to disk + safe_config() + + # Send the notification iMessage (if enabled) + if args.reg_notify: + im = imessage.iMessageUser(conn, email_user) + im.current_handle = email_addr # HACK: See above + await im.send(imessage.iMessage.create(im, expire_msg(expiration), [email_addr])) + + return expiration async with apns.APNSConnection.start(push_creds) as conn: # Save the push credentials to the config @@ -110,19 +169,25 @@ async def main(args: argparse.Namespace): if input("> ").lower() == "y": import sms_registration - if args.phone is None: - raise GatewayConnectionError("You did not supply an IP address.") - if "phone" in CONFIG: phone_sig = b64decode(CONFIG["phone"].get("sig")) phone_number = CONFIG["phone"].get("number") elif args.pdu is not None: phone_number, phone_sig = sms_registration.parse_pdu(args.pdu, None) else: + if args.phone is None: + #raise GatewayConnectionError("You did not supply an IP address.") + # Prompt for IP address + print("Please enter the IP address of your phone.") + print("This should be displayed in the SMS registration helper app") + print("You must be on the same network as your phone.") + phone = input("> ") + else: + phone = args.phone import sms_registration phone_number, phone_sig = sms_registration.register(push_token=conn.credentials.token, no_parse=args.trigger_pdu, gateway=args.gateway, - phone_ip=args.phone) + phone_ip=phone) CONFIG["phone"] = { "number": phone_number, "sig": b64encode(phone_sig).decode(), @@ -138,52 +203,27 @@ async def main(args: argparse.Namespace): users.append(ids.IDSAppleUser.authenticate(conn, username, password)) - users = register(conn, users) + await reregister(conn, users) - CONFIG["users"] = [] - for user in users: - CONFIG["users"].append({ - "id": user.user_id, - "auth_key": user.auth_keypair.key, - "auth_cert": user.auth_keypair.cert, - "encryption_key": user.encryption_identity.encryption_key if user.encryption_identity is not None else None, - "signing_key": user.encryption_identity.signing_key if user.encryption_identity is not None else None, - "id_cert": user.id_cert, - "handles": user.handles, - }) - safe_config() + if args.daemon: + wait_time_minutes = 5 # this is in minutes. 5 recommended + + expiration = await reregister(conn, users) + + while True: + reregister_time = expiration - datetime.timedelta(minutes=wait_time_minutes) # wait_time_minutes before expiration + reregister_delta = (reregister_time - datetime.datetime.now(datetime.timezone.utc)).total_seconds() + + logging.info(f"Reregistering in {int(reregister_delta / 60)} minutes...") + await trio.sleep(reregister_delta) + + logging.info("Reregistering...") + register(conn, users) + + logging.info("Reregistered!") if args.reregister: - print("Re-registering...") - register(conn, users) - - CONFIG["users"] = [] - for user in users: - CONFIG["users"].append({ - "id": user.user_id, - "auth_key": user.auth_keypair.key, - "auth_cert": user.auth_keypair.cert, - "encryption_key": user.encryption_identity.encryption_key if user.encryption_identity is not None else None, - "signing_key": user.encryption_identity.signing_key if user.encryption_identity is not None else None, - "id_cert": user.id_cert, - "handles": user.handles, - }) - - if "P:" in str(user.user_id): - expiration = get_not_valid_after_timestamp(user.id_cert) - expiration = str(expiration) + " UTC" - print(f"Number registration is valid until {expiration}. (YYYY/MM/DD)") - else: - email_user = user - for n in range(len(user.handles)): - if "mailto:" in str(user.handles[n]): - email_addr = user.handles[n] - - safe_config() - if args.reg_notify: - im = imessage.iMessageUser(conn, email_user) - im.current_handle = email_addr - await im.send(imessage.iMessage.create(im, "Number registration is valid until " + expiration, [email_addr])) + await reregister(conn, users) print("Done!") @@ -192,105 +232,6 @@ async def main(args: argparse.Namespace): while True: await trio.sleep(20) - # im = imessage.iMessageUser(conn, users[0]) - # await im.send(imessage.iMessage.create(im, "Hello world!", ["tel:+16106632676", "mailto:testu3@icloud.com"])) - - # while True: - # print(await im.receive()) - - # user = ids.IDSUser(conn) - - # if CONFIG.get("auth", {}).get("cert") is not None: - # auth_keypair = ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"]) - # user_id = CONFIG["auth"]["user_id"] - # handles = CONFIG["auth"]["handles"] - # user.restore_authentication(auth_keypair, user_id, handles) - # else: - # username = input("Username: ") - # password = getpass("Password: ") - - # user.authenticate(username, password) - - # import sms_registration - # phone_sig = safe_b64decode(CONFIG.get("phone", {}).get("sig")) - # phone_number = CONFIG.get("phone", {}).get("number") - - # if phone_sig is None or phone_number is None: - # print("Registering phone number...") - # phone_number, phone_sig = sms_registration.register(user.push_connection.credentials.token) - # CONFIG["phone"] = { - # "number": phone_number, - # "sig": b64encode(phone_sig).decode(), - # } - # if CONFIG.get("phone", {}).get("auth_key") is not None and CONFIG.get("phone", {}).get("auth_cert") is not None: - # phone_auth_keypair = ids._helpers.KeyPair(CONFIG["phone"]["auth_key"], CONFIG["phone"]["auth_cert"]) - # else: - # phone_auth_keypair = ids.profile.get_phone_cert(phone_number, user.push_connection.credentials.token, [phone_sig]) - # CONFIG["phone"]["auth_key"] = phone_auth_keypair.key - # CONFIG["phone"]["auth_cert"] = phone_auth_keypair.cert - - - # user.encryption_identity = ids.identity.IDSIdentity( - # encryption_key=CONFIG.get("encryption", {}).get("rsa_key"), - # signing_key=CONFIG.get("encryption", {}).get("ec_key"), - # ) - - # #user._auth_keypair = phone_auth_keypair - # user.handles = [f"tel:{phone_number}"] - # print(user.user_id) - # # user.user_id = f"P:{phone_number}" - - - # if ( - # CONFIG.get("id", {}).get("cert") is not None - # and user.encryption_identity is not None - # ): - # id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"]) - # user.restore_identity(id_keypair) - # else: - # logging.info("Registering new identity...") - # import emulated.nac - - # vd = emulated.nac.generate_validation_data() - # vd = b64encode(vd).decode() - - # ids.register - # user.register(vd, [("P:" + phone_number, phone_auth_keypair)]) - # #user.register(vd) - - # print("Handles: ", user.handles) - - # # Write config.json - # CONFIG["encryption"] = { - # "rsa_key": user.encryption_identity.encryption_key, - # "ec_key": user.encryption_identity.signing_key, - # } - # CONFIG["id"] = { - # "key": user._id_keypair.key, - # "cert": user._id_keypair.cert, - # } - # CONFIG["auth"] = { - # "key": user._auth_keypair.key, - # "cert": user._auth_keypair.cert, - # "user_id": user.user_id, - # "handles": user.handles, - # } - # CONFIG["push"] = { - # "token": b64encode(user.push_connection.credentials.token).decode(), - # "key": user.push_connection.credentials.private_key, - # "cert": user.push_connection.credentials.cert, - # } - - # with open("config.json", "w") as f: - # json.dump(CONFIG, f, indent=4) - - # im = imessage.iMessageUser(conn, user) - - # Send a message to myself - # async with trio.open_nursery() as nursery: - # nursery.start_soon(input_task, im) - # nursery.start_soon(output_task, im) - async def input_task(im: imessage.iMessageUser): while True: cmd = await trio.to_thread.run_sync(input, "> ", cancellable=True) @@ -314,6 +255,7 @@ if __name__ == "__main__": parser.add_argument("--pdu", type=str, help="Override the PDU REG-RESP") parser.add_argument("--phone", type=str, help="Override the phone IP") parser.add_argument("--gateway", type=str, help="Override the gateway phone number") + parser.add_argument("--daemon", action="store_true", help="Continuously reregister 5 minutes before the certificate expires") args = parser.parse_args() diff --git a/ids/__init__.py b/ids/__init__.py index 52a5fdb..5871b4c 100644 --- a/ids/__init__.py +++ b/ids/__init__.py @@ -25,7 +25,7 @@ class IDSUser: encryption_identity: identity.IDSIdentity | None = None - id_cert: bytes | None = None + id_cert: str | None = None """ Short-lived identity certificate, same private key as auth_keypair diff --git a/ids/identity.py b/ids/identity.py index 31190c9..8794a70 100644 --- a/ids/identity.py +++ b/ids/identity.py @@ -147,7 +147,7 @@ def register( } - logger.warning(f"Sending IDS registration request: {body}") + logger.debug(f"Sending IDS registration request: {body}") body = plistlib.dumps(body)