improvements to args

This commit is contained in:
JJTech0130 2023-09-02 19:09:41 -04:00
parent 61e6b08b66
commit bad5b760b3
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6
2 changed files with 84 additions and 48 deletions

16
demo.py
View file

@ -98,12 +98,20 @@ async def main(args: argparse.Namespace):
else: else:
print("Would you like to register a phone number? (y/n)") print("Would you like to register a phone number? (y/n)")
if input("> ").lower() == "y": if input("> ").lower() == "y":
import sms_registration
if args.gateway is not None:
sms_registration.GATEWAY = args.gateway
if args.phone is not None:
sms_registration.PHONE_IP = args.phone
if "phone" in CONFIG: if "phone" in CONFIG:
phone_sig = b64decode(CONFIG["phone"].get("sig")) phone_sig = b64decode(CONFIG["phone"].get("sig"))
phone_number = CONFIG["phone"].get("number") phone_number = CONFIG["phone"].get("number")
elif args.pdu is not None:
sms_registration.parse_pdu(args.pdu, None)
else: else:
import sms_registration import sms_registration
phone_number, phone_sig = sms_registration.register(conn.credentials.token) phone_number, phone_sig = sms_registration.register(conn.credentials.token, args.trigger_pdu)
CONFIG["phone"] = { CONFIG["phone"] = {
"number": phone_number, "number": phone_number,
"sig": b64encode(phone_sig).decode(), "sig": b64encode(phone_sig).decode(),
@ -266,5 +274,11 @@ if __name__ == "__main__":
parser.add_argument("--reregister", action="store_true", help="Force re-registration") parser.add_argument("--reregister", action="store_true", help="Force re-registration")
parser.add_argument("--alive", action="store_true", help="Keep the connection alive") parser.add_argument("--alive", action="store_true", help="Keep the connection alive")
parser.add_argument("--client-data", action="store_true", help="Publish client data (only necessary for actually sending/receiving messages)") parser.add_argument("--client-data", action="store_true", help="Publish client data (only necessary for actually sending/receiving messages)")
parser.add_argument("--trigger-pdu", action="store_true", help="Trigger a REG-REQ")
# String arg to override pdu
parser.add_argument("--pdu", type=str, help="Override the PDU REG-RESP")
parser.add_argument("--phone", type=str, help="Override the phone IP")
parser.add_argument("--gateway", type=str, help="Override the gateway phone number")
args = parser.parse_args() args = parser.parse_args()
trio.run(main, args) trio.run(main, args)

View file

@ -9,14 +9,34 @@ urllib3.disable_warnings()
PHONE_IP = "192.168.5.120" PHONE_IP = "192.168.5.120"
API_PORT = 8080 API_PORT = 8080
GATEWAY = "22223333"
def register(push_token: bytes, gateway = "22223333") -> tuple[str, bytes]: def register(push_token: bytes, no_parse = False, gateway = GATEWAY) -> tuple[str, bytes]:
"""Forwards a registration request to the phone and returns the phone number, signature for the provided push token""" """Forwards a registration request to the phone and returns the phone number, signature for the provided push token"""
token = push_token.hex().upper() token = push_token.hex().upper()
req_id = random.randint(0, 2**32) req_id = random.randint(0, 2**32)
sms = f"REG-REQ?v=3;t={token};r={req_id};" sms = f"REG-REQ?v=3;t={token};r={req_id};"
r = requests.get(f"http://{PHONE_IP}:{API_PORT}/register", params={"sms": sms, "gateway": gateway}) r = requests.get(f"http://{PHONE_IP}:{API_PORT}/register", params={"sms": sms, "gateway": gateway})
if no_parse:
print("Now do the next part and rerun with --pdu")
exit()
parse_pdu(r, req_id)
# if r.text.split("?")[0] != "REG-RESP":
# raise Exception(f"Failed to register: {r.text}")
# #assert r.text.split("?")[0] == "REG-RESP"
# resp_params = r.text.split("?")[1]
# resp_params = resp_params.split(";")
# resp_params = {param.split("=")[0]: param.split("=")[1] for param in resp_params}
# assert resp_params["v"] == "3"
# assert resp_params["r"] == str(req_id)
# signature = bytes.fromhex(resp_params["s"])
# return resp_params["n"], signature
def parse_pdu(r: str, req_id: int | None = None):
if r.text.split("?")[0] != "REG-RESP": if r.text.split("?")[0] != "REG-RESP":
raise Exception(f"Failed to register: {r.text}") raise Exception(f"Failed to register: {r.text}")
#assert r.text.split("?")[0] == "REG-RESP" #assert r.text.split("?")[0] == "REG-RESP"
@ -24,62 +44,64 @@ def register(push_token: bytes, gateway = "22223333") -> tuple[str, bytes]:
resp_params = resp_params.split(";") resp_params = resp_params.split(";")
resp_params = {param.split("=")[0]: param.split("=")[1] for param in resp_params} resp_params = {param.split("=")[0]: param.split("=")[1] for param in resp_params}
assert resp_params["v"] == "3" assert resp_params["v"] == "3"
assert resp_params["r"] == str(req_id) if req_id is not None:
assert resp_params["r"] == str(req_id)
signature = bytes.fromhex(resp_params["s"]) signature = bytes.fromhex(resp_params["s"])
return resp_params["n"], signature return resp_params["n"], signature
async def main():
# Open test.json # async def main():
try: # # Open test.json
with open("test.json", "r") as f: # try:
test_json = f.read() # with open("test.json", "r") as f:
import json # test_json = f.read()
test_json = json.loads(test_json) # import json
except FileNotFoundError: # test_json = json.loads(test_json)
test_json = {} # except FileNotFoundError:
# test_json = {}
creds = apns.PushCredentials( # creds = apns.PushCredentials(
test_json.get("push_key", ""), # test_json.get("push_key", ""),
test_json.get("push_cert", ""), # test_json.get("push_cert", ""),
b64decode(test_json["push_token"]) if "push_token" in test_json else b"", # b64decode(test_json["push_token"]) if "push_token" in test_json else b"",
) # )
async with apns.APNSConnection.start(creds) as connection: # async with apns.APNSConnection.start(creds) as connection:
connection.credentials # connection.credentials
#number, sig = register(connection.credentials.token) # #number, sig = register(connection.credentials.token)
if "register_sig" not in test_json: # if "register_sig" not in test_json:
try: # try:
number, sig = register(connection.credentials.token) # number, sig = register(connection.credentials.token)
test_json["register_sig"] = sig.hex() # test_json["register_sig"] = sig.hex()
test_json["number"] = number # test_json["number"] = number
except Exception as e: # except Exception as e:
print(e) # print(e)
sig = None # sig = None
number = None # number = None
else: # else:
sig = bytes.fromhex(test_json["register_sig"]) # sig = bytes.fromhex(test_json["register_sig"])
number = test_json["number"] # number = test_json["number"]
if sig is not None and number is not None: # if sig is not None and number is not None:
from ids import profile # from ids import profile
phone_auth_keypair = profile.get_phone_cert(number, connection.credentials.token, [sig]) # phone_auth_keypair = profile.get_phone_cert(number, connection.credentials.token, [sig])
test_json["auth_key"] = phone_auth_keypair.key # test_json["auth_key"] = phone_auth_keypair.key
test_json["auth_cert"] = phone_auth_keypair.cert # test_json["auth_cert"] = phone_auth_keypair.cert
out_json = { # out_json = {
"push_key": creds.private_key, # "push_key": creds.private_key,
"push_cert": creds.cert, # "push_cert": creds.cert,
"push_token": b64encode(creds.token).decode("utf-8"), # "push_token": b64encode(creds.token).decode("utf-8"),
} # }
test_json.update(out_json) # test_json.update(out_json)
with open("test.json", "w") as f: # with open("test.json", "w") as f:
import json # import json
f.write(json.dumps(test_json, indent=4)) # f.write(json.dumps(test_json, indent=4))
if __name__ == "__main__": # if __name__ == "__main__":
trio.run(main) # trio.run(main)