mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2024-12-24 11:52:27 -06:00
y not work...
This commit is contained in:
parent
5a82ce29e4
commit
023b827837
5 changed files with 83 additions and 29 deletions
51
apns.py
51
apns.py
File diff suppressed because one or more lines are too long
|
@ -1,7 +1,7 @@
|
|||
import tlslite
|
||||
import socket
|
||||
|
||||
COURIER_HOST = "windows.courier.push.apple.com" # TODO: Get this from config
|
||||
COURIER_HOST = "1-courier.push.apple.com" # TODO: Get this from config
|
||||
COURIER_PORT = 5223
|
||||
ALPN = [b"apns-security-v2"]
|
||||
|
||||
|
|
12
demo.py
12
demo.py
|
@ -5,16 +5,26 @@ from hashlib import sha1
|
|||
conn1 = apns.APNSConnection()
|
||||
conn1.connect()
|
||||
print(f"Push Token 1: {b64encode(conn1.token).decode()}")
|
||||
conn1.filter([])
|
||||
conn1.connect(False)
|
||||
print(f"User Token 1: {b64encode(conn1.token).decode()}")
|
||||
|
||||
conn2 = apns.APNSConnection()
|
||||
conn2.connect()
|
||||
conn2.filter([])
|
||||
print(f"Push Token 2: {b64encode(conn2.token).decode()}")
|
||||
conn2.connect(False)
|
||||
print(f"User Token 2: {b64encode(conn2.token).decode()}")
|
||||
|
||||
conn1.filter(["com.apple.madrid"])
|
||||
conn2.filter(["com.apple.madrid"])
|
||||
|
||||
conn1.send_message(conn2.token, "com.apple.madrid", "\{'hello': 'world'\}")
|
||||
conn1.send_message(b"\xe5^\xc0c\xe8\xa4\x1e\xbe\x03\x89'\xea\xd5m\x94\x05\xae\xf5\x1bqK\x1aJTH\xa4\xeb8\xb8<\xd7)", "com.apple.madrid", b'bplist00\xdd\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x13\x17\x18\x19ScdrRtPRgdQiRsPRnrQcQUQtScdvRuaRqHQvM\x08\xd6\xf3\xe6\x8d\x04\x18\x95\xfd\xea\xe9\xf50_\x10\x10tel:+16106632676\t\x12=\x12c&_\x10\x1amailto:jjgill07@icloud.com\x10\x01\x10mO\x10\x10UC>\x9f\xce\xa4N\xe0\xba\xe9\xad\x8e_h\xd7hO\x10 \xe5^\xc0c\xe8\xa4\x1e\xbe\x03\x89\'\xea\xd5m\x94\x05\xae\xf5\x1bqK\x1aJTH\xa4\xeb8\xb8<\xd7)_\x10#[macOS,13.2.1,22D68,MacBookPro18,3]O\x10!\x01\x97\xca\\"\xcaI\x82\x0c\xb66C\xa7\x89h\x91\xcd\x18Ozj"\x06u;9\x96\xebrQs|=\x10\x08\x00\x08\x00#\x00\'\x00*\x00-\x00/\x002\x005\x007\x009\x00;\x00?\x00B\x00E\x00G\x00U\x00h\x00i\x00n\x00\x8b\x00\x8d\x00\x8f\x00\xa2\x00\xc5\x00\xeb\x01\x0f\x00\x00\x00\x00\x00\x00\x02\x01\x00\x00\x00\x00\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x11')
|
||||
|
||||
|
||||
while True:
|
||||
print(conn1.expect_message())
|
||||
#print(conn2.expect_message())
|
||||
#print(conn1.expect_message())
|
||||
#print(conn2.expect_message())
|
||||
# #print(sha1(b"com.apple.madrid").digest())
|
||||
|
|
10
printer.py
10
printer.py
|
@ -146,14 +146,18 @@ def pretty_print_payload(prefix, payload: tuple[int, list[tuple[int, bytes]]]) -
|
|||
elif id == 0x14:
|
||||
print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Set State{bcolors.ENDC}: {_get_field(payload[1], 1).hex()}")
|
||||
elif id == 0x1d or id == 0x20:
|
||||
print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.WARNING}PubSub{bcolors.ENDC}")
|
||||
print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.WARNING}PubSub ??{bcolors.ENDC}")
|
||||
elif id == 0xe:
|
||||
print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.FAIL}Unknown 0xe{bcolors.ENDC}")
|
||||
print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.WARNING}Token Confirmation{bcolors.ENDC}")
|
||||
elif id == 0xa:
|
||||
if prefix == "apsd -> APNs":
|
||||
# if it has apsd -> APNs in the prefix, it's an outgoing notification
|
||||
if "apsd -> APNs" in prefix:
|
||||
print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKBLUE}OUTGOING Notification{bcolors.ENDC}")
|
||||
else:
|
||||
print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Notification{bcolors.ENDC}")
|
||||
for field in payload[1]:
|
||||
print(f"Field ID: {field[0]}")
|
||||
print(f"Field Value: {field[1]}")
|
||||
elif id == 0xb:
|
||||
print(f"{bcolors.OKGREEN}{prefix}{bcolors.ENDC}: {bcolors.OKCYAN}Notification Ack{bcolors.ENDC} {bcolors.OKBLUE}{_get_field(payload[1], 8).hex()}{bcolors.ENDC}")
|
||||
else:
|
||||
|
|
|
@ -4,12 +4,20 @@ import socket
|
|||
import tlslite
|
||||
import threading
|
||||
|
||||
import sys
|
||||
|
||||
# setting path
|
||||
sys.path.append('../')
|
||||
|
||||
# APNs server to proxy traffic to
|
||||
APNS_HOST = "windows.courier.push.apple.com"
|
||||
APNS_PORT = 5223
|
||||
ALPN = b"apns-security-v3"
|
||||
#ALPN = b"apns-security-v3"
|
||||
ALPN = b"apns-security-v2"
|
||||
#ALPN = b"apns-pack-v1"
|
||||
|
||||
global_cnt = 0
|
||||
|
||||
# Connect to the APNs server
|
||||
def connect() -> tlslite.TLSConnection:
|
||||
# Connect to the APNs server
|
||||
|
@ -18,8 +26,15 @@ def connect() -> tlslite.TLSConnection:
|
|||
ssock = tlslite.TLSConnection(sock)
|
||||
#print("Handshaking with APNs")
|
||||
# Handshake with the server
|
||||
ssock.handshakeClientCert(alpn=[b"apns-security-v3"])
|
||||
#print("Handshaked with APNs")
|
||||
if ALPN == b"apns-security-v3":
|
||||
ssock.handshakeClientCert(alpn=[ALPN])
|
||||
else:
|
||||
import albert
|
||||
private_key, cert = albert.generate_push_cert()
|
||||
cert = tlslite.X509CertChain([tlslite.X509().parse(cert)])
|
||||
private_key = tlslite.parsePEMKey(private_key, private=True)
|
||||
# Handshake with the server
|
||||
ssock.handshakeClientCert(cert, private_key, alpn=[ALPN])
|
||||
|
||||
return ssock
|
||||
|
||||
|
@ -27,11 +42,6 @@ cert:str = None
|
|||
key:str = None
|
||||
|
||||
|
||||
import sys
|
||||
|
||||
# setting path
|
||||
sys.path.append('../')
|
||||
|
||||
import apns
|
||||
import printer
|
||||
|
||||
|
@ -80,11 +90,14 @@ def handle(conn: socket.socket):
|
|||
# Connect to the APNs server
|
||||
apns = connect()
|
||||
print("Connected to APNs")
|
||||
|
||||
global global_cnt
|
||||
global_cnt += 1
|
||||
# Proxy data between the connections
|
||||
# Create a thread to proxy data from the APNs server to the client
|
||||
threading.Thread(target=proxy, args=(s_conn, apns, "apsd -> APNs")).start()
|
||||
threading.Thread(target=proxy, args=(s_conn, apns, f"{global_cnt} apsd -> APNs")).start()
|
||||
# Just proxy data from the client to the APNs server in this thread
|
||||
proxy(apns, s_conn, "APNs -> apsd")
|
||||
proxy(apns, s_conn, f"{global_cnt} APNs -> apsd")
|
||||
|
||||
def serve():
|
||||
|
||||
|
|
Loading…
Reference in a new issue