From b128ac514a9a7b0cf8ee0f81acd37e7db375d2a6 Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Tue, 11 Apr 2023 12:23:04 -0400 Subject: [PATCH] formatting and import organizing --- albert.py | 13 ++-- apns.py | 95 +++++++++++++++---------- config.py | 14 ++-- courier.py | 8 ++- demo.py | 16 ++--- printer.py | 185 +++++++++++++++++++++++++++++++------------------ proxy/hosts.py | 2 +- proxy/proxy.py | 71 +++++++++++-------- 8 files changed, 250 insertions(+), 154 deletions(-) diff --git a/albert.py b/albert.py index 501a471..c67e826 100644 --- a/albert.py +++ b/albert.py @@ -90,13 +90,16 @@ def generate_push_cert() -> tuple[str, str]: encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), - ).decode("utf-8").strip(), - protocol["device-activation"]["activation-record"]["DeviceCertificate"].decode( - "utf-8" - ).strip(), + ) + .decode("utf-8") + .strip(), + protocol["device-activation"]["activation-record"]["DeviceCertificate"] + .decode("utf-8") + .strip(), ) + if __name__ == "__main__": private_key, cert = generate_push_cert() print(private_key) - print(cert) \ No newline at end of file + print(cert) diff --git a/apns.py b/apns.py index f13afa5..a1f663c 100644 --- a/apns.py +++ b/apns.py @@ -1,42 +1,45 @@ from __future__ import annotations -import courier, albert -from hashlib import sha1 +import random import threading import time -import random +from hashlib import sha1 + +import albert +import courier + class APNSConnection: incoming_queue = [] def _queue_filler(self): while True and not self.sock.closed: - #print(self.sock.closed) - #print("QUEUE: Waiting for payload...") - #self.sock.read(1) - #print("QUEUE: Got payload?") + # print(self.sock.closed) + # print("QUEUE: Waiting for payload...") + # self.sock.read(1) + # print("QUEUE: Got payload?") payload = _deserialize_payload(self.sock) - #print("QUEUE: Got payload?") + # print("QUEUE: Got payload?") if payload is not None: - #print("QUEUE: Received payload: " + str(payload)) + # print("QUEUE: Received payload: " + str(payload)) self.incoming_queue.append(payload) - #print("QUEUE: Thread ended") + # print("QUEUE: Thread ended") def _pop_by_id(self, id: int) -> tuple[int, list[tuple[int, bytes]]] | None: - #print("QUEUE: Looking for id " + str(id) + " in " + str(self.incoming_queue)) + # print("QUEUE: Looking for id " + str(id) + " in " + str(self.incoming_queue)) for i in range(len(self.incoming_queue)): if self.incoming_queue[i][0] == id: return self.incoming_queue.pop(i) return None - + def wait_for_packet(self, id: int) -> tuple[int, list[tuple[int, bytes]]]: payload = self._pop_by_id(id) while payload is None: payload = self._pop_by_id(id) time.sleep(0.1) return payload - + def __init__(self, private_key=None, cert=None): # Generate the private key and certificate if they're not provided if private_key is None or cert is None: @@ -47,26 +50,36 @@ class APNSConnection: self.sock = courier.connect(self.private_key, self.cert) # Start the queue filler thread - self.queue_filler_thread = threading.Thread(target=self._queue_filler, daemon=True) + self.queue_filler_thread = threading.Thread( + target=self._queue_filler, daemon=True + ) self.queue_filler_thread.start() def connect(self, root: bool = True, token: bytes = None): flags = 0b01000001 if root: flags |= 0b0100 - + if token is None: - payload = _serialize_payload(7, [(2, 0x01.to_bytes()), (5, flags.to_bytes(4))]) + payload = _serialize_payload( + 7, [(2, 0x01.to_bytes()), (5, flags.to_bytes(4))] + ) else: - payload = _serialize_payload(7, [(1, token), (2, 0x01.to_bytes()), (5, flags.to_bytes(4))]) + payload = _serialize_payload( + 7, [(1, token), (2, 0x01.to_bytes()), (5, flags.to_bytes(4))] + ) self.sock.write(payload) payload = self.wait_for_packet(8) - if payload == None or payload[0] != 8 or _get_field(payload[1], 1) != 0x00.to_bytes(): + if ( + payload == None + or payload[0] != 8 + or _get_field(payload[1], 1) != 0x00.to_bytes() + ): raise Exception("Failed to connect") - + self.token = _get_field(payload[1], 3) return self.token @@ -80,32 +93,40 @@ class APNSConnection: payload = _serialize_payload(9, fields) self.sock.write(payload) - - def send_message(self, topic: str, payload: str, id = None): + + def send_message(self, topic: str, payload: str, id=None): if id is None: id = random.randbytes(4) - - payload = _serialize_payload(0x0a, - [(4, id), - (1, sha1(topic.encode()).digest()), - (2, self.token), - (3, payload)]) - + + payload = _serialize_payload( + 0x0A, + [ + (4, id), + (1, sha1(topic.encode()).digest()), + (2, self.token), + (3, payload), + ], + ) + self.sock.write(payload) - payload = self.wait_for_packet(0x0b) + payload = self.wait_for_packet(0x0B) if payload[1][0][1] != 0x00.to_bytes(): - raise Exception("Failed to send message") + raise Exception("Failed to send message") def set_state(self, state: int): - self.sock.write(_serialize_payload(0x14, [(1, state.to_bytes(1)), (2, 0x7FFFFFFF.to_bytes(4))])) + self.sock.write( + _serialize_payload( + 0x14, [(1, state.to_bytes(1)), (2, 0x7FFFFFFF.to_bytes(4))] + ) + ) def keep_alive(self): - self.sock.write(_serialize_payload(0x0c, [])) + self.sock.write(_serialize_payload(0x0C, [])) # TODO: Find a way to make this non-blocking - #def expect_message(self) -> tuple[int, list[tuple[int, bytes]]] | None: + # def expect_message(self) -> tuple[int, list[tuple[int, bytes]]] | None: # return _deserialize_payload(self.sock) @@ -129,6 +150,7 @@ def _deserialize_field(stream: bytes) -> tuple[int, bytes]: value = stream[3 : 3 + length] return id, value + # Note: Takes a stream, not a buffer, as we do not know the length of the payload # WILL BLOCK IF THE STREAM IS EMPTY def _deserialize_payload(stream) -> tuple[int, list[tuple[int, bytes]]] | None: @@ -150,7 +172,10 @@ def _deserialize_payload(stream) -> tuple[int, list[tuple[int, bytes]]] | None: return id, fields -def _deserialize_payload_from_buffer(buffer: bytes) -> tuple[int, list[tuple[int, bytes]]] | None: + +def _deserialize_payload_from_buffer( + buffer: bytes, +) -> tuple[int, list[tuple[int, bytes]]] | None: id = int.from_bytes(buffer[:1], "big") if id == 0x0: @@ -178,4 +203,4 @@ def _get_field(fields: list[tuple[int, bytes]], id: int) -> bytes: for field_id, value in fields: if field_id == id: return value - return None \ No newline at end of file + return None diff --git a/config.py b/config.py index 52701a3..a5afc5c 100644 --- a/config.py +++ b/config.py @@ -1,23 +1,25 @@ -import requests import plistlib -#CONFIG_URL = "http://init-p01st.push.apple.com/bag" +import requests + +# CONFIG_URL = "http://init-p01st.push.apple.com/bag" CONFIG_URL = "https://init.push.apple.com/bag" -def get_config(): +def get_config(): r = requests.get(CONFIG_URL, verify=False) if r.status_code != 200: raise Exception("Failed to get config") - + # Parse the config as a plist config = plistlib.loads(r.content) # Parse the nested "bag" as a plist - #config["bag"] = plistlib.loads(config["bag"]) + # config["bag"] = plistlib.loads(config["bag"]) return config + if __name__ == "__main__": config = get_config() - print(config) \ No newline at end of file + print(config) diff --git a/courier.py b/courier.py index 3288637..4e1d916 100644 --- a/courier.py +++ b/courier.py @@ -1,10 +1,12 @@ -import tlslite import socket -COURIER_HOST = "windows.courier.push.apple.com" # TODO: Get this from config +import tlslite + +COURIER_HOST = "windows.courier.push.apple.com" # TODO: Get this from config COURIER_PORT = 5223 ALPN = [b"apns-security-v2"] + # Connect to the courier server def connect(private_key: str, cert: str) -> tlslite.TLSConnection: # Connect to the courier server @@ -19,9 +21,9 @@ def connect(private_key: str, cert: str) -> tlslite.TLSConnection: return sock + if __name__ == "__main__": sock = connect() sock.write(b"Hello World!") print(sock.read()) sock.close() - diff --git a/demo.py b/demo.py index 3237072..f0e72bc 100644 --- a/demo.py +++ b/demo.py @@ -1,17 +1,15 @@ -import plistlib -import zlib -from base64 import b64decode, b64encode -from hashlib import sha1 - import apns import ids conn1 = apns.APNSConnection() conn1.connect() -conn1.keep_alive() -conn1.set_state(0x01) -conn1.filter([]) -conn1.connect(False) + +# Uncomment these for greater parity with apsd +# conn1.keep_alive() +# conn1.set_state(0x01) +# conn1.filter([]) +# conn1.connect(False) + conn1.filter(["com.apple.madrid"]) print(ids.lookup(conn1, ["mailto:jjtech@jjtech.dev"])) diff --git a/printer.py b/printer.py index b61c647..2a859b3 100644 --- a/printer.py +++ b/printer.py @@ -1,6 +1,7 @@ -from base64 import b64encode, b64decode +import plistlib +import zlib +from base64 import b64decode, b64encode from hashlib import sha1 -import plistlib, zlib # Taken from debug logs of apsd enabled_topics = "(com.apple.icloud-container.com.apple.avatarsd, com.icloud.askpermission, com.apple.icloud-container.com.apple.Safari, com.apple.itunesstored, com.apple.icloud-container.clouddocs.com.apple.CloudDocs.health, com.apple.passd.usernotifications, com.apple.icloud-container.com.apple.donotdisturbd, com.apple.icloud-container.clouddocs.iCloud.com.reddit.reddit, com.apple.mobileme.fmf3, com.apple.icloud-container.com.apple.cloudpaird, com.apple.icloud-container.clouddocs.com.apple.Pages, com.apple.appstored-testflight, com.apple.askpermissiond, com.apple.icloud-container.com.apple.willowd, com.me.cal, com.apple.icloud-container.com.apple.suggestd, com.apple.icloud-container.clouddocs.F3LWYJ7GM7.com.apple.garageband10, com.apple.icloud-container.clouddocs.com.apple.CloudDocs.container-metadata, com.apple.icloud-container.com.apple.callhistory.sync-helper, com.apple.icloud-container.com.apple.syncdefaultsd, com.apple.icloud-container.com.apple.SafariShared.Settings, com.apple.pay.services.products.prod, com.apple.icloud-container.com.apple.StatusKitAgent, com.apple.icloud-container.com.apple.siriknowledged, com.me.contacts, com.apple.icloud-container.com.apple.TrustedPeersHelper, com.apple.icloud-container.clouddocs.iCloud.com.apple.iBooks, com.apple.icloud-container.clouddocs.iCloud.dk.simonbs.Scriptable, com.apple.icloud-container.clouddocs.com.apple.ScriptEditor2, com.icloud.family, com.apple.idmsauth, com.apple.watchList, com.apple.icloud-container.clouddocs.com.apple.TextEdit, com.apple.icloud-container.com.apple.VoiceMemos, com.apple.sharedstreams, com.apple.pay.services.apply.prod, com.apple.icloud-container.com.apple.SafariShared.CloudTabs, com.apple.wallet.sharing.qa, com.apple.appstored, com.apple.icloud-container.clouddocs.3L68KQB4HG.com.readdle.CommonDocuments, com.apple.icloud-container.clouddocs.com.apple.CloudDocs.pp-metadata, com.me.setupservice, com.apple.icloud-container.com.apple.amsengagementd, com.apple.icloud-container.com.apple.appleaccount.beneficiary.private, com.apple.icloud-container.com.apple.appleaccount.beneficiary, com.apple.icloud-container.clouddocs.com.apple.mail, com.apple.icloud-container.com.apple.appleaccount.custodian, com.apple.icloud-container.com.apple.securityd, com.apple.icloud-container.com.apple.iBooksX, com.apple.icloud-container.clouddocs.com.apple.QuickTimePlayerX, com.apple.icloud-container.clouddocs.com.apple.TextInput, com.apple.icloud-container.com.apple.icloud.fmfd, com.apple.tv.favoriteTeams, com.apple.pay.services.ownershipTokens.prod, com.apple.icloud-container.com.apple.passd, com.apple.amsaccountsd, com.apple.pay.services.devicecheckin.prod.us, com.apple.storekit, com.apple.icloud-container.com.apple.keyboardservicesd, paymentpass.com.apple, com.apple.aa.setupservice, com.apple.icloud-container.clouddocs.com.apple.shoebox, com.apple.icloud-container.clouddocs.F3LWYJ7GM7.com.apple.mobilegarageband, com.apple.icloud-container.com.apple.icloud.searchpartyuseragent, com.apple.icloud-container.clouddocs.iCloud.com.apple.configurator.ui, com.apple.icloud-container.com.apple.gamed, com.apple.icloud-container.clouddocs.com.apple.Keynote, com.apple.icloud-container.com.apple.willowd.homekit, com.apple.amsengagementd.notifications, com.apple.icloud.presence.mode.status, com.apple.aa.idms, com.apple.icloud-container.clouddocs.iCloud.com.apple.MobileSMS, com.apple.gamed, com.apple.icloud-container.clouddocs.iCloud.is.workflow.my.workflows, com.apple.icloud-container.clouddocs.iCloud.md.obsidian, com.apple.icloud-container.clouddocs.com.apple.CloudDocs, com.apple.wallet.sharing, com.apple.icloud-container.clouddocs.iCloud.com.apple.iBooks.iTunesU, com.apple.icloud.presence.shared.experience, com.apple.icloud-container.com.apple.imagent, com.apple.icloud-container.com.apple.financed, com.apple.pay.services.account.prod, com.apple.icloud-container.com.apple.assistant.assistantd, com.apple.pay.services.ck.zone.prod, com.apple.icloud-container.com.apple.security.cuttlefish, com.apple.icloud-container.clouddocs.com.apple.iBooks.cloudData, com.apple.peerpayment, com.icloud.quota, com.apple.pay.provision, com.apple.icloud-container.com.apple.upload-request-proxy.com.apple.photos.cloud, com.apple.icloud-container.com.apple.appleaccount.custodian.private, com.apple.icloud-container.clouddocs.com.apple.Preview, com.apple.maps.icloud, com.apple.icloud-container.com.apple.reminders, com.apple.icloud-container.com.apple.SafariShared.WBSCloudBookmarksStore, com.apple.idmsauthagent, com.apple.icloud-container.clouddocs.com.apple.Numbers, com.apple.bookassetd, com.apple.pay.auxiliary.registration.requirement.prod, com.apple.icloud.fmip.voiceassistantsync)" @@ -17,16 +18,18 @@ topics = enabled_topics + opportunistic_topics + paused_topics # Calculate the SHA1 hash of each topic topics_lookup = [(topic, sha1(topic.encode()).digest()) for topic in topics] + class bcolors: - HEADER = '\033[95m' - OKBLUE = '\033[94m' - OKCYAN = '\033[96m' - OKGREEN = '\033[92m' - WARNING = '\033[93m' - FAIL = '\033[91m' - ENDC = '\033[0m' - BOLD = '\033[1m' - UNDERLINE = '\033[4m' + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + def _lookup_topic(hash: bytes): for topic_lookup in topics_lookup: @@ -34,6 +37,7 @@ def _lookup_topic(hash: bytes): return topic_lookup[0] return None + # Returns the value of the first field with the given id def _get_field(fields: list[tuple[int, bytes]], id: int) -> bytes: for field_id, value in fields: @@ -41,6 +45,7 @@ def _get_field(fields: list[tuple[int, bytes]], id: int) -> bytes: return value return None + def _p_filter(prefix, fields: list[tuple[int, bytes]]): enabled = [] ignored = [] @@ -52,7 +57,7 @@ def _p_filter(prefix, fields: list[tuple[int, bytes]]): for field in fields: if field[0] == 1: token = b64encode(field[1]) - #print(f"Push Token: {b64encode(field[1])}") + # print(f"Push Token: {b64encode(field[1])}") elif field[0] == 2: enabled.append(_lookup_topic(field[1])) elif field[0] == 3: @@ -62,8 +67,8 @@ def _p_filter(prefix, fields: list[tuple[int, bytes]]): elif field[0] == 5: paused.append(_lookup_topic(field[1])) else: - pass # whatever, there's a 6 but it's not documented - #print(f"Unknown field ID: {field[0]}") + pass # whatever, there's a 6 but it's not documented + # print(f"Unknown field ID: {field[0]}") # Remove None values enabled = [topic.strip() for topic in enabled if topic is not None] @@ -95,11 +100,17 @@ def _p_filter(prefix, fields: list[tuple[int, bytes]]): if len(paused) > 100: paused = paused[:100] + "..." # (Token: {token.decode()}) - print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Filter{bcolors.ENDC} {bcolors.WARNING}Enabled{bcolors.ENDC}: {enabled} {bcolors.FAIL}Ignored{bcolors.ENDC}: {ignored} {bcolors.OKBLUE}Oppertunistic{bcolors.ENDC}: {oppertunistic} {bcolors.OKGREEN}Paused{bcolors.ENDC}: {paused}") + print( + f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Filter{bcolors.ENDC} {bcolors.WARNING}Enabled{bcolors.ENDC}: {enabled} {bcolors.FAIL}Ignored{bcolors.ENDC}: {ignored} {bcolors.OKBLUE}Oppertunistic{bcolors.ENDC}: {oppertunistic} {bcolors.OKGREEN}Paused{bcolors.ENDC}: {paused}" + ) + import apns -def pretty_print_payload(prefix, payload: tuple[int, list[tuple[int, bytes]]]) -> bytes | None: + +def pretty_print_payload( + prefix, payload: tuple[int, list[tuple[int, bytes]]] +) -> bytes | None: id = payload[0] if id == 9: @@ -108,54 +119,78 @@ def pretty_print_payload(prefix, payload: tuple[int, list[tuple[int, bytes]]]) - token_str = "" if _get_field(payload[1], 3): token_str = f"{bcolors.WARNING}Token{bcolors.ENDC}: {b64encode(_get_field(payload[1], 3)).decode()}" - print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Connected{bcolors.ENDC} {token_str} {bcolors.OKBLUE}{_get_field(payload[1], 1).hex()}{bcolors.ENDC}") + print( + f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Connected{bcolors.ENDC} {token_str} {bcolors.OKBLUE}{_get_field(payload[1], 1).hex()}{bcolors.ENDC}" + ) elif id == 7: - print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Connect Request{bcolors.ENDC}", end="") + print( + f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Connect Request{bcolors.ENDC}", + end="", + ) if _get_field(payload[1], 1): - print(f" {bcolors.WARNING}Token{bcolors.ENDC}: {b64encode(_get_field(payload[1], 1)).decode()}", end="") - if _get_field(payload[1], 0x0c): + print( + f" {bcolors.WARNING}Token{bcolors.ENDC}: {b64encode(_get_field(payload[1], 1)).decode()}", + end="", + ) + if _get_field(payload[1], 0x0C): print(f" {bcolors.OKBLUE}SIGNED{bcolors.ENDC}", end="") - if _get_field(payload[1], 0x5) and int.from_bytes(_get_field(payload[1], 0x5)) & 0x4: + if ( + _get_field(payload[1], 0x5) + and int.from_bytes(_get_field(payload[1], 0x5)) & 0x4 + ): print(f" {bcolors.FAIL}ROOT{bcolors.ENDC}", end="") print() - #for field in payload[1]: + # for field in payload[1]: # print(f"Field ID: {field[0]}") # print(f"Field Value: {field[1]}") - # 65 (user) or 69 (root) - + # 65 (user) or 69 (root) + for i in range(len(payload[1])): - #if payload[1][i][0] == 5: - #if payload[1][i][1] == b'\x00\x00\x00A': # user - # payload[1][i][1] = b'\x00\x00\x00E' - #elif payload[1][i][1] == b'\x00\x00\x00E': # root - # payload[1][i][1] = b'\x00\x00\x00A' - #else: - # print("Unknown field value: ", payload[1][i][1]) + # if payload[1][i][0] == 5: + # if payload[1][i][1] == b'\x00\x00\x00A': # user + # payload[1][i][1] = b'\x00\x00\x00E' + # elif payload[1][i][1] == b'\x00\x00\x00E': # root + # payload[1][i][1] = b'\x00\x00\x00A' + # else: + # print("Unknown field value: ", payload[1][i][1]) if payload[1][i][0] == 1: pass - #payload[1][i] = (None, None) - #payload[1][i] = (1, b64decode("D3MtN3e18QE8rve3n92wp+CwK7u/bWk/5WjQUOBN640=")) + # payload[1][i] = (None, None) + # payload[1][i] = (1, b64decode("D3MtN3e18QE8rve3n92wp+CwK7u/bWk/5WjQUOBN640=")) out = apns._serialize_payload(payload[0], payload[1]) - #return out - elif id == 0xc: - print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Keep Alive{bcolors.ENDC}") - elif id == 0xd: - print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Keep Alive Ack{bcolors.ENDC}") + # return out + elif id == 0xC: + print( + f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Keep Alive{bcolors.ENDC}" + ) + elif id == 0xD: + print( + f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Keep Alive Ack{bcolors.ENDC}" + ) elif id == 0x14: - print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Set State{bcolors.ENDC}: {_get_field(payload[1], 1).hex()}") - elif id == 0x1d or id == 0x20: - print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.WARNING}PubSub ??{bcolors.ENDC}") - elif id == 0xe: - print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.WARNING}Token Confirmation{bcolors.ENDC}") - elif id == 0xa: + print( + f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Set State{bcolors.ENDC}: {_get_field(payload[1], 1).hex()}" + ) + elif id == 0x1D or id == 0x20: + print( + f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.WARNING}PubSub ??{bcolors.ENDC}" + ) + elif id == 0xE: + print( + f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.WARNING}Token Confirmation{bcolors.ENDC}" + ) + elif id == 0xA: topic = "" - #topic = _lookup_topic(_get_field(payload[1], 1)) + # topic = _lookup_topic(_get_field(payload[1], 1)) # if it has apsd -> APNs in the prefix, it's an outgoing notification if "apsd -> APNs" in prefix: - print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKBLUE}OUTGOING Notification{bcolors.ENDC}", end="") + print( + f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKBLUE}OUTGOING Notification{bcolors.ENDC}", + end="", + ) topic = _lookup_topic(_get_field(payload[1], 1)) # topic = _lookup_topic(_get_field(payload[1], 1)) # if b"bplist" in _get_field(payload[1], 3): @@ -169,54 +204,68 @@ def pretty_print_payload(prefix, payload: tuple[int, list[tuple[int, bytes]]]) - # for key in plist: # print(f" {bcolors.OKBLUE}{key}{bcolors.ENDC}: {plist[key]}", end="") - - else: - print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Notification{bcolors.ENDC}", end="") + + else: + print( + f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Notification{bcolors.ENDC}", + end="", + ) topic = _lookup_topic(_get_field(payload[1], 2)) - #if b"bplist" in _get_field(payload[1], 3): + # if b"bplist" in _get_field(payload[1], 3): # print(f" {bcolors.OKBLUE}Binary{bcolors.ENDC}", end="") - #print(f" {bcolors.WARNING}Topic{bcolors.ENDC}: {_lookup_topic(_get_field(payload[1], 2))}") - + # print(f" {bcolors.WARNING}Topic{bcolors.ENDC}: {_lookup_topic(_get_field(payload[1], 2))}") + print(f" {bcolors.WARNING}Topic{bcolors.ENDC}: {topic}", end="") if topic == "com.apple.madrid": print(f" {bcolors.FAIL}Madrid{bcolors.ENDC}", end="") payload = plistlib.loads(_get_field(payload[1], 3)) - #print(payload) + # print(payload) if "cT" in payload: # It's HTTP over APNs if "hs" in payload: - print(f" {bcolors.WARNING}HTTP Response{bcolors.ENDC}: {payload['hs']}", end="") + print( + f" {bcolors.WARNING}HTTP Response{bcolors.ENDC}: {payload['hs']}", + end="", + ) else: print(f" {bcolors.WARNING}HTTP Request{bcolors.ENDC}", end="") - #print(f" {bcolors.WARNING}HTTP{bcolors.ENDC} {payload['hs']}", end="") + # print(f" {bcolors.WARNING}HTTP{bcolors.ENDC} {payload['hs']}", end="") if "u" in payload: print(f" {bcolors.OKCYAN}URL{bcolors.ENDC}: {payload['u']}", end="") - print(f" {bcolors.FAIL}Content Type{bcolors.ENDC}: {payload['cT']}", end="") + print( + f" {bcolors.FAIL}Content Type{bcolors.ENDC}: {payload['cT']}", + end="", + ) if "h" in payload: - print(f" {bcolors.FAIL}Headers{bcolors.ENDC}: {payload['h']}", end="") + print( + f" {bcolors.FAIL}Headers{bcolors.ENDC}: {payload['h']}", end="" + ) if "b" in payload: # What am I really supposed to put in WBITS? Got this from a random SO answer - #print(payload["b"]) - body = zlib.decompress(payload['b'], 16+zlib.MAX_WBITS) - if b'plist' in body: + # print(payload["b"]) + body = zlib.decompress(payload["b"], 16 + zlib.MAX_WBITS) + if b"plist" in body: body = plistlib.loads(body) print(f" {bcolors.FAIL}Body{bcolors.ENDC}: {body}", end="") - - print() - - #print(f" {bcolors.WARNING}{bcolors.ENDC}: {payload['cT']}") - #for field in payload[1]: + print() + + # print(f" {bcolors.WARNING}{bcolors.ENDC}: {payload['cT']}") + + # for field in payload[1]: # print(f"Field ID: {field[0]}") # print(f"Field Value: {field[1]}") - elif id == 0xb: - print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Notification Ack{bcolors.ENDC} {bcolors.OKBLUE}{_get_field(payload[1], 8).hex()}{bcolors.ENDC}") + elif id == 0xB: + print( + f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Notification Ack{bcolors.ENDC} {bcolors.OKBLUE}{_get_field(payload[1], 8).hex()}{bcolors.ENDC}" + ) else: print(prefix, f"Payload ID: {hex(payload[0])}") for field in payload[1]: print(f"Field ID: {field[0]}") print(f"Field Value: {field[1]}") + if __name__ == "__main__": - print(f"{bcolors.OKGREEN}Enabled:{bcolors.ENDC}") \ No newline at end of file + print(f"{bcolors.OKGREEN}Enabled:{bcolors.ENDC}") diff --git a/proxy/hosts.py b/proxy/hosts.py index 1d7985e..c998dec 100644 --- a/proxy/hosts.py +++ b/proxy/hosts.py @@ -2,4 +2,4 @@ for i in range(1, 50): print(f"127.0.0.1 {i}-courier.push.apple.com") - print(f"127.0.0.1 {i}.courier.push.apple.com") \ No newline at end of file + print(f"127.0.0.1 {i}.courier.push.apple.com") diff --git a/proxy/proxy.py b/proxy/proxy.py index af4c37b..9791475 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -1,46 +1,49 @@ # TLS server to proxy APNs traffic import socket -import tlslite +import sys import threading -import sys - +import tlslite + # setting path -sys.path.append('../') +sys.path.append("../") # APNs server to proxy traffic to APNS_HOST = "windows.courier.push.apple.com" APNS_PORT = 5223 ALPN = b"apns-security-v3" -#ALPN = b"apns-security-v2" -#ALPN = b"apns-pack-v1" +# ALPN = b"apns-security-v2" +# ALPN = b"apns-pack-v1" global_cnt = 0 + # Connect to the APNs server def connect() -> tlslite.TLSConnection: # Connect to the APNs server sock = socket.create_connection((APNS_HOST, APNS_PORT)) # Wrap the socket in TLS ssock = tlslite.TLSConnection(sock) - #print("Handshaking with APNs") + # print("Handshaking with APNs") # Handshake with the server if ALPN == b"apns-security-v3": print("Using v3") ssock.handshakeClientCert(alpn=[ALPN]) else: import albert + private_key, cert = albert.generate_push_cert() cert = tlslite.X509CertChain([tlslite.X509().parse(cert)]) private_key = tlslite.parsePEMKey(private_key, private=True) # Handshake with the server ssock.handshakeClientCert(cert, private_key, alpn=[ALPN]) - + return ssock -cert:str = None -key:str = None + +cert: str = None +key: str = None import apns @@ -48,27 +51,32 @@ import printer outgoing_list = [] incoming_list = [] -#last_outgoing = b"" +# last_outgoing = b"" + def proxy(conn1: tlslite.TLSConnection, conn2: tlslite.TLSConnection, prefix: str = ""): try: while True: # Read data from the first connection data = conn1.read() - #print(prefix, "data: ", data) + # print(prefix, "data: ", data) # If there is no data, the connection has closed if not data: print(prefix, "Connection closed due to no data") break try: - override = printer.pretty_print_payload(prefix, apns._deserialize_payload_from_buffer(data)) + override = printer.pretty_print_payload( + prefix, apns._deserialize_payload_from_buffer(data) + ) except Exception as e: - print(e) # Can't crash the proxy over parsing errors + print(e) # Can't crash the proxy over parsing errors if override is not None: data = override print("OVERRIDE: ", end="") - printer.pretty_print_payload(prefix, apns._deserialize_payload_from_buffer(data)) + printer.pretty_print_payload( + prefix, apns._deserialize_payload_from_buffer(data) + ) if "apsd -> APNs" in prefix: global outgoing_list @@ -81,13 +89,13 @@ def proxy(conn1: tlslite.TLSConnection, conn2: tlslite.TLSConnection, prefix: st if len(incoming_list) > 100: incoming_list.pop() - #print(prefix, data) + # print(prefix, data) # Write the data to the second connection conn2.write(data) except OSError as e: if e.errno == 9: print(prefix, "Connection closed due to OSError 9") - pass # Probably a connection closed error + pass # Probably a connection closed error else: raise e except tlslite.TLSAbruptCloseError as e: @@ -97,7 +105,10 @@ def proxy(conn1: tlslite.TLSConnection, conn2: tlslite.TLSConnection, prefix: st conn1.close() conn2.close() + repl_lock = False + + def repl(conn1: tlslite.TLSConnection, conn2: tlslite.TLSConnection): global repl_lock if repl_lock: @@ -105,37 +116,43 @@ def repl(conn1: tlslite.TLSConnection, conn2: tlslite.TLSConnection): return repl_lock = True import IPython + IPython.embed() + def handle(conn: socket.socket): # Wrap the socket in TLS s_conn = tlslite.TLSConnection(conn) global cert, key chain = tlslite.X509CertChain() chain.parsePemList(cert) - #print(chain) - #cert = tlslite.X509CertChain([tlslite.X509().parse(cert)]) - key_parsed = tlslite.parsePEMKey(key, private=True) - #print(key_parsed) - s_conn.handshakeServer(certChain=chain, privateKey=key_parsed, reqCert=False, alpn=[ALPN]) + # print(chain) + # cert = tlslite.X509CertChain([tlslite.X509().parse(cert)]) + key_parsed = tlslite.parsePEMKey(key, private=True) + # print(key_parsed) + s_conn.handshakeServer( + certChain=chain, privateKey=key_parsed, reqCert=False, alpn=[ALPN] + ) print("Handling connection") # Connect to the APNs server apns = connect() print("Connected to APNs") - threading.Thread(target=repl, args=(s_conn,apns)).start() + threading.Thread(target=repl, args=(s_conn, apns)).start() global global_cnt global_cnt += 1 # Proxy data between the connections # Create a thread to proxy data from the APNs server to the client - threading.Thread(target=proxy, args=(s_conn, apns, f"{global_cnt} apsd -> APNs")).start() + threading.Thread( + target=proxy, args=(s_conn, apns, f"{global_cnt} apsd -> APNs") + ).start() # Just proxy data from the client to the APNs server in this thread proxy(apns, s_conn, f"{global_cnt} APNs -> apsd") -def serve(): +def serve(): # Create a socket to listen for connections sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Allow the socket to be reused @@ -150,7 +167,6 @@ def serve(): with open("push_certificate_chain.pem", "r") as f: global cert cert = f.read() - # NEED TO USE OPENSSL, SEE CORETRUST CMD, MIMIC ENTRUST? OR AT LEAST SEE PUSHPROXY FOR EXTRACTION & REPLACEMENT with open("push_key.pem", "r") as f: @@ -165,7 +181,7 @@ def serve(): conn, addr = sock.accept() conns.append(conn) # Create a thread to handle the connection - #handle(conn) + # handle(conn) thread = threading.Thread(target=handle, args=(conn,)) thread.start() except KeyboardInterrupt: @@ -174,5 +190,6 @@ def serve(): conn.close() sock.close() + if __name__ == "__main__": serve()