mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2024-12-23 19:32:29 -06:00
yay inspection repl
This commit is contained in:
parent
33c30ffdc5
commit
f30178a70c
2 changed files with 83 additions and 4 deletions
|
@ -164,12 +164,12 @@ def pretty_print_payload(prefix, payload: tuple[int, list[tuple[int, bytes]]]) -
|
|||
if payload[1][i][0] == 0x4:
|
||||
payload[1][i] = (0x4, (int.from_bytes(payload[1][i][1]) + 1).to_bytes(4, "big"))
|
||||
payload3 = apns._serialize_payload(payload[0], payload[1])
|
||||
return (payload1 + payload2 + payload3)
|
||||
#return (payload1 + payload2 + payload3)
|
||||
else:
|
||||
print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Notification{bcolors.ENDC}")
|
||||
for field in payload[1]:
|
||||
print(f"Field ID: {field[0]}")
|
||||
print(f"Field Value: {field[1]}")
|
||||
#for field in payload[1]:
|
||||
# print(f"Field ID: {field[0]}")
|
||||
# print(f"Field Value: {field[1]}")
|
||||
elif id == 0xb:
|
||||
print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Notification Ack{bcolors.ENDC} {bcolors.OKBLUE}{_get_field(payload[1], 8).hex()}{bcolors.ENDC}")
|
||||
else:
|
||||
|
|
|
@ -46,6 +46,9 @@ key:str = None
|
|||
import apns
|
||||
import printer
|
||||
|
||||
outgoing_list = []
|
||||
incoming_list = []
|
||||
#last_outgoing = b""
|
||||
|
||||
def proxy(conn1: tlslite.TLSConnection, conn2: tlslite.TLSConnection, prefix: str = ""):
|
||||
try:
|
||||
|
@ -64,6 +67,17 @@ def proxy(conn1: tlslite.TLSConnection, conn2: tlslite.TLSConnection, prefix: st
|
|||
print("OVERRIDE: ", end="")
|
||||
printer.pretty_print_payload(prefix, apns._deserialize_payload_from_buffer(data))
|
||||
|
||||
if "apsd -> APNs" in prefix:
|
||||
global outgoing_list
|
||||
outgoing_list.insert(0, data)
|
||||
if len(outgoing_list) > 100:
|
||||
outgoing_list.pop()
|
||||
elif "APNs -> apsd" in prefix:
|
||||
global incoming_list
|
||||
incoming_list.insert(0, data)
|
||||
if len(incoming_list) > 100:
|
||||
incoming_list.pop()
|
||||
|
||||
#print(prefix, data)
|
||||
# Write the data to the second connection
|
||||
conn2.write(data)
|
||||
|
@ -80,6 +94,69 @@ def proxy(conn1: tlslite.TLSConnection, conn2: tlslite.TLSConnection, prefix: st
|
|||
conn1.close()
|
||||
conn2.close()
|
||||
|
||||
def repl(conn: tlslite.TLSConnection):
|
||||
while True:
|
||||
i = input(">>> ")
|
||||
if "ro" in i:
|
||||
print("Replaying last outgoing packet")
|
||||
try:
|
||||
index = int(i[2:])
|
||||
except ValueError:
|
||||
print("Invalid index")
|
||||
continue
|
||||
if index >= len(outgoing_list):
|
||||
print("Invalid index")
|
||||
continue
|
||||
print("Replaying outgoing packet")
|
||||
conn.write(outgoing_list[index])
|
||||
# Print the packet
|
||||
printer.pretty_print_payload("[REPLAY] apsd -> APNs", apns._deserialize_payload_from_buffer(outgoing_list[index]))
|
||||
|
||||
elif "io" in i:
|
||||
try:
|
||||
index = int(i[2:])
|
||||
except ValueError:
|
||||
print("Invalid index")
|
||||
continue
|
||||
if index >= len(outgoing_list):
|
||||
print("Invalid index")
|
||||
continue
|
||||
print("Inspecting outgoing packet")
|
||||
payload = apns._deserialize_payload_from_buffer(outgoing_list[index])
|
||||
print(f"ID: {payload[0]}")
|
||||
for i in range(len(payload[1])):
|
||||
print(f" {payload[1][i][0]}: {payload[1][i][1]}")
|
||||
|
||||
elif "ri" in i:
|
||||
print("Replaying last outgoing packet")
|
||||
try:
|
||||
index = int(i[2:])
|
||||
except ValueError:
|
||||
print("Invalid index")
|
||||
continue
|
||||
if index >= len(incoming_list):
|
||||
print("Invalid index")
|
||||
continue
|
||||
print("Replaying outgoing packet")
|
||||
conn.write(incoming_list[index])
|
||||
# Print the packet
|
||||
printer.pretty_print_payload("[REPLAY] APNs -> apsd", apns._deserialize_payload_from_buffer(incoming_list[index]))
|
||||
|
||||
elif "ii" in i:
|
||||
try:
|
||||
index = int(i[2:])
|
||||
except ValueError:
|
||||
print("Invalid index")
|
||||
continue
|
||||
if index >= len(incoming_list):
|
||||
print("Invalid index")
|
||||
continue
|
||||
print("Inspecting incoming packet")
|
||||
payload = apns._deserialize_payload_from_buffer(incoming_list[index])
|
||||
print(f"ID: {payload[0]}")
|
||||
for i in range(len(payload[1])):
|
||||
print(f" {payload[1][i][0]}: {payload[1][i][1]}")
|
||||
|
||||
def handle(conn: socket.socket):
|
||||
# Wrap the socket in TLS
|
||||
s_conn = tlslite.TLSConnection(conn)
|
||||
|
@ -97,6 +174,8 @@ def handle(conn: socket.socket):
|
|||
apns = connect()
|
||||
print("Connected to APNs")
|
||||
|
||||
threading.Thread(target=repl, args=(s_conn,)).start()
|
||||
|
||||
global global_cnt
|
||||
global_cnt += 1
|
||||
# Proxy data between the connections
|
||||
|
|
Loading…
Reference in a new issue