mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2025-01-09 17:33:47 +00:00
use bags rather than hardcoded urls
This commit is contained in:
parent
9cec9d0965
commit
ce82d4b8da
5 changed files with 28 additions and 13 deletions
5
apns.py
5
apns.py
|
@ -9,8 +9,11 @@ from hashlib import sha1
|
||||||
import tlslite
|
import tlslite
|
||||||
|
|
||||||
import albert
|
import albert
|
||||||
|
import bags
|
||||||
|
|
||||||
COURIER_HOST = "windows.courier.push.apple.com" # TODO: Get this from config
|
#COURIER_HOST = "windows.courier.push.apple.com" # TODO: Get this from config
|
||||||
|
# Pick a random courier server from 01 to APNSCourierHostcount
|
||||||
|
COURIER_HOST = f"{random.randint(1, bags.apns_init_bag()['APNSCourierHostcount'])}-{bags.apns_init_bag()['APNSCourierHostname']}"
|
||||||
COURIER_PORT = 5223
|
COURIER_PORT = 5223
|
||||||
ALPN = [b"apns-security-v2"]
|
ALPN = [b"apns-security-v2"]
|
||||||
|
|
||||||
|
|
4
bags.py
4
bags.py
|
@ -3,7 +3,7 @@ import plistlib
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
|
|
||||||
def apns_init_bag():
|
def apns_init_bag_old():
|
||||||
r = requests.get("https://init.push.apple.com/bag", verify=False)
|
r = requests.get("https://init.push.apple.com/bag", verify=False)
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
raise Exception("Failed to get APNs init bag")
|
raise Exception("Failed to get APNs init bag")
|
||||||
|
@ -15,7 +15,7 @@ def apns_init_bag():
|
||||||
|
|
||||||
|
|
||||||
# This is the same as the above, but the response has a signature which we unwrap
|
# This is the same as the above, but the response has a signature which we unwrap
|
||||||
def apns_init_bag_2():
|
def apns_init_bag():
|
||||||
r = requests.get("http://init-p01st.push.apple.com/bag", verify=False)
|
r = requests.get("http://init-p01st.push.apple.com/bag", verify=False)
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
raise Exception("Failed to get APNs init bag 2")
|
raise Exception("Failed to get APNs init bag 2")
|
||||||
|
|
|
@ -10,13 +10,13 @@ class IDSUser:
|
||||||
def _authenticate_for_token(
|
def _authenticate_for_token(
|
||||||
self, username: str, password: str, factor_callback: callable = None
|
self, username: str, password: str, factor_callback: callable = None
|
||||||
):
|
):
|
||||||
self.user_id, self._auth_token = profile._get_auth_token(
|
self.user_id, self._auth_token = profile.get_auth_token(
|
||||||
username, password, factor_callback
|
username, password, factor_callback
|
||||||
)
|
)
|
||||||
|
|
||||||
# Sets self._auth_keypair using self.user_id and self._auth_token
|
# Sets self._auth_keypair using self.user_id and self._auth_token
|
||||||
def _authenticate_for_cert(self):
|
def _authenticate_for_cert(self):
|
||||||
self._auth_keypair = profile._get_auth_cert(self.user_id, self._auth_token)
|
self._auth_keypair = profile.get_auth_cert(self.user_id, self._auth_token)
|
||||||
|
|
||||||
# Factor callback will be called if a 2FA code is necessary
|
# Factor callback will be called if a 2FA code is necessary
|
||||||
def __init__(
|
def __init__(
|
||||||
|
@ -37,7 +37,7 @@ class IDSUser:
|
||||||
):
|
):
|
||||||
self._authenticate_for_token(username, password, factor_callback)
|
self._authenticate_for_token(username, password, factor_callback)
|
||||||
self._authenticate_for_cert()
|
self._authenticate_for_cert()
|
||||||
self.handles = profile._get_handles(
|
self.handles = profile.get_handles(
|
||||||
b64encode(self.push_connection.token),
|
b64encode(self.push_connection.token),
|
||||||
self.user_id,
|
self.user_id,
|
||||||
self._auth_keypair,
|
self._auth_keypair,
|
||||||
|
|
|
@ -11,6 +11,7 @@ from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
||||||
from cryptography.x509.oid import NameOID
|
from cryptography.x509.oid import NameOID
|
||||||
|
|
||||||
import gsa
|
import gsa
|
||||||
|
import bags
|
||||||
|
|
||||||
from . import signing
|
from . import signing
|
||||||
from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair
|
from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair
|
||||||
|
@ -27,6 +28,7 @@ def _auth_token_request(username: str, password: str) -> any:
|
||||||
data = plistlib.dumps(data)
|
data = plistlib.dumps(data)
|
||||||
|
|
||||||
r = requests.post(
|
r = requests.post(
|
||||||
|
# TODO: Figure out which URL bag we can get this from
|
||||||
"https://setup.icloud.com/setup/prefpane/loginDelegates",
|
"https://setup.icloud.com/setup/prefpane/loginDelegates",
|
||||||
auth=(username, password),
|
auth=(username, password),
|
||||||
data=data,
|
data=data,
|
||||||
|
@ -40,7 +42,7 @@ def _auth_token_request(username: str, password: str) -> any:
|
||||||
# Will use native Grand Slam on macOS
|
# Will use native Grand Slam on macOS
|
||||||
# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted
|
# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted
|
||||||
# Returns (realm user id, auth token)
|
# Returns (realm user id, auth token)
|
||||||
def _get_auth_token(
|
def get_auth_token(
|
||||||
username: str, password: str, factor_gen: callable = None
|
username: str, password: str, factor_gen: callable = None
|
||||||
) -> tuple[str, str]:
|
) -> tuple[str, str]:
|
||||||
from sys import platform
|
from sys import platform
|
||||||
|
@ -92,7 +94,9 @@ def _generate_csr(private_key: rsa.RSAPrivateKey) -> str:
|
||||||
|
|
||||||
# Gets an IDS auth cert for the given user id and auth token
|
# Gets an IDS auth cert for the given user id and auth token
|
||||||
# Returns [private key PEM, certificate PEM]
|
# Returns [private key PEM, certificate PEM]
|
||||||
def _get_auth_cert(user_id, token) -> KeyPair:
|
def get_auth_cert(user_id, token) -> KeyPair:
|
||||||
|
BAG_KEY = "id-authenticate-ds-id"
|
||||||
|
|
||||||
private_key = rsa.generate_private_key(
|
private_key = rsa.generate_private_key(
|
||||||
public_exponent=65537, key_size=2048, backend=default_backend()
|
public_exponent=65537, key_size=2048, backend=default_backend()
|
||||||
)
|
)
|
||||||
|
@ -105,7 +109,8 @@ def _get_auth_cert(user_id, token) -> KeyPair:
|
||||||
body = plistlib.dumps(body)
|
body = plistlib.dumps(body)
|
||||||
|
|
||||||
r = requests.post(
|
r = requests.post(
|
||||||
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS",
|
bags.ids_bag()[BAG_KEY],
|
||||||
|
#"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateDS",
|
||||||
data=body,
|
data=body,
|
||||||
headers={"x-protocol-version": "1630"},
|
headers={"x-protocol-version": "1630"},
|
||||||
verify=False,
|
verify=False,
|
||||||
|
@ -126,17 +131,19 @@ def _get_auth_cert(user_id, token) -> KeyPair:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair):
|
def get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair):
|
||||||
|
BAG_KEY = "id-get-handles"
|
||||||
|
|
||||||
headers = {
|
headers = {
|
||||||
"x-protocol-version": PROTOCOL_VERSION,
|
"x-protocol-version": PROTOCOL_VERSION,
|
||||||
"x-auth-user-id": user_id,
|
"x-auth-user-id": user_id,
|
||||||
}
|
}
|
||||||
signing.add_auth_signature(
|
signing.add_auth_signature(
|
||||||
headers, None, "id-get-handles", auth_key, push_key, push_token
|
headers, None, BAG_KEY, auth_key, push_key, push_token
|
||||||
)
|
)
|
||||||
|
|
||||||
r = requests.get(
|
r = requests.get(
|
||||||
"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/idsGetHandles",
|
bags.ids_bag()[BAG_KEY],
|
||||||
headers=headers,
|
headers=headers,
|
||||||
verify=False,
|
verify=False,
|
||||||
)
|
)
|
||||||
|
|
|
@ -61,4 +61,9 @@ def lookup(
|
||||||
resp = plistlib.loads(resp)
|
resp = plistlib.loads(resp)
|
||||||
resp = gzip.decompress(resp["b"])
|
resp = gzip.decompress(resp["b"])
|
||||||
resp = plistlib.loads(resp)
|
resp = plistlib.loads(resp)
|
||||||
return resp
|
|
||||||
|
if resp['status'] != 0:
|
||||||
|
raise Exception(f'Query failed: {resp}')
|
||||||
|
if not 'results' in resp:
|
||||||
|
raise Exception(f'No results in response: {resp}')
|
||||||
|
return resp['results']
|
||||||
|
|
Loading…
Reference in a new issue