mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2025-01-09 17:33:47 +00:00
Adding registration notification option (#37)
This commit is contained in:
parent
0d5f3d91f6
commit
d15716a7da
1 changed files with 36 additions and 15 deletions
51
demo.py
51
demo.py
|
@ -5,6 +5,7 @@ import threading
|
|||
import time
|
||||
from base64 import b64decode, b64encode
|
||||
from getpass import getpass
|
||||
from cryptography import x509
|
||||
|
||||
from rich.logging import RichHandler
|
||||
|
||||
|
@ -50,6 +51,13 @@ def safe_config():
|
|||
with open("config.json", "w") as f:
|
||||
json.dump(CONFIG, f, indent=4)
|
||||
|
||||
def get_not_valid_after_timestamp(cert_data):
|
||||
try:
|
||||
cert = x509.load_pem_x509_certificate(cert_data.encode('utf-8'))
|
||||
return cert.not_valid_after
|
||||
except Exception as e:
|
||||
return None # Return None in case of an error
|
||||
|
||||
async def main(args: argparse.Namespace):
|
||||
# Load any existing push credentials
|
||||
token = CONFIG.get("push", {}).get("token")
|
||||
|
@ -62,7 +70,7 @@ async def main(args: argparse.Namespace):
|
|||
import emulated.nac
|
||||
vd = emulated.nac.generate_validation_data()
|
||||
vd = b64encode(vd).decode()
|
||||
users = ids.register(conn, users, vd, args.client_data)
|
||||
users = ids.register(conn, users, vd, args.client_data or args.reg_notify)
|
||||
return users
|
||||
|
||||
async with apns.APNSConnection.start(push_creds) as conn:
|
||||
|
@ -146,6 +154,32 @@ async def main(args: argparse.Namespace):
|
|||
print("Re-registering...")
|
||||
users = register(conn, users)
|
||||
|
||||
CONFIG["users"] = []
|
||||
for user in users:
|
||||
CONFIG["users"].append({
|
||||
"id": user.user_id,
|
||||
"auth_key": user.auth_keypair.key,
|
||||
"auth_cert": user.auth_keypair.cert,
|
||||
"encryption_key": user.encryption_identity.encryption_key if user.encryption_identity is not None else None,
|
||||
"signing_key": user.encryption_identity.signing_key if user.encryption_identity is not None else None,
|
||||
"id_cert": user.id_cert,
|
||||
"handles": user.handles,
|
||||
})
|
||||
if "P:" in str(user.user_id):
|
||||
expiration = get_not_valid_after_timestamp(user.id_cert)
|
||||
expiration = str(expiration) + " UTC"
|
||||
print("Number registration is valid until "+ expiration)
|
||||
else:
|
||||
email_user = user
|
||||
for n in range(len(user.handles)):
|
||||
if "mailto:" in str(user.handles[n]):
|
||||
email_addr = user.handles[n]
|
||||
|
||||
safe_config()
|
||||
if args.reg_notify:
|
||||
im = imessage.iMessageUser(conn, email_user)
|
||||
await im.send(imessage.iMessage.create(im, "Number registration is valid until " + expiration, [email_addr]))
|
||||
|
||||
print(f"Done?")
|
||||
|
||||
if args.alive:
|
||||
|
@ -159,20 +193,6 @@ async def main(args: argparse.Namespace):
|
|||
# while True:
|
||||
# print(await im.receive())
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# user = ids.IDSUser(conn)
|
||||
|
||||
# if CONFIG.get("auth", {}).get("cert") is not None:
|
||||
|
@ -281,6 +301,7 @@ async def output_task(im: imessage.iMessageUser):
|
|||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--reregister", action="store_true", help="Force re-registration")
|
||||
parser.add_argument("--reg-notify", action="store_true", help="Get iMessage after each registration")
|
||||
parser.add_argument("--alive", action="store_true", help="Keep the connection alive")
|
||||
parser.add_argument("--client-data", action="store_true", help="Publish client data (only necessary for actually sending/receiving messages)")
|
||||
parser.add_argument("--trigger-pdu", action="store_true", help="Trigger a REG-REQ")
|
||||
|
|
Loading…
Reference in a new issue