mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2025-01-09 17:33:47 +00:00
more refactoring
This commit is contained in:
parent
1fc7098d5a
commit
3820c80765
2 changed files with 109 additions and 123 deletions
27
demo.py
27
demo.py
|
@ -73,15 +73,14 @@ def refresh_ids_cert():
|
||||||
"user_id": CONFIG["user_id"],
|
"user_id": CONFIG["user_id"],
|
||||||
}
|
}
|
||||||
|
|
||||||
print(ids._get_handles(
|
print(
|
||||||
|
ids._get_handles(
|
||||||
CONFIG["push"]["token"],
|
CONFIG["push"]["token"],
|
||||||
info,
|
CONFIG["user_id"],
|
||||||
CONFIG["auth_cert"],
|
ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]),
|
||||||
CONFIG["key"],
|
ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]),
|
||||||
CONFIG["push"]["cert"],
|
)
|
||||||
CONFIG["push"]["key"],
|
)
|
||||||
))
|
|
||||||
|
|
||||||
|
|
||||||
resp = None
|
resp = None
|
||||||
try:
|
try:
|
||||||
|
@ -89,10 +88,8 @@ def refresh_ids_cert():
|
||||||
resp = ids._register_request(
|
resp = ids._register_request(
|
||||||
CONFIG["push"]["token"],
|
CONFIG["push"]["token"],
|
||||||
info,
|
info,
|
||||||
CONFIG["auth_cert"],
|
ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]),
|
||||||
CONFIG["key"],
|
ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]),
|
||||||
CONFIG["push"]["cert"],
|
|
||||||
CONFIG["push"]["key"],
|
|
||||||
CONFIG["validation_data"],
|
CONFIG["validation_data"],
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -111,10 +108,8 @@ def refresh_ids_cert():
|
||||||
resp = ids._register_request(
|
resp = ids._register_request(
|
||||||
CONFIG["push"]["token"],
|
CONFIG["push"]["token"],
|
||||||
info,
|
info,
|
||||||
CONFIG["auth_cert"],
|
ids.KeyPair(CONFIG["key"], CONFIG["auth_cert"]),
|
||||||
CONFIG["key"],
|
ids.KeyPair(CONFIG["push"]["key"], CONFIG["push"]["cert"]),
|
||||||
CONFIG["push"]["cert"],
|
|
||||||
CONFIG["push"]["key"],
|
|
||||||
validation_data,
|
validation_data,
|
||||||
)
|
)
|
||||||
CONFIG["validation_data"] = validation_data
|
CONFIG["validation_data"] = validation_data
|
||||||
|
|
203
ids.py
203
ids.py
|
@ -18,69 +18,10 @@ import bags
|
||||||
import gsa
|
import gsa
|
||||||
|
|
||||||
USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"
|
USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"
|
||||||
|
PROTOCOL_VERSION = "1640"
|
||||||
|
|
||||||
KeyPair = namedtuple("KeyPair", ["key", "cert"])
|
KeyPair = namedtuple("KeyPair", ["key", "cert"])
|
||||||
|
|
||||||
|
|
||||||
# Nonce Format:
|
|
||||||
# 01000001876bd0a2c0e571093967fce3d7
|
|
||||||
# 01 # version
|
|
||||||
# 000001876d008cc5 # unix time
|
|
||||||
# r1r2r3r4r5r6r7r8 # random bytes
|
|
||||||
def generate_nonce() -> bytes:
|
|
||||||
return (
|
|
||||||
b"\x01"
|
|
||||||
+ int(datetime.now().timestamp() * 1000).to_bytes(8, "big")
|
|
||||||
+ random.randbytes(8)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _create_payload(
|
|
||||||
bag_key: str,
|
|
||||||
query_string: str,
|
|
||||||
push_token: str,
|
|
||||||
payload: bytes,
|
|
||||||
nonce: bytes = None,
|
|
||||||
) -> tuple[str, bytes]:
|
|
||||||
# Generate the nonce
|
|
||||||
if nonce is None:
|
|
||||||
nonce = generate_nonce()
|
|
||||||
push_token = b64decode(push_token)
|
|
||||||
|
|
||||||
if payload is None:
|
|
||||||
payload = b""
|
|
||||||
|
|
||||||
return (
|
|
||||||
nonce
|
|
||||||
+ len(bag_key).to_bytes(4, "big")
|
|
||||||
+ bag_key.encode()
|
|
||||||
+ len(query_string).to_bytes(4, "big")
|
|
||||||
+ query_string.encode()
|
|
||||||
+ len(payload).to_bytes(4, "big")
|
|
||||||
+ payload
|
|
||||||
+ len(push_token).to_bytes(4, "big")
|
|
||||||
+ push_token,
|
|
||||||
nonce,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def sign_payload(
|
|
||||||
private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes
|
|
||||||
) -> tuple[str, bytes]:
|
|
||||||
# Load the private key
|
|
||||||
key = serialization.load_pem_private_key(
|
|
||||||
private_key.encode(), password=None, backend=default_backend()
|
|
||||||
)
|
|
||||||
|
|
||||||
payload, nonce = _create_payload(bag_key, query_string, push_token, payload)
|
|
||||||
sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1())
|
|
||||||
|
|
||||||
sig = b"\x01\x01" + sig
|
|
||||||
sig = b64encode(sig).decode()
|
|
||||||
|
|
||||||
return sig, nonce
|
|
||||||
|
|
||||||
|
|
||||||
# global_key, global_cert = load_keys()
|
# global_key, global_cert = load_keys()
|
||||||
|
|
||||||
|
|
||||||
|
@ -97,7 +38,7 @@ def _send_request(
|
||||||
push_token = b64encode(conn.token).decode()
|
push_token = b64encode(conn.token).decode()
|
||||||
|
|
||||||
# Sign the request
|
# Sign the request
|
||||||
signature, nonce = sign_payload(keypair.key, bag_key, "", push_token, body)
|
signature, nonce = _sign_payload(keypair.key, bag_key, "", push_token, body)
|
||||||
|
|
||||||
headers = {
|
headers = {
|
||||||
"x-id-cert": keypair.cert.replace("-----BEGIN CERTIFICATE-----", "")
|
"x-id-cert": keypair.cert.replace("-----BEGIN CERTIFICATE-----", "")
|
||||||
|
@ -276,7 +217,7 @@ def _get_auth_cert(user_id, token) -> KeyPair:
|
||||||
|
|
||||||
|
|
||||||
def _register_request(
|
def _register_request(
|
||||||
push_token, info, auth_cert, auth_key, push_cert, push_key, validation_data
|
push_token, info, auth_key: KeyPair, push_key: KeyPair, validation_data
|
||||||
):
|
):
|
||||||
body = {
|
body = {
|
||||||
"hardware-version": "MacBookPro18,3",
|
"hardware-version": "MacBookPro18,3",
|
||||||
|
@ -300,29 +241,14 @@ def _register_request(
|
||||||
}
|
}
|
||||||
|
|
||||||
body = plistlib.dumps(body)
|
body = plistlib.dumps(body)
|
||||||
body = gzip.compress(body, mtime=0)
|
|
||||||
|
|
||||||
push_sig, push_nonce = sign_payload(push_key, "id-register", "", push_token, body)
|
|
||||||
auth_sig, auth_nonce = sign_payload(auth_key, "id-register", "", push_token, body)
|
|
||||||
|
|
||||||
headers = {
|
headers = {
|
||||||
"x-protocol-version": "1640",
|
"x-protocol-version": PROTOCOL_VERSION,
|
||||||
"content-type": "application/x-apple-plist",
|
|
||||||
"content-encoding": "gzip",
|
|
||||||
"x-auth-sig-0": auth_sig,
|
|
||||||
"x-auth-cert-0": auth_cert.replace("\n", "")
|
|
||||||
.replace("-----BEGIN CERTIFICATE-----", "")
|
|
||||||
.replace("-----END CERTIFICATE-----", ""),
|
|
||||||
"x-auth-user-id-0": info["user_id"],
|
"x-auth-user-id-0": info["user_id"],
|
||||||
"x-auth-nonce-0": b64encode(auth_nonce),
|
|
||||||
# "x-pr-nonce": b64encode(auth_nonce),
|
|
||||||
"x-push-token": push_token,
|
|
||||||
"x-push-sig": push_sig,
|
|
||||||
"x-push-cert": push_cert.replace("\n", "")
|
|
||||||
.replace("-----BEGIN CERTIFICATE-----", "")
|
|
||||||
.replace("-----END CERTIFICATE-----", ""),
|
|
||||||
"x-push-nonce": b64encode(push_nonce),
|
|
||||||
}
|
}
|
||||||
|
_add_auth_push_signatures(
|
||||||
|
headers, body, "id-register", auth_key, push_key, push_token, 0
|
||||||
|
)
|
||||||
|
|
||||||
r = requests.post(
|
r = requests.post(
|
||||||
"https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register",
|
"https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/register",
|
||||||
|
@ -346,28 +272,6 @@ def mini_cert(cert: str):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def add_signatures(
|
|
||||||
headers: dict,
|
|
||||||
body: bytes,
|
|
||||||
bag_key: str,
|
|
||||||
auth_key: KeyPair,
|
|
||||||
push_key: KeyPair,
|
|
||||||
push_token: str,
|
|
||||||
auth_number=None,
|
|
||||||
):
|
|
||||||
push_sig, push_nonce = sign_payload(push_key.key, bag_key, "", push_token, body)
|
|
||||||
headers["x-push-sig"] = push_sig
|
|
||||||
headers["x-push-nonce"] = b64encode(push_nonce)
|
|
||||||
headers["x-push-cert"] = mini_cert(push_key.cert)
|
|
||||||
headers["x-push-token"] = push_token
|
|
||||||
|
|
||||||
auth_sig, auth_nonce = sign_payload(auth_key.key, bag_key, "", push_token, body)
|
|
||||||
auth_postfix = "-" + str(auth_number) if auth_number is not None else ""
|
|
||||||
headers["x-auth-sig" + auth_postfix] = auth_sig
|
|
||||||
headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce)
|
|
||||||
headers["x-auth-cert" + auth_postfix] = mini_cert(auth_key.cert)
|
|
||||||
|
|
||||||
|
|
||||||
PROTOCOL_VERSION = "1640"
|
PROTOCOL_VERSION = "1640"
|
||||||
|
|
||||||
|
|
||||||
|
@ -376,7 +280,9 @@ def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair)
|
||||||
"x-protocol-version": PROTOCOL_VERSION,
|
"x-protocol-version": PROTOCOL_VERSION,
|
||||||
"x-auth-user-id": user_id,
|
"x-auth-user-id": user_id,
|
||||||
}
|
}
|
||||||
add_signatures(headers, None, "id-get-handles", auth_key, push_key, push_token)
|
_add_auth_push_signatures(
|
||||||
|
headers, None, "id-get-handles", auth_key, push_key, push_token
|
||||||
|
)
|
||||||
|
|
||||||
r = requests.get(
|
r = requests.get(
|
||||||
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles",
|
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles",
|
||||||
|
@ -427,15 +333,100 @@ class IDSUser:
|
||||||
|
|
||||||
def test():
|
def test():
|
||||||
import getpass
|
import getpass
|
||||||
|
|
||||||
conn = apns.APNSConnection()
|
conn = apns.APNSConnection()
|
||||||
conn.connect()
|
conn.connect()
|
||||||
username = input("Enter username: ")
|
username = input("Enter username: ")
|
||||||
password = getpass.getpass("Enter password: ")
|
password = getpass.getpass("Enter password: ")
|
||||||
user = IDSUser(conn, username, password)
|
user = IDSUser(conn, username, password)
|
||||||
print(user)
|
print(user)
|
||||||
# user.authenticate("test", "test")
|
|
||||||
|
|
||||||
|
# SIGNING STUFF
|
||||||
|
|
||||||
|
# Nonce Format:
|
||||||
|
# 01000001876bd0a2c0e571093967fce3d7
|
||||||
|
# 01 # version
|
||||||
|
# 000001876d008cc5 # unix time
|
||||||
|
# r1r2r3r4r5r6r7r8 # random bytes
|
||||||
|
def generate_nonce() -> bytes:
|
||||||
|
return (
|
||||||
|
b"\x01"
|
||||||
|
+ int(datetime.now().timestamp() * 1000).to_bytes(8, "big")
|
||||||
|
+ random.randbytes(8)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# Creates a payload from individual parts for signing
|
||||||
|
def _create_payload(
|
||||||
|
bag_key: str,
|
||||||
|
query_string: str,
|
||||||
|
push_token: str,
|
||||||
|
payload: bytes,
|
||||||
|
nonce: bytes = None,
|
||||||
|
) -> tuple[str, bytes]:
|
||||||
|
# Generate the nonce
|
||||||
|
if nonce is None:
|
||||||
|
nonce = generate_nonce()
|
||||||
|
push_token = b64decode(push_token)
|
||||||
|
|
||||||
|
if payload is None:
|
||||||
|
payload = b""
|
||||||
|
|
||||||
|
return (
|
||||||
|
nonce
|
||||||
|
+ len(bag_key).to_bytes(4, "big")
|
||||||
|
+ bag_key.encode()
|
||||||
|
+ len(query_string).to_bytes(4, "big")
|
||||||
|
+ query_string.encode()
|
||||||
|
+ len(payload).to_bytes(4, "big")
|
||||||
|
+ payload
|
||||||
|
+ len(push_token).to_bytes(4, "big")
|
||||||
|
+ push_token,
|
||||||
|
nonce,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# Returns signature, nonce
|
||||||
|
def _sign_payload(
|
||||||
|
private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes
|
||||||
|
) -> tuple[str, bytes]:
|
||||||
|
# Load the private key
|
||||||
|
key = serialization.load_pem_private_key(
|
||||||
|
private_key.encode(), password=None, backend=default_backend()
|
||||||
|
)
|
||||||
|
|
||||||
|
payload, nonce = _create_payload(bag_key, query_string, push_token, payload)
|
||||||
|
sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1())
|
||||||
|
|
||||||
|
sig = b"\x01\x01" + sig
|
||||||
|
sig = b64encode(sig).decode()
|
||||||
|
|
||||||
|
return sig, nonce
|
||||||
|
|
||||||
|
|
||||||
|
# Add headers for x-push-sig and x-auth-sig stuff
|
||||||
|
def _add_auth_push_signatures(
|
||||||
|
headers: dict,
|
||||||
|
body: bytes,
|
||||||
|
bag_key: str,
|
||||||
|
auth_key: KeyPair,
|
||||||
|
push_key: KeyPair,
|
||||||
|
push_token: str,
|
||||||
|
auth_number=None,
|
||||||
|
):
|
||||||
|
push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body)
|
||||||
|
headers["x-push-sig"] = push_sig
|
||||||
|
headers["x-push-nonce"] = b64encode(push_nonce)
|
||||||
|
headers["x-push-cert"] = mini_cert(push_key.cert)
|
||||||
|
headers["x-push-token"] = push_token
|
||||||
|
|
||||||
|
auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body)
|
||||||
|
auth_postfix = "-" + str(auth_number) if auth_number is not None else ""
|
||||||
|
headers["x-auth-sig" + auth_postfix] = auth_sig
|
||||||
|
headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce)
|
||||||
|
headers["x-auth-cert" + auth_postfix] = mini_cert(auth_key.cert)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
test()
|
test()
|
||||||
# def register(users: list[])
|
|
||||||
|
|
Loading…
Reference in a new issue