diff --git a/printer.py b/printer.py index ad7733f..9217a53 100644 --- a/printer.py +++ b/printer.py @@ -164,12 +164,12 @@ def pretty_print_payload(prefix, payload: tuple[int, list[tuple[int, bytes]]]) - if payload[1][i][0] == 0x4: payload[1][i] = (0x4, (int.from_bytes(payload[1][i][1]) + 1).to_bytes(4, "big")) payload3 = apns._serialize_payload(payload[0], payload[1]) - return (payload1 + payload2 + payload3) + #return (payload1 + payload2 + payload3) else: print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Notification{bcolors.ENDC}") - for field in payload[1]: - print(f"Field ID: {field[0]}") - print(f"Field Value: {field[1]}") + #for field in payload[1]: + # print(f"Field ID: {field[0]}") + # print(f"Field Value: {field[1]}") elif id == 0xb: print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Notification Ack{bcolors.ENDC} {bcolors.OKBLUE}{_get_field(payload[1], 8).hex()}{bcolors.ENDC}") else: diff --git a/proxy/proxy.py b/proxy/proxy.py index 4dc2dff..a047661 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -46,6 +46,9 @@ key:str = None import apns import printer +outgoing_list = [] +incoming_list = [] +#last_outgoing = b"" def proxy(conn1: tlslite.TLSConnection, conn2: tlslite.TLSConnection, prefix: str = ""): try: @@ -64,6 +67,17 @@ def proxy(conn1: tlslite.TLSConnection, conn2: tlslite.TLSConnection, prefix: st print("OVERRIDE: ", end="") printer.pretty_print_payload(prefix, apns._deserialize_payload_from_buffer(data)) + if "apsd -> APNs" in prefix: + global outgoing_list + outgoing_list.insert(0, data) + if len(outgoing_list) > 100: + outgoing_list.pop() + elif "APNs -> apsd" in prefix: + global incoming_list + incoming_list.insert(0, data) + if len(incoming_list) > 100: + incoming_list.pop() + #print(prefix, data) # Write the data to the second connection conn2.write(data) @@ -80,6 +94,69 @@ def proxy(conn1: tlslite.TLSConnection, conn2: tlslite.TLSConnection, prefix: st conn1.close() conn2.close() +def repl(conn: tlslite.TLSConnection): + while True: + i = input(">>> ") + if "ro" in i: + print("Replaying last outgoing packet") + try: + index = int(i[2:]) + except ValueError: + print("Invalid index") + continue + if index >= len(outgoing_list): + print("Invalid index") + continue + print("Replaying outgoing packet") + conn.write(outgoing_list[index]) + # Print the packet + printer.pretty_print_payload("[REPLAY] apsd -> APNs", apns._deserialize_payload_from_buffer(outgoing_list[index])) + + elif "io" in i: + try: + index = int(i[2:]) + except ValueError: + print("Invalid index") + continue + if index >= len(outgoing_list): + print("Invalid index") + continue + print("Inspecting outgoing packet") + payload = apns._deserialize_payload_from_buffer(outgoing_list[index]) + print(f"ID: {payload[0]}") + for i in range(len(payload[1])): + print(f" {payload[1][i][0]}: {payload[1][i][1]}") + + elif "ri" in i: + print("Replaying last outgoing packet") + try: + index = int(i[2:]) + except ValueError: + print("Invalid index") + continue + if index >= len(incoming_list): + print("Invalid index") + continue + print("Replaying outgoing packet") + conn.write(incoming_list[index]) + # Print the packet + printer.pretty_print_payload("[REPLAY] APNs -> apsd", apns._deserialize_payload_from_buffer(incoming_list[index])) + + elif "ii" in i: + try: + index = int(i[2:]) + except ValueError: + print("Invalid index") + continue + if index >= len(incoming_list): + print("Invalid index") + continue + print("Inspecting incoming packet") + payload = apns._deserialize_payload_from_buffer(incoming_list[index]) + print(f"ID: {payload[0]}") + for i in range(len(payload[1])): + print(f" {payload[1][i][0]}: {payload[1][i][1]}") + def handle(conn: socket.socket): # Wrap the socket in TLS s_conn = tlslite.TLSConnection(conn) @@ -97,6 +174,8 @@ def handle(conn: socket.socket): apns = connect() print("Connected to APNs") + threading.Thread(target=repl, args=(s_conn,)).start() + global global_cnt global_cnt += 1 # Proxy data between the connections