General cleanup to PEP-8 standards

This commit is contained in:
Steven Burnham 2023-12-02 15:09:31 -05:00
parent ba25d4e379
commit 6aea1fbbfb
No known key found for this signature in database
GPG key ID: D765679712A2FC3D

117
demo.py
View file

@ -1,23 +1,18 @@
import json import json
import logging import logging
import os import argparse
import threading import datetime
import time
import traceback
from base64 import b64decode, b64encode from base64 import b64decode, b64encode
from getpass import getpass from getpass import getpass
from cryptography import x509 from cryptography import x509
import datetime
from rich.logging import RichHandler from rich.logging import RichHandler
import trio
import apns import apns
import ids import ids
import imessage import imessage
import trio
import argparse
from exceptions import * from exceptions import *
logging.basicConfig( logging.basicConfig(
@ -45,27 +40,33 @@ try:
except FileNotFoundError: except FileNotFoundError:
CONFIG = {} CONFIG = {}
def safe_b64decode(s): def safe_b64decode(s):
try: try:
return b64decode(s) return b64decode(s)
except: except (ValueError, TypeError) as e:
print(f"Error decoding b64: {e}")
return None return None
def safe_config(): def safe_config():
with open("config.json", "w") as f: with open("config.json", "w", encoding="utf-8") as f:
json.dump(CONFIG, f, indent=4) json.dump(CONFIG, f, indent=4)
def get_not_valid_after_timestamp(cert_data): def get_not_valid_after_timestamp(cert_data):
try: try:
cert = x509.load_pem_x509_certificate(cert_data.encode('utf-8')) cert = x509.load_pem_x509_certificate(cert_data.encode('utf-8'))
return cert.not_valid_after return cert.not_valid_after
except Exception as e: except (ValueError, TypeError, AttributeError) as e:
print(f"Error determining certificate expiration date: {e}")
return None # Return None in case of an error return None # Return None in case of an error
expiration = None expiration = None
async def main(args: argparse.Namespace):
async def main(args: argparse.Namespace):
global expiration global expiration
# Load any existing push credentials # Load any existing push credentials
@ -74,31 +75,32 @@ async def main(args: argparse.Namespace):
push_creds = apns.PushCredentials( push_creds = apns.PushCredentials(
CONFIG.get("push", {}).get("key", ""), CONFIG.get("push", {}).get("cert", ""), token) CONFIG.get("push", {}).get("key", ""), CONFIG.get("push", {}).get("cert", ""), token)
def register(conn, users): def register(conn, users):
import emulated.nac import emulated.nac
vd = emulated.nac.generate_validation_data() vd = emulated.nac.generate_validation_data()
vd = b64encode(vd).decode() vd = b64encode(vd).decode()
users = ids.register(conn, users, vd, args.client_data or args.reg_notify) users = ids.register(conn, users, vd, args.client_data or args.reg_notify)
return users return users
def expiration_identifier(users: list[ids.IDSUser]) -> datetime.datetime | None:
expiration = None
# Format time as HH:MM:SS PM/AM EST/EDT (X minutes from now)
expire_msg = lambda expiration: f"Number registration is valid until {str(expiration.astimezone().strftime('%x %I:%M:%S %p %Z'))} ({str(int((expiration - datetime.datetime.now(datetime.timezone.utc)).total_seconds()/60))} minutes from now)"
for user in users:
# If this is a phone number user, then it's got to be the one we just linked
# so pull out the expiration date from the certificate
if "P:" in str(user.user_id):
# There is not really a good reason to try/catch here: If we couldn't reregister, just crash (very unlikely we can recover)
cert = x509.load_pem_x509_certificate(user.id_cert.encode('utf-8'))
expiration = cert.not_valid_after
# Make it a UTC aware timezone, for reasons
expiration = expiration.replace(tzinfo=datetime.timezone.utc)
logging.info(expire_msg(expiration))
return expiration
def expiration_identifier(users: list[ids.IDSUser]) -> datetime.datetime | None:
expiration = None
# Format time as HH:MM:SS PM/AM EST/EDT (X minutes from now)
expire_msg = lambda \
expiration: f"Number registration is valid until {str(expiration.astimezone().strftime('%x %I:%M:%S %p %Z'))} ({str(int((expiration - datetime.datetime.now(datetime.timezone.utc)).total_seconds() / 60))} minutes from now)"
for user in users:
# If this is a phone number user, then it's got to be the one we just linked
# so pull out the expiration date from the certificate
if "P:" in str(user.user_id):
# There is not really a good reason to try/catch here: If we couldn't reregister,
# just crash (very unlikely we can recover)
cert = x509.load_pem_x509_certificate(user.id_cert.encode('utf-8'))
expiration = cert.not_valid_after
# Make it a UTC aware timezone, for reasons
expiration = expiration.replace(tzinfo=datetime.timezone.utc)
logging.info(expire_msg(expiration))
return expiration
async def reregister(conn: apns.APNSConnection, users: list[ids.IDSUser]) -> datetime.datetime: async def reregister(conn: apns.APNSConnection, users: list[ids.IDSUser]) -> datetime.datetime:
register(conn, users) register(conn, users)
@ -106,10 +108,11 @@ async def main(args: argparse.Namespace):
expiration = None expiration = None
# Format time as HH:MM:SS PM/AM EST/EDT (X minutes from now) # Format time as HH:MM:SS PM/AM EST/EDT (X minutes from now)
expire_msg = lambda expiration: f"Number registration is valid until {str(expiration.astimezone().strftime('%x %I:%M:%S %p %Z'))} ({str(int((expiration - datetime.datetime.now(datetime.timezone.utc)).total_seconds()/60))} minutes from now)" expire_msg = lambda \
expiration: f"Number registration is valid until {str(expiration.astimezone().strftime('%x %I:%M:%S %p %Z'))} ({str(int((expiration - datetime.datetime.now(datetime.timezone.utc)).total_seconds() / 60))} minutes from now)"
email_user = None email_user = None
email_addr = None # For HACK below email_addr = None # For HACK below
for user in users: for user in users:
# Clear the config and re-save everything to match the new registration # Clear the config and re-save everything to match the new registration
@ -133,8 +136,8 @@ async def main(args: argparse.Namespace):
# Set up a temporary iMessage user to send notifications # Set up a temporary iMessage user to send notifications
im = imessage.iMessageUser(conn, email_user) im = imessage.iMessageUser(conn, email_user)
im.current_handle = email_addr # HACK: See above im.current_handle = email_addr # HACK: See above
# Notify other devices on the account that new handles are available # Notify other devices on the account that new handles are available
await im._send_raw(130, [im.current_handle], "com.apple.madrid") await im._send_raw(130, [im.current_handle], "com.apple.madrid")
@ -164,21 +167,25 @@ async def main(args: argparse.Namespace):
# If the user wants a phone number, we need to register it WITH an Apple ID, then register the Apple ID again # If the user wants a phone number, we need to register it WITH an Apple ID, then register the Apple ID again
# otherwise we encounter issues for some reason # otherwise we encounter issues for some reason
users = [] users = []
if "id" in CONFIG: if "id" in CONFIG:
logging.debug("Restoring old-style identity...") logging.debug("Restoring old-style identity...")
users.append(ids.IDSAppleUser(conn, CONFIG["auth"]["user_id"], ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"]), users.append(ids.IDSAppleUser(conn, CONFIG["auth"]["user_id"],
ids.identity.IDSIdentity(CONFIG["encryption"]["ec_key"], CONFIG["encryption"]["rsa_key"]), CONFIG["id"]["cert"], ids._helpers.KeyPair(CONFIG["auth"]["key"], CONFIG["auth"]["cert"]),
CONFIG["auth"]["handles"])) ids.identity.IDSIdentity(CONFIG["encryption"]["ec_key"],
CONFIG["encryption"]["rsa_key"]),
CONFIG["id"]["cert"],
CONFIG["auth"]["handles"]))
if "users" in CONFIG: if "users" in CONFIG:
logging.debug("Restoring new-style identity...") logging.debug("Restoring new-style identity...")
for user in CONFIG["users"]: for user in CONFIG["users"]:
users.append(ids.IDSUser(conn, user["id"], ids._helpers.KeyPair(user["auth_key"], user["auth_cert"]), users.append(ids.IDSUser(conn, user["id"], ids._helpers.KeyPair(user["auth_key"], user["auth_cert"]),
ids.identity.IDSIdentity(user["signing_key"], user["encryption_key"]), user["id_cert"], ids.identity.IDSIdentity(user["signing_key"], user["encryption_key"]),
user["handles"])) user["id_cert"],
user["handles"]))
else: else:
print("Would you like to register a phone number? (y/n)") print("Would you like to register a phone number? (y/n)")
if input("> ").lower() == "y": if input("> ").lower() == "y":
@ -191,7 +198,7 @@ async def main(args: argparse.Namespace):
phone_number, phone_sig = sms_registration.parse_pdu(args.pdu, None) phone_number, phone_sig = sms_registration.parse_pdu(args.pdu, None)
else: else:
if args.phone is None: if args.phone is None:
#raise GatewayConnectionError("You did not supply an IP address.") # raise GatewayConnectionError("You did not supply an IP address.")
# Prompt for IP address # Prompt for IP address
print("Please enter the IP address of your phone.") print("Please enter the IP address of your phone.")
print("This should be displayed in the SMS registration helper app") print("This should be displayed in the SMS registration helper app")
@ -222,17 +229,18 @@ async def main(args: argparse.Namespace):
if args.daemon: if args.daemon:
wait_time_minutes = 5 # this is in minutes. 5 recommended wait_time_minutes = 5 # this is in minutes. 5 recommended
if args.reregister: if args.reregister:
expiration = await reregister(conn, users) expiration = await reregister(conn, users)
else: else:
expiration = expiration_identifier(users) expiration = expiration_identifier(users)
if expiration is None: if expiration is None:
expiration = await reregister(conn, users) expiration = await reregister(conn, users)
while True: while True:
reregister_time = expiration - datetime.timedelta(minutes=wait_time_minutes) # wait_time_minutes before expiration reregister_time = expiration - datetime.timedelta(
minutes=wait_time_minutes) # wait_time_minutes before expiration
reregister_delta = (reregister_time - datetime.datetime.now(datetime.timezone.utc)).total_seconds() reregister_delta = (reregister_time - datetime.datetime.now(datetime.timezone.utc)).total_seconds()
logging.info(f"Reregistering in {int(reregister_delta / 60)} minutes...") logging.info(f"Reregistering in {int(reregister_delta / 60)} minutes...")
@ -245,7 +253,7 @@ async def main(args: argparse.Namespace):
logging.info("Reregistered!") logging.info("Reregistered!")
if args.cronreg: if args.cronreg:
reregister_within = 60 # Minutes, time where if expiration time is less than, rereg. reregister_within = 60 # Minutes, time where if expiration time is less than, rereg.
for user in users: for user in users:
if "P:" in str(user.user_id): if "P:" in str(user.user_id):
# logging.info(f'The user is: {user}') # logging.info(f'The user is: {user}')
@ -256,7 +264,8 @@ async def main(args: argparse.Namespace):
reregister_time = reregister_time.astimezone(datetime.timezone.utc) reregister_time = reregister_time.astimezone(datetime.timezone.utc)
logging.info(f'Reregistration will occur at: {reregister_time}') logging.info(f'Reregistration will occur at: {reregister_time}')
reregister_delta = (reregister_time - datetime.datetime.now(datetime.timezone.utc)).total_seconds() reregister_delta = (reregister_time - datetime.datetime.now(datetime.timezone.utc)).total_seconds()
logging.info(f'The time between now and reregistration time is: {(reregister_delta / 3600):.2f} hours or {(reregister_delta / 86400):.2f} days') logging.info(
f'The time between now and reregistration time is: {(reregister_delta / 3600):.2f} hours or {(reregister_delta / 86400):.2f} days')
if reregister_delta > 3600: if reregister_delta > 3600:
logging.info('Certificates expiration is greater than 60 minutes, quiting') logging.info('Certificates expiration is greater than 60 minutes, quiting')
else: else:
@ -274,12 +283,14 @@ async def main(args: argparse.Namespace):
while True: while True:
await trio.sleep(20) await trio.sleep(20)
async def input_task(im: imessage.iMessageUser): async def input_task(im: imessage.iMessageUser):
while True: while True:
cmd = await trio.to_thread.run_sync(input, "> ", cancellable=True) cmd = await trio.to_thread.run_sync(input, "> ", cancellable=True)
if cmd != "": if cmd != "":
await im.send(imessage.iMessage.create(im, cmd, ["tel:+16106632676"])) await im.send(imessage.iMessage.create(im, cmd, ["tel:+16106632676"]))
async def output_task(im: imessage.iMessageUser): async def output_task(im: imessage.iMessageUser):
while True: while True:
msg = await im.receive() msg = await im.receive()
@ -291,17 +302,19 @@ if __name__ == "__main__":
parser.add_argument("--reregister", action="store_true", help="Force re-registration") parser.add_argument("--reregister", action="store_true", help="Force re-registration")
parser.add_argument("--reg-notify", action="store_true", help="Get iMessage after each registration") parser.add_argument("--reg-notify", action="store_true", help="Get iMessage after each registration")
parser.add_argument("--alive", action="store_true", help="Keep the connection alive") parser.add_argument("--alive", action="store_true", help="Keep the connection alive")
parser.add_argument("--client-data", action="store_true", help="Publish client data (only necessary for actually sending/receiving messages)") parser.add_argument("--client-data", action="store_true",
help="Publish client data (only necessary for actually sending/receiving messages)")
parser.add_argument("--trigger-pdu", action="store_true", help="Trigger a REG-REQ") parser.add_argument("--trigger-pdu", action="store_true", help="Trigger a REG-REQ")
# String arg to override pdu # String arg to override pdu
parser.add_argument("--pdu", type=str, help="Override the PDU REG-RESP") parser.add_argument("--pdu", type=str, help="Override the PDU REG-RESP")
parser.add_argument("--phone", type=str, help="Override the phone IP") parser.add_argument("--phone", type=str, help="Override the phone IP")
parser.add_argument("--gateway", type=str, help="Override the gateway phone number") parser.add_argument("--gateway", type=str, help="Override the gateway phone number")
parser.add_argument("--daemon", action="store_true", help="Continuously reregister 5 minutes before the certificate expires") parser.add_argument("--daemon", action="store_true",
help="Continuously reregister 5 minutes before the certificate expires")
parser.add_argument("--cronreg", action="store_true", help="Reregister if less than 60 minutes from expiration") parser.add_argument("--cronreg", action="store_true", help="Reregister if less than 60 minutes from expiration")
args = parser.parse_args() args = parser.parse_args()
if args.pdu is not None and not args.pdu.startswith("REG-RESP"): if args.pdu is not None and not args.pdu.startswith("REG-RESP"):
raise InvalidResponseError("Received invalid REG-RESP PDU from Gateway Client.") raise InvalidResponseError("Received invalid REG-RESP PDU from Gateway Client.")