sms forwarding better

This commit is contained in:
JJTech0130 2023-08-13 19:26:57 -04:00
parent 0ea957b536
commit 04b139f941
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6
4 changed files with 144 additions and 47 deletions

View file

@ -179,7 +179,12 @@ class APNSConnection:
return self.token
old_topics = [] # Keep old topics so that we can add topics one by one
def filter(self, topics: list[str]):
if topics == self.old_topics:
return
topics = list(set(topics + self.old_topics))
self.old_topics = topics
logger.debug(f"Sending filter message with topics {topics}")
fields = [(1, self.token)]

View file

@ -22,11 +22,11 @@ logging.getLogger("py.warnings").setLevel(logging.ERROR) # Ignore warnings from
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("jelly").setLevel(logging.INFO)
logging.getLogger("nac").setLevel(logging.INFO)
logging.getLogger("apns").setLevel(logging.DEBUG)
logging.getLogger("apns").setLevel(logging.INFO)
logging.getLogger("albert").setLevel(logging.INFO)
logging.getLogger("ids").setLevel(logging.DEBUG)
logging.getLogger("bags").setLevel(logging.INFO)
logging.getLogger("imessage").setLevel(logging.DEBUG)
logging.getLogger("imessage").setLevel(logging.INFO)
logging.captureWarnings(True)
@ -153,6 +153,8 @@ def fixup_handle(handle):
current_participants = []
current_effect = None
while True:
if not im._received_activation_message:
im._activate_sms()
msg = im.receive()
if msg is not None:
# print(f'[{msg.sender}] {msg.text}')

View file

@ -30,6 +30,9 @@ def lookup(
"x-id-self-uri": self_uri,
"x-protocol-version": PROTOCOL_VERSION,
}
if 'alloy' in topic:
headers["x-id-sub-service"] = topic # Hack, if it has alloy in the name it's probably a sub-service
signing.add_id_signature(headers, body, BAG_KEY, id_keypair, push_token)
msg_id = random.randbytes(16)

View file

