pypush-plus-plus/demo.py

336 lines
14 KiB
Python
Raw Normal View History

2023-05-02 07:39:11 -05:00
import json
2023-07-26 06:50:48 -05:00
import logging
2023-12-02 14:09:31 -06:00
import argparse
import datetime
2023-07-26 06:50:48 -05:00
from base64 import b64decode, b64encode
2023-05-09 16:03:27 -05:00
from getpass import getpass
from cryptography import x509
2023-05-02 07:39:11 -05:00
2023-07-23 17:55:13 -05:00
from rich.logging import RichHandler
2023-12-02 14:09:31 -06:00
import trio
2023-07-26 06:50:48 -05:00
import apns
import ids
2023-07-28 10:53:13 -05:00
import imessage
2023-07-26 06:50:48 -05:00
from exceptions import *
2023-07-23 17:55:13 -05:00
logging.basicConfig(
2023-07-26 17:49:41 -05:00
level=logging.NOTSET, format="%(message)s", datefmt="[%X]", handlers=[RichHandler()]
2023-07-23 17:55:13 -05:00
)
# Set sane log levels
logging.getLogger("urllib3").setLevel(logging.WARNING)
2023-08-15 08:05:08 -05:00
logging.getLogger("py.warnings").setLevel(logging.ERROR) # Ignore warnings from urllib3
2023-07-31 12:25:19 -05:00
logging.getLogger("asyncio").setLevel(logging.WARNING)
2023-07-23 17:55:13 -05:00
logging.getLogger("jelly").setLevel(logging.INFO)
logging.getLogger("nac").setLevel(logging.INFO)
2023-08-17 21:05:18 -05:00
logging.getLogger("apns").setLevel(logging.INFO)
2023-07-24 08:18:21 -05:00
logging.getLogger("albert").setLevel(logging.INFO)
logging.getLogger("ids").setLevel(logging.INFO)
2023-07-31 12:08:57 -05:00
logging.getLogger("bags").setLevel(logging.INFO)
2023-08-17 21:05:18 -05:00
logging.getLogger("imessage").setLevel(logging.INFO)
2023-05-02 19:53:18 -05:00
logging.captureWarnings(True)
2023-05-09 16:03:27 -05:00
# Try and load config.json
try:
with open("config.json", "r") as f:
CONFIG = json.load(f)
except FileNotFoundError:
CONFIG = {}
2023-05-02 07:39:11 -05:00
2023-12-02 14:09:31 -06:00
2023-05-09 17:48:44 -05:00
def safe_b64decode(s):
try:
return b64decode(s)
2023-12-02 14:09:31 -06:00
except (ValueError, TypeError) as e:
print(f"Error decoding b64: {e}")
2023-05-09 17:48:44 -05:00
return None
2023-12-02 14:09:31 -06:00
2023-08-25 21:26:37 -05:00
def safe_config():
2023-12-02 14:09:31 -06:00
with open("config.json", "w", encoding="utf-8") as f:
2023-08-25 21:26:37 -05:00
json.dump(CONFIG, f, indent=4)
2023-07-26 06:50:48 -05:00
2023-12-02 14:09:31 -06:00
def get_not_valid_after_timestamp(cert_data):
try:
cert = x509.load_pem_x509_certificate(cert_data.encode('utf-8'))
return cert.not_valid_after
2023-12-02 14:09:31 -06:00
except (ValueError, TypeError, AttributeError) as e:
print(f"Error determining certificate expiration date: {e}")
return None # Return None in case of an error
2023-12-02 14:09:31 -06:00
expiration = None
def expire_msg(expiration):
now = datetime.datetime.now(datetime.timezone.utc)
remaining_seconds = int((expiration - now).total_seconds())
if remaining_seconds <= 60:
time_unit = "seconds"
time_value = remaining_seconds
elif remaining_seconds <= 3600:
time_unit = "minutes"
time_value = int(remaining_seconds / 60)
elif remaining_seconds <= 86400:
time_unit = "hours"
time_value = int(remaining_seconds / 3600)
else:
time_unit = "days"
time_value = float(remaining_seconds / 86400)
expire_message = f"Number registration is valid until {str(expiration.astimezone().strftime('%x %I:%M:%S %p %Z'))} ({time_value:.2f} {time_unit} from now)"
return expire_message
2023-12-02 14:09:31 -06:00
async def main(args: argparse.Namespace):
global expiration
2023-08-25 21:26:37 -05:00
# Load any existing push credentials
token = CONFIG.get("push", {}).get("token")
2023-08-25 21:26:37 -05:00
token = b64decode(token) if token is not None else b""
2023-08-17 19:23:56 -05:00
push_creds = apns.PushCredentials(
CONFIG.get("push", {}).get("key", ""), CONFIG.get("push", {}).get("cert", ""), token)
2023-12-02 14:09:31 -06:00
2023-08-28 07:01:41 -05:00
def register(conn, users):
import emulated.nac
vd = emulated.nac.generate_validation_data()
vd = b64encode(vd).decode()
users = ids.register(conn, users, vd, args.client_data or args.reg_notify)
2023-08-28 07:01:41 -05:00
return users
2023-12-02 14:09:31 -06:00
def expiration_identifier(users: list[ids.IDSUser]) -> datetime.datetime | None:
2023-12-02 14:09:31 -06:00
expiration = None
2023-12-02 14:09:31 -06:00
for user in users:
# If this is a phone number user, then it's got to be the one we just linked
# so pull out the expiration date from the certificate
if "P:" in str(user.user_id):
# There is not really a good reason to try/catch here: If we couldn't reregister,
# just crash (very unlikely we can recover)
cert = x509.load_pem_x509_certificate(user.id_cert.encode('utf-8'))
expiration = cert.not_valid_after
# Make it a UTC aware timezone, for reasons
expiration = expiration.replace(tzinfo=datetime.timezone.utc)
logging.info(expire_msg(expiration))
return expiration
2023-11-16 15:55:08 -06:00
async def reregister(conn: apns.APNSConnection, users: list[ids.IDSUser]) -> datetime.datetime:
register(conn, users)
CONFIG["users"] = []
expiration = None
email_user = None
2023-12-02 14:09:31 -06:00
email_addr = None # For HACK below
for user in users:
# Clear the config and re-save everything to match the new registration
CONFIG["users"].append({
"id": user.user_id,
"auth_key": user.auth_keypair.key,
"auth_cert": user.auth_keypair.cert,
"encryption_key": user.encryption_identity.encryption_key if user.encryption_identity is not None else None,
"signing_key": user.encryption_identity.signing_key if user.encryption_identity is not None else None,
"id_cert": user.id_cert,
"handles": user.handles,
})
2023-11-16 15:55:08 -06:00
if not "P:" in str(user.user_id):
email_user = user
for n in range(len(user.handles)):
# HACK: Just pick the first email address they have to avoid picking the linked phone number
# TODO: Properly fix this, so that the linked phone number is not in the Apple ID user's list of handles
if "mailto:" in str(user.handles[n]):
email_addr = user.handles[n]
2023-11-19 11:12:17 -06:00
# Set up a temporary iMessage user to send notifications
im = imessage.iMessageUser(conn, email_user)
2023-12-02 14:09:31 -06:00
im.current_handle = email_addr # HACK: See above
2023-11-19 11:12:17 -06:00
# Notify other devices on the account that new handles are available
await im._send_raw(130, [im.current_handle], "com.apple.madrid")
2023-11-16 15:55:08 -06:00
expiration = expiration_identifier(users)
# Save the config to disk
safe_config()
# Send the notification iMessage (if enabled)
if args.reg_notify:
await im.send(imessage.iMessage.create(im, expire_msg(expiration), [email_addr]))
return expiration
2023-08-17 19:23:56 -05:00
async with apns.APNSConnection.start(push_creds) as conn:
2023-08-25 21:26:37 -05:00
# Save the push credentials to the config
CONFIG["push"] = {
"token": b64encode(conn.credentials.token).decode(),
"key": conn.credentials.private_key,
"cert": conn.credentials.cert,
}
safe_config()
# Activate the connection
await conn.set_state(1)
await conn.filter(["com.apple.madrid"])
2023-08-17 19:23:56 -05:00
2023-08-25 21:26:37 -05:00
# If the user wants a phone number, we need to register it WITH an Apple ID, then register the Apple ID again
# otherwise we encounter issues for some reason
2023-12-02 14:09:31 -06:00
2023-08-25 21:26:37 -05:00
users = []
if "id" in CONFIG:
logging.debug("Restoring old-style identity...")
2023-12-02 14:09:31 -06:00
users.append(ids.IDSAppleUser(conn, CONFIG["auth"]["user_id"],
ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"]),
ids.identity.IDSIdentity(CONFIG["encryption"]["ec_key"],
CONFIG["encryption"]["rsa_key"]),
CONFIG["id"]["cert"],
CONFIG["auth"]["handles"]))
2023-08-25 21:26:37 -05:00
if "users" in CONFIG:
logging.debug("Restoring new-style identity...")
for user in CONFIG["users"]:
users.append(ids.IDSUser(conn, user["id"], ids._helpers.KeyPair(user["auth_key"], user["auth_cert"]),
2023-12-02 14:09:31 -06:00
ids.identity.IDSIdentity(user["signing_key"], user["encryption_key"]),
user["id_cert"],
user["handles"]))
2023-08-24 06:31:11 -05:00
else:
2023-08-25 21:26:37 -05:00
print("Would you like to register a phone number? (y/n)")
if input("> ").lower() == "y":
2023-09-02 18:09:41 -05:00
import sms_registration
2023-08-25 21:26:37 -05:00
if "phone" in CONFIG:
phone_sig = b64decode(CONFIG["phone"].get("sig"))
phone_number = CONFIG["phone"].get("number")
2023-09-02 18:09:41 -05:00
elif args.pdu is not None:
phone_number, phone_sig = sms_registration.parse_pdu(args.pdu, None)
2023-08-25 21:26:37 -05:00
else:
if args.phone is None:
2023-12-02 14:09:31 -06:00
# raise GatewayConnectionError("You did not supply an IP address.")
# Prompt for IP address
print("Please enter the IP address of your phone.")
print("This should be displayed in the SMS registration helper app")
print("You must be on the same network as your phone.")
phone = input("> ")
else:
phone = args.phone
2023-08-25 21:26:37 -05:00
import sms_registration
phone_number, phone_sig = sms_registration.register(push_token=conn.credentials.token,
no_parse=args.trigger_pdu, gateway=args.gateway,
phone_ip=phone)
2023-08-25 21:26:37 -05:00
CONFIG["phone"] = {
"number": phone_number,
"sig": b64encode(phone_sig).decode(),
}
safe_config()
users.append(ids.IDSPhoneUser.authenticate(conn, phone_number, phone_sig))
print("Would you like sign in to your Apple ID (recommended)? (y/n)")
if input("> ").lower() == "y":
username = input("Username: ")
password = input("Password: ")
users.append(ids.IDSAppleUser.authenticate(conn, username, password))
2023-08-24 06:31:11 -05:00
await reregister(conn, users)
2023-08-24 06:31:11 -05:00
if args.daemon:
wait_time_minutes = 5 # this is in minutes. 5 recommended
2023-12-02 14:09:31 -06:00
2023-11-16 15:55:08 -06:00
if args.reregister:
expiration = await reregister(conn, users)
else:
expiration = expiration_identifier(users)
2023-12-02 14:09:31 -06:00
if expiration is None:
expiration = await reregister(conn, users)
while True:
2023-12-02 14:09:31 -06:00
reregister_time = expiration - datetime.timedelta(
minutes=wait_time_minutes) # wait_time_minutes before expiration
reregister_delta = (reregister_time - datetime.datetime.now(datetime.timezone.utc)).total_seconds()
logging.info(f"Reregistering in {int(reregister_delta / 60)} minutes...")
2023-11-16 15:55:08 -06:00
if (reregister_delta > 0):
await trio.sleep(reregister_delta)
logging.info("Reregistering...")
2023-11-06 16:08:36 -06:00
expiration = await reregister(conn, users)
logging.info("Reregistered!")
if args.cronreg:
2023-12-02 14:09:31 -06:00
reregister_within = 60 # Minutes, time where if expiration time is less than, rereg.
for user in users:
if "P:" in str(user.user_id):
2023-11-15 20:31:42 -06:00
# logging.info(f'The user is: {user}')
cert = x509.load_pem_x509_certificate(user.id_cert.encode('utf-8'))
expiration = cert.not_valid_after
logging.info(f'Certificate expires on: {expiration}')
reregister_time = expiration - datetime.timedelta(minutes=reregister_within)
reregister_time = reregister_time.astimezone(datetime.timezone.utc)
logging.info(f'Reregistration will occur at: {reregister_time}')
reregister_delta = (reregister_time - datetime.datetime.now(datetime.timezone.utc)).total_seconds()
2023-12-02 14:09:31 -06:00
logging.info(
f'The time between now and reregistration time is: {(reregister_delta / 3600):.2f} hours or {(reregister_delta / 86400):.2f} days')
if reregister_delta > 3600:
logging.info('Certificates expiration is greater than 60 minutes, quiting')
else:
logging.info('Certificate expires soon, reregistering now')
expiration = await reregister(conn, users)
logging.info('Reregistered')
2023-11-16 15:55:08 -06:00
elif args.reregister:
await reregister(conn, users)
print("Done!")
2023-08-27 08:27:24 -05:00
2023-08-28 07:13:20 -05:00
if args.alive:
logging.getLogger("apns").setLevel(logging.DEBUG)
while True:
await trio.sleep(20)
2023-08-24 06:31:11 -05:00
2023-12-02 14:09:31 -06:00
2023-08-19 10:27:44 -05:00
async def input_task(im: imessage.iMessageUser):
while True:
cmd = await trio.to_thread.run_sync(input, "> ", cancellable=True)
if cmd != "":
2023-08-25 16:41:55 -05:00
await im.send(imessage.iMessage.create(im, cmd, ["tel:+16106632676"]))
2023-08-19 10:27:44 -05:00
2023-12-02 14:09:31 -06:00
2023-08-19 10:27:44 -05:00
async def output_task(im: imessage.iMessageUser):
while True:
msg = await im.receive()
print(str(msg))
if __name__ == "__main__":
2023-08-27 08:27:24 -05:00
parser = argparse.ArgumentParser()
parser.add_argument("--reregister", action="store_true", help="Force re-registration")
parser.add_argument("--reg-notify", action="store_true", help="Get iMessage after each registration")
2023-08-28 07:13:20 -05:00
parser.add_argument("--alive", action="store_true", help="Keep the connection alive")
2023-12-02 14:09:31 -06:00
parser.add_argument("--client-data", action="store_true",
help="Publish client data (only necessary for actually sending/receiving messages)")
2023-09-02 18:09:41 -05:00
parser.add_argument("--trigger-pdu", action="store_true", help="Trigger a REG-REQ")
# String arg to override pdu
parser.add_argument("--pdu", type=str, help="Override the PDU REG-RESP")
parser.add_argument("--phone", type=str, help="Override the phone IP")
parser.add_argument("--gateway", type=str, help="Override the gateway phone number")
2023-12-02 14:09:31 -06:00
parser.add_argument("--daemon", action="store_true",
help="Continuously reregister 5 minutes before the certificate expires")
parser.add_argument("--cronreg", action="store_true", help="Reregister if less than 60 minutes from expiration")
2023-09-02 18:09:41 -05:00
2023-08-27 08:27:24 -05:00
args = parser.parse_args()
2023-12-02 14:09:31 -06:00
2023-09-02 18:22:44 -05:00
if args.pdu is not None and not args.pdu.startswith("REG-RESP"):
raise InvalidResponseError("Received invalid REG-RESP PDU from Gateway Client.")
2023-09-02 18:20:46 -05:00
2023-10-13 17:24:39 -05:00
trio.run(main, args)