mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2025-01-09 17:33:47 +00:00
Merge pull request #29 from SpaceSaver/sms-registration
Implement automatic gateway detection into sms-registration
This commit is contained in:
commit
1b55b91bc0
2 changed files with 89 additions and 0 deletions
80
gateway_fetch.py
Normal file
80
gateway_fetch.py
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
import requests, random, plistlib, zipfile
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
appleplist = None
|
||||||
|
|
||||||
|
def getMasterList():
|
||||||
|
global appleplist
|
||||||
|
if (appleplist is None):
|
||||||
|
appleplist = plistlib.loads(requests.get("https://itunes.apple.com/WebObjects/MZStore.woa/wa/com.apple.jingle.appserver.client.MZITunesClientCheck/version?languageCode=en").content)
|
||||||
|
return appleplist
|
||||||
|
|
||||||
|
def getGatewayMCCMNC(MCCMNC):
|
||||||
|
gateways = getGatewaysMCCMNC(MCCMNC)
|
||||||
|
if gateways is None:
|
||||||
|
return None
|
||||||
|
return gateways[random.randrange(0, len(gateways))]
|
||||||
|
|
||||||
|
def getGatewaysMCCMNC(MCCMNC):
|
||||||
|
bundles = getBundlesMCCMNC(MCCMNC)
|
||||||
|
gateway = None
|
||||||
|
if bundles is None:
|
||||||
|
return
|
||||||
|
for bundle in bundles:
|
||||||
|
gateway = getGatewayFromBundle(parseBundle(bundle["Bundle"]))
|
||||||
|
if gateway is not None:
|
||||||
|
break
|
||||||
|
return gateway
|
||||||
|
|
||||||
|
def getBundlesMCCMNC(MCCMNC):
|
||||||
|
appleplist = getMasterList()
|
||||||
|
bundlelist = []
|
||||||
|
if MCCMNC in appleplist["MobileDeviceCarriersByMccMnc"]:
|
||||||
|
mmo = appleplist["MobileDeviceCarriersByMccMnc"][MCCMNC]
|
||||||
|
if "BundleName" in mmo:
|
||||||
|
bundle = getBundleByName(mmo["BundleName"])
|
||||||
|
if bundle is not None:
|
||||||
|
bundlelist.append({"Name": mmo["BundleName"], "Bundle": bundle})
|
||||||
|
if "MVNOs" in mmo:
|
||||||
|
for mv in mmo["MVNOs"]:
|
||||||
|
if "BundleName" in mv:
|
||||||
|
bundle = getBundleByName(mv["BundleName"])
|
||||||
|
if bundle is not None:
|
||||||
|
bundlelist.append({"Name": mv["BundleName"], "Bundle": bundle})
|
||||||
|
return bundlelist
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def getGatewayFromBundle(bundledict):
|
||||||
|
applecarrierplist = bundledict
|
||||||
|
if "PhoneNumberRegistrationGatewayAddress" in applecarrierplist:
|
||||||
|
regnum = applecarrierplist["PhoneNumberRegistrationGatewayAddress"]
|
||||||
|
if (type(regnum) == str):
|
||||||
|
regnum = [regnum]
|
||||||
|
return regnum
|
||||||
|
return None
|
||||||
|
|
||||||
|
def parseBundle(bundle):
|
||||||
|
bundlebytes = BytesIO(bundle)
|
||||||
|
bundlezip = zipfile.ZipFile(bundlebytes)
|
||||||
|
carrierpath = [path for path in bundlezip.namelist() if path.startswith("Payload/") and path.endswith("/carrier.plist")][0]
|
||||||
|
applecarrierplist = plistlib.load(bundlezip.open(carrierpath, "r"))
|
||||||
|
return applecarrierplist
|
||||||
|
|
||||||
|
def getBundleByName(BundleName):
|
||||||
|
appleplist = getMasterList()
|
||||||
|
if BundleName in appleplist["MobileDeviceCarrierBundlesByProductVersion"]:
|
||||||
|
x = BundleName
|
||||||
|
greatestver = "0"
|
||||||
|
for y in appleplist["MobileDeviceCarrierBundlesByProductVersion"][x]:
|
||||||
|
try:
|
||||||
|
inty = float(y)
|
||||||
|
except:
|
||||||
|
continue
|
||||||
|
if (inty > float(greatestver)):
|
||||||
|
greatestver = y
|
||||||
|
if greatestver != "0":
|
||||||
|
return requests.get(appleplist["MobileDeviceCarrierBundlesByProductVersion"][x][greatestver]["BundleURL"]).content
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
return None
|
|
@ -2,6 +2,7 @@ import requests
|
||||||
import random
|
import random
|
||||||
import apns
|
import apns
|
||||||
import trio
|
import trio
|
||||||
|
import gateway_fetch
|
||||||
from base64 import b64decode, b64encode
|
from base64 import b64decode, b64encode
|
||||||
|
|
||||||
import urllib3
|
import urllib3
|
||||||
|
@ -14,6 +15,14 @@ GATEWAY = "22223333"
|
||||||
def register(push_token: bytes, no_parse = False, gateway = None) -> tuple[str, bytes]:
|
def register(push_token: bytes, no_parse = False, gateway = None) -> tuple[str, bytes]:
|
||||||
"""Forwards a registration request to the phone and returns the phone number, signature for the provided push token"""
|
"""Forwards a registration request to the phone and returns the phone number, signature for the provided push token"""
|
||||||
if gateway is None:
|
if gateway is None:
|
||||||
|
print("Requesting device MCC+MNC for gateway detection...")
|
||||||
|
mccmnc = requests.get(f"http://{PHONE_IP}:{API_PORT}/info").text
|
||||||
|
print("MCC+MNC received! " + mccmnc)
|
||||||
|
print("Determining gateway...")
|
||||||
|
gateway = gateway_fetch.getGatewayMCCMNC(mccmnc)
|
||||||
|
print("Gateway found! " + gateway)
|
||||||
|
if gateway is None:
|
||||||
|
print("Automatic gateway detection failed, switching to default...")
|
||||||
gateway = GATEWAY
|
gateway = GATEWAY
|
||||||
token = push_token.hex().upper()
|
token = push_token.hex().upper()
|
||||||
req_id = random.randint(0, 2**32)
|
req_id = random.randint(0, 2**32)
|
||||||
|
|
Loading…
Reference in a new issue