pypush-plus-plus/proxy/proxy.py

125 lines
4.2 KiB
Python
Raw Normal View History

2023-08-19 14:25:21 -05:00
import os
2023-08-17 23:18:30 -05:00
import sys
import traceback
2023-08-17 23:18:30 -05:00
2023-08-19 14:25:21 -05:00
# setting path so we can import the needed packages
sys.path.append(os.path.join(sys.path[0], "../"))
sys.path.append(os.path.join(sys.path[0], "../../"))
2023-08-17 23:18:30 -05:00
2023-08-19 14:25:21 -05:00
import gzip
2023-08-17 23:18:30 -05:00
import logging
2023-08-18 19:17:29 -05:00
import plistlib
2023-08-19 14:25:21 -05:00
import ssl
from hashlib import sha1
2023-08-18 19:17:29 -05:00
2023-08-19 14:25:21 -05:00
import trio
from rich.logging import RichHandler
import printer
from pypush import apns
2023-08-17 23:18:30 -05:00
# Root logger setup: NOTSET lets every record through, and RichHandler does
# the rendering, so the format string carries only the message itself.
logging.basicConfig(
    handlers=[RichHandler()],
    level=logging.NOTSET,
    datefmt="[%X]",
    format="%(message)s",
)
async def main():
    """Serve the TLS proxy on TCP port 5223.

    Points the ``apns`` module at the Windows courier host, builds a
    server-side TLS context from the PEM files stored next to this script,
    and hands every accepted connection to ``handle_proxy``.
    """
    # Use windows courier so that /etc/hosts override doesn't affect it
    apns.COURIER_HOST = "windows.courier.push.apple.com"

    # The certificate chain and private key live in this file's directory.
    here: str = os.path.dirname(os.path.realpath(__file__))
    chain_file = os.path.join(here, "push_certificate_chain.pem")
    key_file = os.path.join(here, "push_key.pem")

    tls_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    tls_context.set_alpn_protocols(["apns-security-v3"])
    tls_context.load_cert_chain(chain_file, key_file)

    await trio.serve_ssl_over_tcp(handle_proxy, 5223, tls_context)
async def handle_proxy(stream: trio.SocketStream):
    """Run an ``APNSProxy`` for one accepted client connection.

    Any ``Exception`` raised while the proxy runs is logged together with
    its traceback and is not re-raised, so the caller of this handler never
    sees it.

    Args:
        stream: The accepted client connection.
            NOTE(review): ``main`` registers this handler with
            ``trio.serve_ssl_over_tcp``, so the object received is
            presumably the TLS-wrapped stream rather than a bare
            ``SocketStream`` - confirm and tighten the annotation.
    """
    try:
        proxy = APNSProxy(stream)
        await proxy.start()
    except Exception:
        # logging.exception records the message at ERROR level and appends
        # the active traceback, keeping the whole report on the logging
        # handlers instead of splitting it between the log and stderr.
        logging.exception("APNSProxy instance encountered exception:")
class APNSProxy:
    """Relay APNs payloads between one client stream and an upstream connection.

    Each payload read from either side is passed through :meth:`tamper`,
    printed by :meth:`log`, and then written to the opposite side.
    """

    # Value carried in field 2 (the topic) of com.apple.madrid notifications.
    # Computed once here instead of on every inspected payload.
    _MADRID_TOPIC = sha1(b"com.apple.madrid").digest()

    def __init__(self, client: "trio.SocketStream"):
        """Remember the client stream; the upstream side is opened by start()."""
        self.client = client
        # Assigned by start() once the upstream socket is connected.
        self.connection = None

    async def start(self):
        """Connect upstream, then relay both directions.

        The upstream connection attempt is repeated once per second until
        it succeeds. Two relay tasks (client -> server and server -> client)
        are then started in the same nursery, so this coroutine only
        returns or raises once those tasks have finished.

        NOTE(review): nothing in this class closes ``self.connection.sock``
        when relaying stops - confirm whether ``APNSConnection`` cleans up.
        """
        logging.info("Starting proxy...")

        async with trio.open_nursery() as nursery:
            while True:
                try:
                    apns_server = apns.APNSConnection(nursery)
                    await apns_server._connect_socket()
                except Exception:
                    # Keep retrying, but record why the attempt failed.
                    logging.exception("Unable to start proxy, trying again...")
                    await trio.sleep(1)
                else:
                    self.connection = apns_server
                    nursery.start_soon(self.proxy, True)
                    nursery.start_soon(self.proxy, False)
                    break

    async def proxy(self, to_server: bool):
        """Forward payloads in one direction until reading or writing raises.

        Args:
            to_server: True to relay client -> upstream, False for
                upstream -> client.
        """
        if to_server:
            from_stream, to_stream = self.client, self.connection.sock
        else:
            from_stream, to_stream = self.connection.sock, self.client

        while True:
            payload = await apns.APNSPayload.read_from_stream(from_stream)
            payload = self.tamper(payload, to_server)
            self.log(payload, to_server)
            await payload.write_to_stream(to_stream)

    def log(self, payload: "apns.APNSPayload", to_server: bool):
        """Print one relayed payload along with its direction."""
        printer.print_payload(payload, to_server)

    def tamper(self, payload: "apns.APNSPayload", to_server: bool) -> "apns.APNSPayload":
        """Hook for modifying a payload before it is logged and forwarded.

        Currently a pass-through. :meth:`tamper_lookup_keys` exists for
        rewriting payloads but is deliberately not called from here.
        """
        return payload

    def tamper_lookup_keys(self, payload: "apns.APNSPayload") -> "apns.APNSPayload":
        """Overwrite the public identity keys inside a lookup response.

        Only a notification (payload id 0xA) whose topic field equals the
        com.apple.madrid digest and whose body decodes to a plist dict with
        ``c == 97`` is modified. Every other payload, including one that
        lacks a topic or body field, is returned untouched.

        Args:
            payload: The payload to inspect; modified in place on a match.

        Returns:
            The same ``payload`` object.

        Raises:
            Whatever ``plistlib`` or ``gzip`` raise on undecodable data, and
            ``KeyError`` when a matching response lacks the ``b``,
            ``results`` or ``identities`` keys.
        """
        if payload.id != 0xA:  # Notification
            return payload

        topic_fields = payload.fields_with_id(2)  # Topic
        if not topic_fields or topic_fields[0].value != self._MADRID_TOPIC:
            return payload

        body_fields = payload.fields_with_id(3)
        if not body_fields or body_fields[0].value is None:
            return payload

        body = plistlib.loads(body_fields[0].value)
        if not isinstance(body, dict) or body.get("c") != 97:  # Lookup response
            return payload

        resp = plistlib.loads(gzip.decompress(body["b"]))  # HTTP body

        # Replace public keys
        for result in resp["results"].values():
            for identity in result["identities"]:
                if "client-data" in identity:
                    identity["client-data"]["public-message-identity-key"] = b"REDACTED"

        # mtime=0 leaves no timestamp in the gzip header.
        body["b"] = gzip.compress(plistlib.dumps(resp, fmt=plistlib.FMT_BINARY), mtime=0)
        new_body = plistlib.dumps(body, fmt=plistlib.FMT_BINARY)

        # Write the re-encoded body into the first field with id 3.
        for field in payload.fields:
            if field.id == 3:
                field.value = new_body
                break

        return payload
2023-08-17 23:18:30 -05:00
if __name__ == "__main__":
    # Script entry point: run the proxy server under trio's event loop.
    trio.run(main)