mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2025-01-09 17:33:47 +00:00
175 lines
No EOL
5.4 KiB
Python
175 lines
No EOL
5.4 KiB
Python
from ids import *
|
|
import ids
|
|
import getpass
|
|
import json
|
|
|
|
# Open config
|
|
try:
|
|
with open("config.json", "r") as f:
|
|
CONFIG = json.load(f)
|
|
except FileNotFoundError:
|
|
CONFIG = {}
|
|
|
|
def input_multiline(prompt):
|
|
print(prompt)
|
|
lines = []
|
|
while True:
|
|
line = input()
|
|
if line == "":
|
|
break
|
|
lines.append(line)
|
|
return "\n".join(lines)
|
|
|
|
def refresh_token():
|
|
# If no username is set, prompt for it
|
|
if "username" not in CONFIG:
|
|
CONFIG["username"] = input("Enter iCloud username: ")
|
|
# If no password is set, prompt for it
|
|
if "password" not in CONFIG:
|
|
CONFIG["password"] = getpass.getpass("Enter iCloud password: ")
|
|
# If grandslam authentication is not set, prompt for it
|
|
if "use_gsa" not in CONFIG:
|
|
CONFIG["use_gsa"] = input("Use grandslam authentication? [y/N] ").lower() == "y"
|
|
|
|
def factor_gen():
|
|
return input("Enter iCloud 2FA code: ")
|
|
|
|
CONFIG["user_id"], CONFIG["token"] = ids._get_auth_token(
|
|
CONFIG["username"], CONFIG["password"], CONFIG["use_gsa"], factor_gen=factor_gen
|
|
)
|
|
|
|
def refresh_cert():
|
|
CONFIG["key"], CONFIG["auth_cert"] = ids._get_auth_cert(
|
|
CONFIG["user_id"], CONFIG["token"]
|
|
)
|
|
|
|
def create_connection():
|
|
conn = apns.APNSConnection()
|
|
token = conn.connect()
|
|
#conn.filter(['com.apple.madrid'])
|
|
CONFIG['push'] = {
|
|
'token': b64encode(token).decode(),
|
|
'cert': conn.cert,
|
|
'key': conn.private_key
|
|
}
|
|
return conn
|
|
|
|
def restore_connection():
|
|
conn = apns.APNSConnection(CONFIG['push']['key'], CONFIG['push']['cert'])
|
|
conn.connect(True, b64decode(CONFIG['push']['token']))
|
|
#conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi'])
|
|
return conn
|
|
|
|
def refresh_ids_cert():
|
|
info = {
|
|
"uri": "mailto:" + CONFIG["username"],
|
|
"user_id": CONFIG['user_id'],
|
|
}
|
|
|
|
resp = None
|
|
try:
|
|
if "validation_data" in CONFIG:
|
|
resp = ids._register_request(
|
|
CONFIG['push']['token'],
|
|
info,
|
|
CONFIG['auth_cert'],
|
|
CONFIG['key'],
|
|
CONFIG['push']['cert'],
|
|
CONFIG['push']['key'],
|
|
CONFIG["validation_data"],
|
|
)
|
|
except Exception as e:
|
|
print(e)
|
|
resp = None
|
|
|
|
if resp is None:
|
|
print(
|
|
"Note: Validation data can be obtained from @JJTech, or intercepted using a HTTP proxy."
|
|
)
|
|
validation_data = (
|
|
input_multiline("Enter validation data: ")
|
|
.replace("\n", "")
|
|
.replace(" ", "")
|
|
)
|
|
resp = ids._register_request(
|
|
CONFIG['push']['token'],
|
|
info,
|
|
CONFIG['auth_cert'],
|
|
CONFIG['key'],
|
|
CONFIG['push']['cert'],
|
|
CONFIG['push']['key'],
|
|
validation_data,
|
|
)
|
|
CONFIG["validation_data"] = validation_data
|
|
|
|
ids_cert = x509.load_der_x509_certificate(
|
|
resp["services"][0]["users"][0]["cert"]
|
|
)
|
|
ids_cert = (
|
|
ids_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8").strip()
|
|
)
|
|
|
|
CONFIG["ids_cert"] = ids_cert
|
|
|
|
|
|
if not 'push' in CONFIG:
|
|
print("No existing APNs credentials, creating new ones...")
|
|
#print("No push conn")
|
|
conn = create_connection()
|
|
else:
|
|
print("Restoring APNs credentials...")
|
|
conn = restore_connection()
|
|
print("Connected to APNs!")
|
|
|
|
if not 'ids_cert' in CONFIG:
|
|
print("No existing IDS certificate, creating new one...")
|
|
if not 'key' in CONFIG:
|
|
print("No existing authentication certificate, creating new one...")
|
|
if not 'token' in CONFIG:
|
|
print("No existing authentication token, creating new one...")
|
|
refresh_token()
|
|
print("Got authentication token!")
|
|
refresh_cert()
|
|
print("Got authentication certificate!")
|
|
refresh_ids_cert()
|
|
print("Got IDS certificate!")
|
|
|
|
ids_keypair = ids.KeyPair(CONFIG['key'], CONFIG['ids_cert'])
|
|
|
|
def lookup(topic:str, users: list[str]):
|
|
print(f"Looking up users {users} for topic {topic}...")
|
|
resp = ids.lookup(conn, CONFIG['username'], ids_keypair, topic, users)
|
|
|
|
#r = list(resp['results'].values())[0]
|
|
for k, v in resp['results'].items():
|
|
print(f"Result for user {k} topic {topic}:")
|
|
i = v['identities']
|
|
print(f"IDENTITIES: {len(i)}")
|
|
for iden in i:
|
|
print("IDENTITY", end=" ")
|
|
print(f"Push Token: {b64encode(iden['push-token']).decode()}", end=" ")
|
|
if 'client-data' in iden:
|
|
print(f"Client Data: {len(iden['client-data'])}")
|
|
|
|
else:
|
|
print("No client data")
|
|
|
|
# Hack to make sure that the requests and responses match up
|
|
# This filter MUST contain all the topics you are looking up
|
|
conn.filter(['com.apple.madrid', 'com.apple.private.alloy.facetime.multi', 'com.apple.private.alloy.multiplex1'])
|
|
import time
|
|
print("...waiting for queued messages... (this is a hack)")
|
|
time.sleep(5) # Let the server send us any messages it was holding
|
|
conn.sink() # Dump the messages
|
|
|
|
lookup("com.apple.madrid", ["mailto:jjtech@jjtech.dev"])
|
|
lookup("com.apple.private.alloy.facetime.multi", ["mailto:jjtech@jjtech.dev"])
|
|
|
|
lookup("com.apple.private.alloy.facetime.multi", ["mailto:user_test2@icloud.com"])
|
|
lookup("com.apple.madrid", ["mailto:user_test2@icloud.com"])
|
|
|
|
lookup("com.apple.private.alloy.multiplex1", ["mailto:user_test2@icloud.com"])
|
|
|
|
# Save config
|
|
with open("config.json", "w") as f:
|
|
json.dump(CONFIG, f, indent=4) |