@ -200,18 +200,23 @@ class iMessage:
logger.debug(f"Decompressed message : {message}")
return iMessage(
text=message.get("t", ""),
xml=message.get("x"),
participants=message.get("p", []),
sender=sender if sender is not None else message.get("p", [])[-1] if "p" in message else None,
id=uuid.UUID(message.get("r")) if "r" in message else None,
group_id=uuid.UUID(message.get("gid")) if "gid" in message else None,
body=BalloonBody(message["bid"], message["b"]) if "bid" in message and "b" in message else None,
effect=message["iid"] if "iid" in message else None,
_compressed=compressed,
_raw=message,
)
try:
return iMessage(
text=message["t"], # Cause it to "fail to parse" if there isn't any good text to display, temp hack
xml=message.get("x"),
participants=message.get("p", []),
sender=sender if sender is not None else message.get("p", [])[-1] if "p" in message else None,
id=uuid.UUID(message.get("r")) if "r" in message else None,
group_id=uuid.UUID(message.get("gid")) if "gid" in message else None,
body=BalloonBody(message["bid"], message["b"]) if "bid" in message and "b" in message else None,
effect=message["iid"] if "iid" in message else None,
_compressed=compressed,
_raw=message,
)
except:
import json
dmp = json.dumps(message, indent=4)
return iMessage(text=f"failed to parse: {dmp}", _raw=message)
def to_raw(self) -> bytes:
"""Convert an `iMessage` to raw message bytes"""
@ -404,13 +409,13 @@ class iMessageUser:
def _verify_payload(self, payload: bytes, sender: str, sender_token: str) -> bool:
# Get the public key for the sender
self._cache_keys([sender])
self._cache_keys([sender], "com.apple.madrid")
if not sender_token in self.KEY_CACHE:
logger.warning("Unable to find the public key of the sender, cannot verify")
return False
identity_keys = ids.identity.IDSIdentity.decode(self.KEY_CACHE[sender_token][0])
identity_keys = ids.identity.IDSIdentity.decode(self.KEY_CACHE[sender_token]["com.apple.madrid"][0])
sender_ec_key = ids._helpers.parse_key(identity_keys.signing_public_key)
payload = iMessageUser._parse_payload(payload)
@ -450,12 +455,12 @@ class iMessageUser:
return iMessage.from_raw(decrypted, body['sP'])
KEY_CACHE_HANDLE: str = ""
KEY_CACHE: dict[bytes, tuple[bytes, bytes]] = {}
"""Mapping of push token : (public key, session token)"""
KEY_CACHE: dict[bytes, dict[str, tuple[bytes, bytes]]] = {}
"""Mapping of push token : topic : (public key, session token)"""
USER_CACHE: dict[str, list[bytes]] = {}
"""Mapping of handle : [push tokens]"""
def _cache_keys(self, participants: list[str]):
def _cache_keys(self, participants: list[str], topic: str):
# Clear the cache if the handle has changed
if self.KEY_CACHE_HANDLE != self.user.current_handle:
self.KEY_CACHE_HANDLE = self.user.current_handle
@ -467,7 +472,7 @@ class iMessageUser:
return
# Look up the public keys for the participants, and cache a token : public key mapping
lookup = self.user.lookup(participants)
lookup = self.user.lookup(participants, topic=topic)
for key, participant in lookup.items():
if not key in self.USER_CACHE:
@ -487,44 +492,126 @@ class iMessageUser:
# print(identity)
self.KEY_CACHE[identity["push-token"]] = (
if not identity["push-token"] in self.KEY_CACHE:
self.KEY_CACHE[identity["push-token"]] = {}
self.KEY_CACHE[identity["push-token"]][topic] = (
identity["client-data"]["public-message-identity-key"],
identity["session-token"],
)
def _encode_multiple(self, participants: list[str], sender: str, topic="com.apple.madrid", to_encrypt: bytes | None = None,) -> list[dict]:
self._cache_keys(participants, topic)
out = []
for participant in participants:
for push_token in self.USER_CACHE[participant]:
if push_token == self.connection.token:
continue # Don't send to ourselves
identity_keys = ids.identity.IDSIdentity.decode(
self.KEY_CACHE[push_token][topic][0]
)
if to_encrypt != None:
payload = self._encrypt_sign_payload(identity_keys, to_encrypt)
else:
payload = None
p = {
"tP": participant,
"D": not participant == sender,
"sT": self.KEY_CACHE[push_token][topic][1],
"t": push_token,
}
if payload is not None:
p["P"] = payload
logger.debug(f"Encoded payload : {p}")
out.append(p)
return out
_received_activation_message: bool = False
def _activate_sms(self) -> bool:
# Check if we have received an SMS forwarding activation message
if not self._received_activation_message:
# Check if it is in the queue
def check(x):
if x[0] != 0x0A:
return False
if apns._get_field(x[1], 2) != sha1("com.apple.private.alloy.sms".encode()).digest():
return False
resp_body = apns._get_field(x[1], 3)
if resp_body is None:
return False
resp_body = plistlib.loads(resp_body)
if "c" not in resp_body or resp_body["c"] != 145:
return False
return True
payload = self.connection.incoming_queue.pop_find(check)
if payload is None:
return False
payload = apns._get_field(payload[1], 3)
payload = plistlib.loads(payload)
dec = self._decrypt_payload(payload['P'])
# Try gzip decompression
try:
dec = gzip.decompress(dec)
except:
pass
dec = plistlib.loads(dec)
logger.debug(f"Think we got an SMS forwarding payload : {payload}")
logger.debug(f"Decrypted : {dec}")
self._received_activation_message = True
# Send out the activation message
msg_id = random.randbytes(4)
body = {
"fcn": 1,
"c": 147,
"ua": "[macOS,13.4.1,22F82,MacBookPro18,3]",
"U": uuid.uuid4().bytes,
"v": 8,
"i": int.from_bytes(msg_id, "big"),
"dtl": self._encode_multiple([self.user.current_handle], self.user.current_handle),
"nr": 1,
"sP": self.user.current_handle,
}
logger.debug(f"Sending activation message : {body}")
body = plistlib.dumps(body, fmt=plistlib.FMT_BINARY)
self.connection.send_message("com.apple.private.alloy.sms", body, msg_id)
#logger.debug(f"Sent activation message : {body}")
return True
def send(self, message: iMessage):
# Set the sender, if it isn't already
if message.sender is None:
message.sender = self.user.handles[0] # TODO : Which handle to use?
message.sanity_check() # Sanity check MUST be called before caching keys, so that the sender is added to the list of participants
self._cache_keys(message.participants)
self._cache_keys(message.participants, "com.apple.madrid")
# Turn the message into a raw message
raw = message.to_raw()
import base64
bundled_payloads = []
for participant in message.participants:
for push_token in self.USER_CACHE[participant]:
if push_token == self.connection.token:
continue # Don't send to ourselves
identity_keys = ids.identity.IDSIdentity.decode(
self.KEY_CACHE[push_token][0]
)
payload = self._encrypt_sign_payload(identity_keys, raw)
bundled_payloads.append(
{
"tP": participant,
"D": not participant
== message.sender, # TODO: Should this be false sometimes? For self messages?
"sT": self.KEY_CACHE[push_token][1],
"P": payload,
"t": push_token,
}
)
bundled_payloads = self._encode_multiple(message.participants, message.sender, message.to_raw())
msg_id = random.randbytes(4)
body = {