mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2024-12-23 19:32:29 -06:00
rework demo
This commit is contained in:
parent
efa7fa6d99
commit
3a946a152e
3 changed files with 37 additions and 20 deletions
29
apns.py
29
apns.py
|
@ -82,6 +82,31 @@ class Payload:
|
|||
def __str__(self) -> str:
|
||||
return f"{COMMANDS[self.command]}: {self.fields}"
|
||||
|
||||
import courier
|
||||
|
||||
class APNSConnection():
|
||||
def __init__(self, token: bytes=None, private_key=None, cert=None):
|
||||
self.sock, self.private_key, self.cert = courier.connect(private_key, cert)
|
||||
self.token = token
|
||||
|
||||
self._connect()
|
||||
|
||||
def _connect(self):
|
||||
if self.token is None:
|
||||
payload = Payload(7, Fields({2: 0x01.to_bytes()}))
|
||||
else:
|
||||
payload = Payload(7, Fields({1: self.token, 2: 0x01.to_bytes()}))
|
||||
|
||||
self.sock.write(payload.to_bytes())
|
||||
|
||||
resp = Payload.from_stream(self.sock)
|
||||
|
||||
if resp.command != 8 or resp.fields.fields[1] != 0x00.to_bytes():
|
||||
raise Exception("Failed to connect")
|
||||
|
||||
if 3 in resp.fields.fields:
|
||||
self.token = resp.fields.fields[3]
|
||||
|
||||
if __name__ == "__main__":
|
||||
import courier
|
||||
import base64
|
||||
|
@ -110,4 +135,6 @@ if __name__ == "__main__":
|
|||
# If there's a new token, save it
|
||||
if 3 in resp.fields.fields:
|
||||
with open("token", "wb") as f:
|
||||
f.write(base64.b64encode(resp.fields.fields[3]))
|
||||
f.write(base64.b64encode(resp.fields.fields[3]))
|
||||
|
||||
# Send the push topics request
|
||||
|
|
23
courier.py
23
courier.py
|
@ -7,25 +7,10 @@ COURIER_PORT = 5223
|
|||
#ALPN = [b"apns-security-v2"]
|
||||
ALPN = None
|
||||
|
||||
# Check if we have already generated a push certificate
|
||||
# If not, generate one
|
||||
def _setup_push_cert():
|
||||
try:
|
||||
with open("push.key", "r") as f:
|
||||
private_key = f.read()
|
||||
with open("push.crt", "r") as f:
|
||||
cert = f.read()
|
||||
except FileNotFoundError:
|
||||
def connect(private_key=None, cert=None):
|
||||
# If we don't have a private key or certificate, generate one
|
||||
if private_key is None or cert is None:
|
||||
private_key, cert = albert.generate_push_cert()
|
||||
with open("push.key", "w") as f:
|
||||
f.write(private_key)
|
||||
with open("push.crt", "w") as f:
|
||||
f.write(cert)
|
||||
|
||||
return private_key, cert
|
||||
|
||||
def connect():
|
||||
private_key, cert = _setup_push_cert()
|
||||
|
||||
# Connect to the courier server
|
||||
sock = socket.create_connection((COURIER_HOST, COURIER_PORT))
|
||||
|
@ -37,7 +22,7 @@ def connect():
|
|||
# Handshake with the server
|
||||
sock.handshakeClientCert(cert, private_key, alpn=ALPN)
|
||||
|
||||
return sock
|
||||
return sock, private_key, cert
|
||||
|
||||
if __name__ == "__main__":
|
||||
sock = connect()
|
||||
|
|
5
demo.py
Normal file
5
demo.py
Normal file
|
@ -0,0 +1,5 @@
|
|||
import apns
|
||||
from base64 import b64decode, b64encode
|
||||
|
||||
c = apns.APNSConnection()
|
||||
print(f"Push Token: {b64encode(c.token).decode()}")
|
Loading…
Reference in a new issue