remove cruft, organize imports

This commit is contained in:
JJTech0130 2023-07-26 07:50:48 -04:00
parent ba1537ed58
commit c0f58c2fe2
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6

179
demo.py
View file

@ -1,12 +1,21 @@
import gzip
import json import json
from base64 import b64encode import logging
import plistlib
import threading
import time
from base64 import b64decode, b64encode
from getpass import getpass from getpass import getpass
from base64 import b64decode
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from rich.logging import RichHandler
import apns import apns
import ids import ids
from ids.keydec import IdentityKeys
import logging
from rich.logging import RichHandler
FORMAT = "%(message)s" FORMAT = "%(message)s"
logging.basicConfig( logging.basicConfig(
@ -22,64 +31,29 @@ logging.getLogger("albert").setLevel(logging.INFO)
logging.getLogger("ids").setLevel(logging.DEBUG) logging.getLogger("ids").setLevel(logging.DEBUG)
logging.getLogger("bags").setLevel(logging.DEBUG) logging.getLogger("bags").setLevel(logging.DEBUG)
def input_multiline(prompt):
print(prompt)
lines = []
while True:
line = input()
if line == "":
break
lines.append(line)
return "\n".join(lines)
# Try and load config.json # Try and load config.json
try: try:
with open("config.json", "r") as f: with open("config.json", "r") as f:
CONFIG = json.load(f) CONFIG = json.load(f)
except FileNotFoundError: except FileNotFoundError:
CONFIG = {} CONFIG = {}
def convert_config(old):
new = {}
new["id"] = {
"key": old["key"],
"cert": old["ids_cert"],
}
new["auth"] = {
"key": old["key"],
"cert": old["auth_cert"],
"user_id": old["user_id"],
"handles": [
"mailto:user_test2@icloud.com",
]
#"handles": old["handles"],
}
new["push"] = {
"token": old["push"]["token"],
"key": old["push"]["key"],
"cert": old["push"]["cert"],
}
return new
# Uncomment this to change from an old config.json to a new one
#CONFIG = convert_config(CONFIG)
conn = apns.APNSConnection( conn = apns.APNSConnection(
CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert") CONFIG.get("push", {}).get("key"), CONFIG.get("push", {}).get("cert")
) )
def safe_b64decode(s): def safe_b64decode(s):
try: try:
return b64decode(s) return b64decode(s)
except: except:
return None return None
conn.connect(token=safe_b64decode(CONFIG.get("push", {}).get("token"))) conn.connect(token=safe_b64decode(CONFIG.get("push", {}).get("token")))
conn.set_state(1) conn.set_state(1)
conn.filter(["com.apple.madrid"]) conn.filter(["com.apple.madrid"])
#print(b64encode(conn.token).decode())
user = ids.IDSUser(conn) user = ids.IDSUser(conn)
if CONFIG.get("auth", {}).get("cert") is not None: if CONFIG.get("auth", {}).get("cert") is not None:
@ -94,9 +68,6 @@ else:
user.authenticate(username, password) user.authenticate(username, password)
# Generate a new RSA keypair for the identity # Generate a new RSA keypair for the identity
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
# Load the old keypair if it exists # Load the old keypair if it exists
if CONFIG.get("encrypt") is not None: if CONFIG.get("encrypt") is not None:
priv_enc = load_pem_private_key(CONFIG["encrypt"].encode(), password=None) priv_enc = load_pem_private_key(CONFIG["encrypt"].encode(), password=None)
@ -108,73 +79,32 @@ if CONFIG.get("id", {}).get("cert") is not None:
id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"]) id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"])
user.restore_identity(id_keypair) user.restore_identity(id_keypair)
else: else:
#vd = input_multiline("Enter validation data: ") # vd = input_multiline("Enter validation data: ")
import emulated.nac import emulated.nac
vd = emulated.nac.generate_validation_data() vd = emulated.nac.generate_validation_data()
vd = b64encode(vd).decode() vd = b64encode(vd).decode()
from ids.keydec import IdentityKeys
published_keys = IdentityKeys(None, pub_enc) published_keys = IdentityKeys(None, pub_enc)
user.register(vd, published_keys) user.register(vd, published_keys)
#logging.info(f"Looked up textgpt@icloud.com, got response: {user.lookup(['mailto:textgpt@icloud.com'])}")
# logging.info("Enter a username to look up, for example: mailto:textgpt@icloud.com")
# while True:
# # Read a line from stdin
# line = input("> ")
# if line == "":
# break
# # Look up the username
# resp = user.lookup([line])
# #logging.info(f"Looked up {line}, got response: {user.lookup([line])}")
# info = resp[line]
# identities = info["identities"]
# logging.info(f"Identities: {len(identities)}")
# for identity in identities:
# logging.info(f"Identity: [yellow]{b64encode(identity['push-token']).decode()}[/] ({len(identity)} properties)", extra={"markup": True})
# if len(identity) > 5:
# logging.warning(identity)
#logging.debug(user.lookup(["mailto:usert4@icloud.com", "mailto:jjtech@jjtech.dev"]))
# resp = user.lookup(["mailto:jjtech@jjtech.dev"])
# info = resp["mailto:jjtech@jjtech.dev"]
# identities = info["identities"]
# for identity in identities:
# logging.info(f"Identity: [yellow]{b64encode(identity['push-token']).decode()}[/] ({len(identity)} properties)", extra={"markup": True})
# if "client-data" in identity:
# logging.warning(identity["client-data"])
logging.info("Waiting for incomming messages...") logging.info("Waiting for incomming messages...")
# Create a thread to send keepalive messages # Create a thread to send keepalive messages
import threading
import time
def keepalive(): def keepalive():
while True: while True:
time.sleep(5) time.sleep(5)
conn.keep_alive() conn.keep_alive()
threading.Thread(target=keepalive, daemon=True).start()
# while True:
# # # Wait for a message threading.Thread(target=keepalive, daemon=True).start()
# # # def check_response(x):
# # # if x[0] != 0x0A:
# # # return False
# # # resp_body = apns._get_field(x[1], 3)
# # # if resp_body is None:
# # # return False
# # # resp_body = apns._get_field(x[1], 3)
# # # if resp_body is None:
# # # return False
# # # resp_body = plistlib.loads(resp_body)
# # # return resp_body.get('U') == msg_id
# pass
# Write config.json # Write config.json
CONFIG["id"] = { CONFIG["id"] = {
"key": user._id_keypair.key, "key": user._id_keypair.key,
"cert": user._id_keypair.cert, "cert": user._id_keypair.cert,
} }
CONFIG["auth"] = { CONFIG["auth"] = {
"key": user._auth_keypair.key, "key": user._auth_keypair.key,
@ -187,40 +117,50 @@ CONFIG["push"] = {
"key": user.push_connection.private_key, "key": user.push_connection.private_key,
"cert": user.push_connection.cert, "cert": user.push_connection.cert,
} }
from cryptography.hazmat.primitives import serialization
CONFIG["encrypt"] = priv_enc.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption()).decode("utf-8").strip()
CONFIG["encrypt"] = (
priv_enc.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
.decode("utf-8")
.strip()
)
with open("config.json", "w") as f: with open("config.json", "w") as f:
json.dump(CONFIG, f, indent=4) json.dump(CONFIG, f, indent=4)
def decrypt(payload):
import gzip
#print(payload[1:3])
length = int.from_bytes(payload[1:3], "big")
#print("Length", length)
payload = payload[3:length+3]
#print("Decrypting payload", payload)
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
decrypted1 = priv_enc.decrypt(payload[:160], padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None
))
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes def decrypt(payload):
cipher = Cipher(algorithms.AES(decrypted1[:16]), modes.CTR(b'\x00'*15 + b'\x01')) # print(payload[1:3])
length = int.from_bytes(payload[1:3], "big")
# print("Length", length)
payload = payload[3 : length + 3]
# print("Decrypting payload", payload)
decrypted1 = priv_enc.decrypt(
payload[:160],
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None,
),
)
cipher = Cipher(algorithms.AES(decrypted1[:16]), modes.CTR(b"\x00" * 15 + b"\x01"))
decryptor = cipher.decryptor() decryptor = cipher.decryptor()
pt = decryptor.update(decrypted1[16:] + payload[160:]) pt = decryptor.update(decrypted1[16:] + payload[160:])
#print(pt) # print(pt)
pt = gzip.decompress(pt) pt = gzip.decompress(pt)
payload = plistlib.loads(pt) payload = plistlib.loads(pt)
#logging.debug(f"Got payload: {payload}") # logging.debug(f"Got payload: {payload}")
return payload return payload
import plistlib
while True: while True:
def check_response(x): def check_response(x):
if x[0] != 0x0A: if x[0] != 0x0A:
return False return False
@ -231,14 +171,13 @@ while True:
if "P" not in resp_body: if "P" not in resp_body:
return False return False
return True return True
payload = conn.incoming_queue.wait_pop_find(check_response) payload = conn.incoming_queue.wait_pop_find(check_response)
resp_body = apns._get_field(payload[1], 3) resp_body = apns._get_field(payload[1], 3)
id = apns._get_field(payload[1], 4) id = apns._get_field(payload[1], 4)
conn._send_ack(id) conn._send_ack(id)
resp_body = plistlib.loads(resp_body) resp_body = plistlib.loads(resp_body)
#logging.info(f"Got response: {resp_body}") # logging.info(f"Got response: {resp_body}")
payload = resp_body["P"] payload = resp_body["P"]
payload = decrypt(payload) payload = decrypt(payload)
logging.info(f"Got message: {payload['t']} from {payload['p'][1]}") logging.info(f"Got message: {payload['t']} from {payload['p'][1]}")