Use logging for logging everywhere

This commit is contained in:
JJTech0130 2023-07-24 09:18:21 -04:00
parent 6bbdafd8a1
commit d8cb6d7641
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6
7 changed files with 48 additions and 3 deletions

View file

@ -10,6 +10,9 @@ from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509.oid import NameOID
import logging
logger = logging.getLogger("albert")
# These can probably be any valid chain and key in a similar format? Seems like a CoreTrust-like bug in validation?
# I got them from https://github.com/MiUnlockCode/albertsimlockapple/blob/main/ALBERTBUGBYMIUNLOCK.php
FAIRPLAY_PRIVATE_KEY = b64decode(
@ -60,6 +63,8 @@ def generate_push_cert() -> tuple[str, str]:
"UniqueDeviceID": str(uuid.uuid4()),
}
logger.debug(f"Generated activation info (with UUID: {activation_info['UniqueDeviceID']})")
activation_info = plistlib.dumps(activation_info)
# Load the private key
@ -86,6 +91,8 @@ def generate_push_cert() -> tuple[str, str]:
protocol = re.search("<Protocol>(.*)</Protocol>", resp.text).group(1) # type: ignore
protocol = plistlib.loads(protocol.encode("utf-8"))
logger.debug("Recieved push certificate from Albert")
return (
private_key.private_bytes(
encoding=serialization.Encoding.PEM,

14
apns.py
View file

@ -5,6 +5,7 @@ import socket
import threading
import time
from hashlib import sha1
from base64 import b64encode, b64decode
import logging
logger = logging.getLogger("apns")
@ -35,6 +36,8 @@ def _connect(private_key: str, cert: str) -> tlslite.TLSConnection:
# Handshake with the server
sock.handshakeClientCert(cert, private_key, alpn=ALPN)
logger.info(f"Connected to APNs ({COURIER_HOST})")
return sock
@ -128,6 +131,7 @@ class APNSConnection:
def __init__(self, private_key=None, cert=None):
# Generate the private key and certificate if they're not provided
if private_key is None or cert is None:
logger.debug("APNs needs a new push certificate")
self.private_key, self.cert = albert.generate_push_cert()
else:
self.private_key, self.cert = private_key, cert
@ -141,6 +145,10 @@ class APNSConnection:
self.queue_filler_thread.start()
def connect(self, root: bool = True, token: bytes = None):
if token is None:
logger.debug(f"Sending connect message without token (root={root})")
else:
logger.debug(f"Sending connect message with token {b64encode(token).decode()} (root={root})")
flags = 0b01000001
if root:
flags |= 0b0100
@ -178,9 +186,12 @@ class APNSConnection:
else:
raise Exception("No token")
logger.debug(f"Recieved connect response with token {b64encode(self.token).decode()}")
return self.token
def filter(self, topics: list[str]):
logger.debug(f"Sending filter message with topics {topics}")
fields = [(1, self.token)]
for topic in topics:
@ -191,6 +202,7 @@ class APNSConnection:
self.sock.write(payload)
def send_message(self, topic: str, payload: str, id=None):
logger.debug(f"Sending message to topic {topic} with payload {payload}")
if id is None:
id = random.randbytes(4)
@ -213,6 +225,7 @@ class APNSConnection:
raise Exception("Failed to send message")
def set_state(self, state: int):
logger.debug(f"Sending state message with state {state}")
self.sock.write(
_serialize_payload(
0x14,
@ -221,6 +234,7 @@ class APNSConnection:
)
def keep_alive(self):
logger.debug("Sending keep alive message")
self.sock.write(_serialize_payload(0x0C, []))
# def _send_ack(self, id: bytes):

View file

@ -1,6 +1,8 @@
import plistlib
import requests
import logging
logger = logging.getLogger("bags")
def apns_init_bag_old():
@ -11,6 +13,8 @@ def apns_init_bag_old():
# Parse the config as a plist
bag = plistlib.loads(r.content)
logger.debug("Received APNs old-style init bag")
return bag
@ -23,6 +27,8 @@ def apns_init_bag():
content = plistlib.loads(r.content)
bag = plistlib.loads(content["bag"])
logger.debug("Received APNs new init bag")
return bag
@ -38,6 +44,8 @@ def ids_bag():
# Load the inner bag
bag = plistlib.loads(content["bag"])
logger.debug("Recieved IDS bag")
return bag

View file

@ -17,6 +17,10 @@ logging.basicConfig(
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("jelly").setLevel(logging.INFO)
logging.getLogger("nac").setLevel(logging.INFO)
logging.getLogger("apns").setLevel(logging.INFO)
logging.getLogger("albert").setLevel(logging.INFO)
logging.getLogger("ids").setLevel(logging.DEBUG)
logging.getLogger("bags").setLevel(logging.DEBUG)
def input_multiline(prompt):
print(prompt)
@ -97,7 +101,7 @@ else:
vd = b64encode(vd).decode()
user.register(vd)
print(user.lookup(["mailto:textgpt@icloud.com"]))
logging.info(f"Looked up textgpt@icloud.com, got response: {user.lookup(['mailto:textgpt@icloud.com'])}")
# Write config.json
CONFIG["id"] = {

View file

@ -403,7 +403,6 @@ def load_nac() -> Jelly:
return j
def generate_validation_data() -> bytes:
logger.info("Generating validation data")
j = load_nac()
logger.debug("Loaded NAC library")
val_ctx, req = nac_init(j,get_cert())

View file

@ -6,10 +6,14 @@ import requests
from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair
from .signing import add_auth_signature, armour_cert
import logging
logger = logging.getLogger("ids")
def register(
push_token, handles, user_id, auth_key: KeyPair, push_key: KeyPair, validation_data
):
logger.debug(f"Registering IDS identity for {handles}")
uris = [{"uri": handle} for handle in handles]
body = {
@ -47,7 +51,8 @@ def register(
verify=False,
)
r = plistlib.loads(r.content)
print(f'Response code: {r["status"]}')
#print(f'Response code: {r["status"]}')
logger.debug(f"Recieved response to IDS registration: {r}")
if "status" in r and r["status"] == 6004:
raise Exception("Validation data expired!")
# TODO: Do validation of nested statuses

View file

@ -16,6 +16,9 @@ import bags
from . import signing
from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair
import logging
logger = logging.getLogger("ids")
def _auth_token_request(username: str, password: str) -> any:
# Turn the PET into an auth token
@ -49,9 +52,11 @@ def get_auth_token(
# if use_gsa:
if platform == "darwin":
logger.debug("Using GrandSlam to authenticate (native Anisette)")
g = gsa.authenticate(username, password, gsa.Anisette())
pet = g["t"]["com.apple.gs.idms.pet"]["token"]
else:
logger.debug("Using old-style authentication")
# Make the request without the 2FA code to make the prompt appear
_auth_token_request(username, password)
# TODO: Make sure we actually need the second request, some rare accounts don't have 2FA
@ -68,6 +73,7 @@ def get_auth_token(
realm_user_id = service_data["realm-user-id"]
auth_token = service_data["auth-token"]
# print(f"Auth token for {realm_user_id}: {auth_token}")
logger.debug(f"Got auth token for IDS: {auth_token}")
return realm_user_id, auth_token
@ -119,6 +125,7 @@ def get_auth_cert(user_id, token) -> KeyPair:
if r["status"] != 0:
raise (Exception(f"Failed to get auth cert: {r}"))
cert = x509.load_der_x509_certificate(r["cert"])
logger.debug("Got auth cert from token")
return KeyPair(
private_key.private_bytes(
encoding=serialization.Encoding.PEM,
@ -153,4 +160,5 @@ def get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair):
if not "handles" in r:
raise Exception("No handles in response: " + str(r))
logger.debug(f"User {user_id} has handles {r['handles']}")
return [handle["uri"] for handle in r["handles"]]