From bad5b760b3d693a1a9ba989a9f0acdaa6945770b Mon Sep 17 00:00:00 2001 From: JJTech0130 Date: Sat, 2 Sep 2023 19:09:41 -0400 Subject: [PATCH] improvements to args --- demo.py | 16 +++++- sms_registration.py | 116 ++++++++++++++++++++++++++------------------ 2 files changed, 84 insertions(+), 48 deletions(-) diff --git a/demo.py b/demo.py index 943d708..074c4ae 100644 --- a/demo.py +++ b/demo.py @@ -98,12 +98,20 @@ async def main(args: argparse.Namespace): else: print("Would you like to register a phone number? (y/n)") if input("> ").lower() == "y": + import sms_registration + if args.gateway is not None: + sms_registration.GATEWAY = args.gateway + if args.phone is not None: + sms_registration.PHONE_IP = args.phone + if "phone" in CONFIG: phone_sig = b64decode(CONFIG["phone"].get("sig")) phone_number = CONFIG["phone"].get("number") + elif args.pdu is not None: + sms_registration.parse_pdu(args.pdu, None) else: import sms_registration - phone_number, phone_sig = sms_registration.register(conn.credentials.token) + phone_number, phone_sig = sms_registration.register(conn.credentials.token, args.trigger_pdu) CONFIG["phone"] = { "number": phone_number, "sig": b64encode(phone_sig).decode(), @@ -266,5 +274,11 @@ if __name__ == "__main__": parser.add_argument("--reregister", action="store_true", help="Force re-registration") parser.add_argument("--alive", action="store_true", help="Keep the connection alive") parser.add_argument("--client-data", action="store_true", help="Publish client data (only necessary for actually sending/receiving messages)") + parser.add_argument("--trigger-pdu", action="store_true", help="Trigger a REG-REQ") + # String arg to override pdu + parser.add_argument("--pdu", type=str, help="Override the PDU REG-RESP") + parser.add_argument("--phone", type=str, help="Override the phone IP") + parser.add_argument("--gateway", type=str, help="Override the gateway phone number") + args = parser.parse_args() trio.run(main, args) \ No newline at end of file diff --git a/sms_registration.py b/sms_registration.py index aef0e82..fb771b9 100644 --- a/sms_registration.py +++ b/sms_registration.py @@ -9,14 +9,34 @@ urllib3.disable_warnings() PHONE_IP = "192.168.5.120" API_PORT = 8080 +GATEWAY = "22223333" -def register(push_token: bytes, gateway = "22223333") -> tuple[str, bytes]: +def register(push_token: bytes, no_parse = False, gateway = GATEWAY) -> tuple[str, bytes]: """Forwards a registration request to the phone and returns the phone number, signature for the provided push token""" token = push_token.hex().upper() req_id = random.randint(0, 2**32) sms = f"REG-REQ?v=3;t={token};r={req_id};" r = requests.get(f"http://{PHONE_IP}:{API_PORT}/register", params={"sms": sms, "gateway": gateway}) + + if no_parse: + print("Now do the next part and rerun with --pdu") + exit() + parse_pdu(r, req_id) + # if r.text.split("?")[0] != "REG-RESP": + # raise Exception(f"Failed to register: {r.text}") + # #assert r.text.split("?")[0] == "REG-RESP" + # resp_params = r.text.split("?")[1] + # resp_params = resp_params.split(";") + # resp_params = {param.split("=")[0]: param.split("=")[1] for param in resp_params} + # assert resp_params["v"] == "3" + # assert resp_params["r"] == str(req_id) + + # signature = bytes.fromhex(resp_params["s"]) + + # return resp_params["n"], signature + +def parse_pdu(r: str, req_id: int | None = None): if r.text.split("?")[0] != "REG-RESP": raise Exception(f"Failed to register: {r.text}") #assert r.text.split("?")[0] == "REG-RESP" @@ -24,62 +44,64 @@ def register(push_token: bytes, gateway = "22223333") -> tuple[str, bytes]: resp_params = resp_params.split(";") resp_params = {param.split("=")[0]: param.split("=")[1] for param in resp_params} assert resp_params["v"] == "3" - assert resp_params["r"] == str(req_id) + if req_id is not None: + assert resp_params["r"] == str(req_id) signature = bytes.fromhex(resp_params["s"]) return resp_params["n"], signature -async def main(): - # Open test.json - try: - with open("test.json", "r") as f: - test_json = f.read() - import json - test_json = json.loads(test_json) - except FileNotFoundError: - test_json = {} + +# async def main(): +# # Open test.json +# try: +# with open("test.json", "r") as f: +# test_json = f.read() +# import json +# test_json = json.loads(test_json) +# except FileNotFoundError: +# test_json = {} - creds = apns.PushCredentials( - test_json.get("push_key", ""), - test_json.get("push_cert", ""), - b64decode(test_json["push_token"]) if "push_token" in test_json else b"", - ) +# creds = apns.PushCredentials( +# test_json.get("push_key", ""), +# test_json.get("push_cert", ""), +# b64decode(test_json["push_token"]) if "push_token" in test_json else b"", +# ) - async with apns.APNSConnection.start(creds) as connection: - connection.credentials - #number, sig = register(connection.credentials.token) - if "register_sig" not in test_json: - try: - number, sig = register(connection.credentials.token) - test_json["register_sig"] = sig.hex() - test_json["number"] = number - except Exception as e: - print(e) - sig = None - number = None - else: - sig = bytes.fromhex(test_json["register_sig"]) - number = test_json["number"] - if sig is not None and number is not None: - from ids import profile - phone_auth_keypair = profile.get_phone_cert(number, connection.credentials.token, [sig]) - test_json["auth_key"] = phone_auth_keypair.key - test_json["auth_cert"] = phone_auth_keypair.cert +# async with apns.APNSConnection.start(creds) as connection: +# connection.credentials +# #number, sig = register(connection.credentials.token) +# if "register_sig" not in test_json: +# try: +# number, sig = register(connection.credentials.token) +# test_json["register_sig"] = sig.hex() +# test_json["number"] = number +# except Exception as e: +# print(e) +# sig = None +# number = None +# else: +# sig = bytes.fromhex(test_json["register_sig"]) +# number = test_json["number"] +# if sig is not None and number is not None: +# from ids import profile +# phone_auth_keypair = profile.get_phone_cert(number, connection.credentials.token, [sig]) +# test_json["auth_key"] = phone_auth_keypair.key +# test_json["auth_cert"] = phone_auth_keypair.cert - out_json = { - "push_key": creds.private_key, - "push_cert": creds.cert, - "push_token": b64encode(creds.token).decode("utf-8"), - } - test_json.update(out_json) +# out_json = { +# "push_key": creds.private_key, +# "push_cert": creds.cert, +# "push_token": b64encode(creds.token).decode("utf-8"), +# } +# test_json.update(out_json) - with open("test.json", "w") as f: - import json - f.write(json.dumps(test_json, indent=4)) +# with open("test.json", "w") as f: +# import json +# f.write(json.dumps(test_json, indent=4)) -if __name__ == "__main__": - trio.run(main) \ No newline at end of file +# if __name__ == "__main__": +# trio.run(main) \ No newline at end of file