mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2025-01-09 17:33:47 +00:00
Upload files to ''
This commit is contained in:
parent
91764f44b4
commit
9fe9ff11a4
4 changed files with 415 additions and 0 deletions
86
__init__.py
Normal file
86
__init__.py
Normal file
|
@ -0,0 +1,86 @@
|
|||
from base64 import b64encode
|
||||
|
||||
import apns
|
||||
|
||||
from . import _helpers, identity, profile, query
|
||||
|
||||
|
||||
class IDSUser:
|
||||
# Sets self.user_id and self._auth_token
|
||||
def _authenticate_for_token(
|
||||
self, username: str, password: str, factor_callback: callable = None
|
||||
):
|
||||
self.user_id, self._auth_token = profile.get_auth_token(
|
||||
username, password, factor_callback
|
||||
)
|
||||
|
||||
# Sets self._auth_keypair using self.user_id and self._auth_token
|
||||
def _authenticate_for_cert(self):
|
||||
self._auth_keypair = profile.get_auth_cert(self.user_id, self._auth_token)
|
||||
|
||||
# Factor callback will be called if a 2FA code is necessary
|
||||
def __init__(
|
||||
self,
|
||||
push_connection: apns.APNSConnection,
|
||||
):
|
||||
self.push_connection = push_connection
|
||||
self._push_keypair = _helpers.KeyPair(
|
||||
self.push_connection.private_key, self.push_connection.cert
|
||||
)
|
||||
|
||||
self.ec_key = self.rsa_key = None
|
||||
|
||||
def __str__(self):
|
||||
return f"IDSUser(user_id={self.user_id}, handles={self.handles}, push_token={b64encode(self.push_connection.token).decode()})"
|
||||
|
||||
# Authenticates with a username and password, to create a brand new authentication keypair
|
||||
def authenticate(
|
||||
self, username: str, password: str, factor_callback: callable = None
|
||||
):
|
||||
self._authenticate_for_token(username, password, factor_callback)
|
||||
self._authenticate_for_cert()
|
||||
self.handles = profile.get_handles(
|
||||
b64encode(self.push_connection.token),
|
||||
self.user_id,
|
||||
self._auth_keypair,
|
||||
self._push_keypair,
|
||||
)
|
||||
self.current_handle = self.handles[0]
|
||||
|
||||
|
||||
# Uses an existing authentication keypair
|
||||
def restore_authentication(
|
||||
self, auth_keypair: _helpers.KeyPair, user_id: str, handles: dict
|
||||
):
|
||||
self._auth_keypair = auth_keypair
|
||||
self.user_id = user_id
|
||||
self.handles = handles
|
||||
self.current_handle = self.handles[0]
|
||||
|
||||
# This is a separate call so that the user can make sure the first part succeeds before asking for validation data
|
||||
def register(self, validation_data: str):
|
||||
"""
|
||||
self.ec_key, self.rsa_key will be set to a randomly gnenerated EC and RSA keypair
|
||||
if they are not already set
|
||||
"""
|
||||
if self.encryption_identity is None:
|
||||
self.encryption_identity = identity.IDSIdentity()
|
||||
|
||||
|
||||
cert = identity.register(
|
||||
b64encode(self.push_connection.token),
|
||||
self.handles,
|
||||
self.user_id,
|
||||
self._auth_keypair,
|
||||
self._push_keypair,
|
||||
self.encryption_identity,
|
||||
validation_data,
|
||||
)
|
||||
self._id_keypair = _helpers.KeyPair(self._auth_keypair.key, cert)
|
||||
|
||||
def restore_identity(self, id_keypair: _helpers.KeyPair):
|
||||
self._id_keypair = id_keypair
|
||||
|
||||
def lookup(self, uris: list[str], topic: str = "com.apple.madrid") -> any:
|
||||
return query.lookup(self.push_connection, self.current_handle, self._id_keypair, uris, topic)
|
||||
|
40
_helpers.py
Normal file
40
_helpers.py
Normal file
|
@ -0,0 +1,40 @@
|
|||
from collections import namedtuple
|
||||
|
||||
USER_AGENT = "com.apple.madrid-lookup [macOS,13.2.1,22D68,MacBookPro18,3]"
|
||||
PROTOCOL_VERSION = "1640"
|
||||
|
||||
# KeyPair is a named tuple that holds a private key and a certificate (public key) in PEM form, as well as a x509
|
||||
KeyPair = namedtuple("KeyPair", ["key", "cert"])
|
||||
Helperx509 = ""
|
||||
|
||||
|
||||
def dearmour(armoured: str) -> str:
|
||||
import re
|
||||
|
||||
# Use a regex to remove the header and footer (generic so it work on more than just certificates)
|
||||
return re.sub(r"-----BEGIN .*-----|-----END .*-----", "", armoured).replace(
|
||||
"\n", ""
|
||||
)
|
||||
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import ec, rsa
|
||||
def parse_key(key: str):
|
||||
# Check if it is a public or private key
|
||||
if "PUBLIC" in key:
|
||||
return serialization.load_pem_public_key(key.encode())
|
||||
else:
|
||||
return serialization.load_pem_private_key(key.encode(), None)
|
||||
|
||||
def serialize_key(key):
|
||||
if isinstance(key, ec.EllipticCurvePrivateKey) or isinstance(key, rsa.RSAPrivateKey):
|
||||
return key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
).decode("utf-8").strip()
|
||||
else:
|
||||
return key.public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||
).decode("utf-8").strip()
|
||||
|
165
profile.py
Normal file
165
profile.py
Normal file
|
@ -0,0 +1,165 @@
|
|||
import plistlib
|
||||
import random
|
||||
import uuid
|
||||
from base64 import b64decode
|
||||
|
||||
import requests
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
||||
from cryptography.x509.oid import NameOID
|
||||
from sys import platform
|
||||
|
||||
import bags
|
||||
|
||||
from . import signing
|
||||
from ._helpers import PROTOCOL_VERSION, USER_AGENT, KeyPair
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger("ids")
|
||||
|
||||
|
||||
auth_token = "" # function should return a rsa private key
|
||||
realm_user_id = "" # function should return a x509 public key
|
||||
|
||||
def _auth_token_request(username: str, password: str) -> any:
|
||||
# Turn the PET into an auth token
|
||||
data = {
|
||||
"username": username,
|
||||
#"client-id": str(uuid.uuid4()),
|
||||
#"delegates": {"com.apple.private.ids": {"protocol-version": "4"}},
|
||||
"password": password,
|
||||
}
|
||||
data = plistlib.dumps(data)
|
||||
|
||||
r = requests.post(
|
||||
# TODO: Figure out which URL bag we can get this from
|
||||
#"https://profile.ess.apple.com/WebObjects/VCProfileService.woa/wa/authenticateUser",
|
||||
"https://setup.icloud.com/setup/prefpane/loginDelegates",
|
||||
#auth=(username, password),
|
||||
data=data,
|
||||
verify=False,
|
||||
)
|
||||
r = plistlib.loads(r.content)
|
||||
return r
|
||||
|
||||
|
||||
# Gets an IDS auth token for the given username and password
|
||||
# Will use native Grand Slam on macOS
|
||||
# If factor_gen is not None, it will be called to get the 2FA code, otherwise it will be prompted
|
||||
# Returns (realm user id, auth token)
|
||||
def get_auth_token(
|
||||
username: str, password: str, factor_gen: callable = None
|
||||
) -> tuple[str, str]:
|
||||
result = _auth_token_request(username, password)
|
||||
|
||||
auth_token = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=2048,
|
||||
) # rsa private key
|
||||
realm_user_id = _generate_csr(auth_token) # x509 public key
|
||||
# else:
|
||||
# logger.debug("Using old-style authentication")
|
||||
# # Make the request without the 2FA code to make the prompt appear
|
||||
# _auth_token_request(username, password)
|
||||
# # TODO: Make sure we actually need the second request, some rare accounts don't have 2FA
|
||||
# # Now make the request with the 2FA code
|
||||
# if factor_gen is None:
|
||||
# pet = password + input("Enter 2FA code: ")
|
||||
# else:
|
||||
# pet = password + factor_gen()
|
||||
# r = _auth_token_request(username, pet)
|
||||
# # print(r)
|
||||
# if "description" in r:
|
||||
# raise Exception(f"Error: {r['description']}")
|
||||
# service_data = r["delegates"]["com.apple.private.ids"]["service-data"]
|
||||
# realm_user_id = service_data["realm-user-id"]
|
||||
# auth_token = service_data["auth-token"]
|
||||
# print(f"Auth token for {realm_user_id}: {auth_token}")
|
||||
logger.debug(f"Got auth token for IDS: {auth_token}")
|
||||
return realm_user_id, auth_token
|
||||
|
||||
|
||||
def _generate_csr(private_key: rsa.RSAPrivateKey) -> str:
|
||||
csr = (
|
||||
x509.CertificateSigningRequestBuilder()
|
||||
.subject_name(
|
||||
x509.Name(
|
||||
[
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, random.randbytes(20).hex()),
|
||||
]
|
||||
)
|
||||
)
|
||||
.sign(private_key, hashes.SHA256())
|
||||
)
|
||||
|
||||
csr = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8")
|
||||
return (
|
||||
csr.replace("-----BEGIN CERTIFICATE REQUEST-----", "")
|
||||
.replace("-----END CERTIFICATE REQUEST-----", "")
|
||||
.replace("\n", "")
|
||||
)
|
||||
|
||||
|
||||
# Gets an IDS auth cert for the given user id and auth token
|
||||
# Returns [private key PEM, certificate PEM]
|
||||
def get_auth_cert(user_id, token) -> KeyPair:
|
||||
BAG_KEY = "id-authenticate-ds-id"
|
||||
|
||||
private_key = rsa.generate_private_key(
|
||||
public_exponent=65537, key_size=2048, backend=default_backend()
|
||||
)
|
||||
body = {
|
||||
"authentication-data": {"auth-token": token},
|
||||
"csr": realm_user_id,
|
||||
"realm-user-id": user_id,
|
||||
}
|
||||
|
||||
#body = plistlib.dumps(body)
|
||||
|
||||
priv = auth_token
|
||||
pub = realm_user_id
|
||||
x509cert = realm_user_id
|
||||
|
||||
r = {"cert": x509cert}
|
||||
|
||||
cert = r["cert"]
|
||||
logger.debug("Got auth cert from token")
|
||||
return KeyPair(
|
||||
private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
)
|
||||
.decode("utf-8")
|
||||
.strip(),
|
||||
cert.strip(),
|
||||
)
|
||||
|
||||
|
||||
def get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair):
|
||||
BAG_KEY = "id-get-handles"
|
||||
|
||||
headers = {
|
||||
"x-protocol-version": PROTOCOL_VERSION,
|
||||
"x-auth-user-id": user_id,
|
||||
}
|
||||
signing.add_auth_signature(
|
||||
headers, None, BAG_KEY, auth_key, push_key, push_token
|
||||
)
|
||||
|
||||
#r = requests.get(
|
||||
#bags.ids_bag()[BAG_KEY],
|
||||
#headers=headers,
|
||||
#verify=False,
|
||||
#)
|
||||
|
||||
#r = plistlib.loads(r[cert])
|
||||
|
||||
#if not "handles" in r:
|
||||
#raise Exception("No handles in response: " + str(r))
|
||||
|
||||
#logger.debug(f"User {user_id} has handles {r['handles']}")
|
||||
#return [handle["uri"] for handle in r["handles"]]
|
||||
return [realm_user_id]
|
124
signing.py
Normal file
124
signing.py
Normal file
|
@ -0,0 +1,124 @@
|
|||
import random
|
||||
from base64 import b64decode, b64encode
|
||||
from datetime import datetime
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
||||
from cryptography.x509.oid import NameOID
|
||||
|
||||
from ._helpers import KeyPair, dearmour, Helperx509
|
||||
|
||||
|
||||
# TODO: Move this helper somewhere else
|
||||
def armour_cert(cert: bytes) -> str:
|
||||
return Helperx509
|
||||
|
||||
|
||||
"""
|
||||
Generates a nonce in this format:
|
||||
01000001876bd0a2c0e571093967fce3d7
|
||||
01 # version
|
||||
000001876d008cc5 # unix time
|
||||
r1r2r3r4r5r6r7r8 # random bytes
|
||||
"""
|
||||
|
||||
|
||||
def generate_nonce() -> bytes:
|
||||
return (
|
||||
b"\x01"
|
||||
+ int(datetime.now().timestamp() * 1000).to_bytes(8, "big")
|
||||
+ random.randbytes(8)
|
||||
)
|
||||
|
||||
|
||||
import typing
|
||||
|
||||
|
||||
# Creates a payload from individual parts for signing
|
||||
def _create_payload(
|
||||
bag_key: str,
|
||||
query_string: str,
|
||||
push_token: typing.Union[str, bytes],
|
||||
payload: bytes,
|
||||
nonce: typing.Union[bytes, None] = None,
|
||||
) -> tuple[bytes, bytes]:
|
||||
# Generate the nonce
|
||||
if nonce is None:
|
||||
nonce = generate_nonce()
|
||||
|
||||
push_token = b64decode(push_token)
|
||||
|
||||
if payload is None:
|
||||
payload = b""
|
||||
|
||||
return (
|
||||
nonce
|
||||
+ len(bag_key).to_bytes(4, "big")
|
||||
+ bag_key.encode()
|
||||
+ len(query_string).to_bytes(4, "big")
|
||||
+ query_string.encode()
|
||||
+ len(payload).to_bytes(4, "big")
|
||||
+ payload
|
||||
+ len(push_token).to_bytes(4, "big")
|
||||
+ push_token,
|
||||
nonce,
|
||||
)
|
||||
|
||||
|
||||
# Returns signature, nonce
|
||||
def _sign_payload(
|
||||
private_key: str, bag_key: str, query_string: str, push_token: str, payload: bytes, nonce = None
|
||||
) -> tuple[str, bytes]:
|
||||
# Load the private key
|
||||
key = serialization.load_pem_private_key(
|
||||
private_key.encode(), password=None, backend=default_backend()
|
||||
)
|
||||
|
||||
payload, nonce = _create_payload(bag_key, query_string, push_token, payload, nonce)
|
||||
|
||||
sig = key.sign(payload, padding.PKCS1v15(), hashes.SHA1()) # type: ignore
|
||||
|
||||
sig = b"\x01\x01" + sig
|
||||
sig = b64encode(sig).decode()
|
||||
|
||||
return sig, nonce
|
||||
|
||||
|
||||
# Add headers for x-push-sig and x-auth-sig stuff
|
||||
def add_auth_signature(
|
||||
headers: dict,
|
||||
body: bytes,
|
||||
bag_key: str,
|
||||
auth_key: KeyPair,
|
||||
push_key: KeyPair,
|
||||
push_token: str,
|
||||
auth_number=None,
|
||||
):
|
||||
push_sig, push_nonce = _sign_payload(push_key.key, bag_key, "", push_token, body)
|
||||
headers["x-push-sig"] = push_sig
|
||||
headers["x-push-nonce"] = b64encode(push_nonce)
|
||||
headers["x-push-cert"] = dearmour(push_key.cert)
|
||||
headers["x-push-token"] = push_token
|
||||
|
||||
auth_sig, auth_nonce = _sign_payload(auth_key.key, bag_key, "", push_token, body)
|
||||
auth_postfix = "-" + str(auth_number) if auth_number is not None else ""
|
||||
headers["x-auth-sig" + auth_postfix] = auth_sig
|
||||
headers["x-auth-nonce" + auth_postfix] = b64encode(auth_nonce)
|
||||
headers["x-auth-cert" + auth_postfix] = dearmour(auth_key.cert)
|
||||
|
||||
|
||||
def add_id_signature(
|
||||
headers: dict,
|
||||
body: bytes,
|
||||
bag_key: str,
|
||||
id_key: KeyPair,
|
||||
push_token: str,
|
||||
nonce=None,
|
||||
):
|
||||
id_sig, id_nonce = _sign_payload(id_key.key, bag_key, "", push_token, body, nonce)
|
||||
headers["x-id-sig"] = id_sig
|
||||
headers["x-id-nonce"] = b64encode(id_nonce).decode()
|
||||
headers["x-id-cert"] = dearmour(id_key.cert)
|
||||
headers["x-push-token"] = push_token
|
Loading…
Reference in a new issue