refactoring and cleanup

This commit is contained in:
JJTech0130 2023-08-13 23:24:34 -04:00
parent 04b139f941
commit e41ed2c6a2
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6
2 changed files with 148 additions and 169 deletions

View file

@ -153,8 +153,7 @@ def fixup_handle(handle):
current_participants = []
current_effect = None
while True:
if not im._received_activation_message:
im._activate_sms()
im.activate_sms() # We must call this always since SMS could be turned off and on again, and it might have been on before this.
msg = im.receive()
if msg is not None:
# print(f'[{msg.sender}] {msg.text}')
@ -201,6 +200,7 @@ while True:
else:
print(f'Filtering to {[fixup_handle(h) for h in msg[1:]]}')
current_participants = [fixup_handle(h) for h in msg[1:]]
im._cache_keys(current_participants, "com.apple.madrid") # Just to make things faster, and to make it error on invalid addresses
elif msg == 'handle' or msg.startswith('handle '):
msg = msg.split(' ')
if len(msg) < 2 or msg[1] == '':

View file

@ -53,7 +53,9 @@ class MMCSFile(AttachmentFile):
def data(self) -> bytes:
import requests
logger.info(requests.get(
logger.info(
requests.get(
url=self.url,
headers={
"User-Agent": f"IMTransferAgent/900 CFNetwork/596.2.3 Darwin/12.2.0 (x86_64) (Macmini5,1)",
@ -61,7 +63,8 @@ class MMCSFile(AttachmentFile):
# "MMCS-Signature": str(base64.encodebytes(self.signature)),
# "MMCS-Owner": self.owner
},
).headers)
).headers
)
return b""
@ -87,11 +90,12 @@ class Attachment:
if "inline-attachment" in attrib:
# just grab the inline attachment !
self.versions = [InlineFile(message_raw_content[attrib["inline-attachment"]])]
self.versions = [
InlineFile(message_raw_content[attrib["inline-attachment"]])
]
else:
# suffer
versions = [InlineFile(b"")]
print(attrib)
@ -158,7 +162,11 @@ class iMessage:
def attachments(self) -> list[Attachment]:
if self.xml is not None:
return [Attachment(self._raw, elem) for elem in ElementTree.fromstring(self.xml)[0] if elem.tag == "FILE"]
return [
Attachment(self._raw, elem)
for elem in ElementTree.fromstring(self.xml)[0]
if elem.tag == "FILE"
]
else:
return []
@ -202,20 +210,29 @@ class iMessage:
try:
return iMessage(
text=message["t"], # Cause it to "fail to parse" if there isn't any good text to display, temp hack
text=message[
"t"
], # Cause it to "fail to parse" if there isn't any good text to display, temp hack
xml=message.get("x"),
participants=message.get("p", []),
sender=sender if sender is not None else message.get("p", [])[-1] if "p" in message else None,
sender=sender
if sender is not None
else message.get("p", [])[-1]
if "p" in message
else None,
id=uuid.UUID(message.get("r")) if "r" in message else None,
group_id=uuid.UUID(message.get("gid")) if "gid" in message else None,
body=BalloonBody(message["bid"], message["b"]) if "bid" in message and "b" in message else None,
body=BalloonBody(message["bid"], message["b"])
if "bid" in message and "b" in message
else None,
effect=message["iid"] if "iid" in message else None,
_compressed=compressed,
_raw=message,
)
except:
import json
dmp = json.dumps(message, indent=4)
#import json
dmp = str(message)
return iMessage(text=f"failed to parse: {dmp}", _raw=message)
def to_raw(self) -> bytes:
@ -232,7 +249,7 @@ class iMessage:
"pv": 0,
"gv": "8",
"v": "1",
"iid": self.effect
"iid": self.effect,
}
# Remove keys that are None
@ -262,39 +279,11 @@ class iMessageUser:
self.connection = connection
self.user = user
def _get_raw_message(self):
"""
Returns a raw APNs message corresponding to the next conforming notification in the queue
Returns None if no conforming notification is found
"""
def check_response(x):
if x[0] != 0x0A:
return False
if apns._get_field(x[1], 2) != sha1("com.apple.madrid".encode()).digest() and apns._get_field(x[1], 2) != sha1("com.apple.private.alloy.sms".encode()).digest():
return False
resp_body = apns._get_field(x[1], 3)
if resp_body is None:
# logger.debug("Rejecting madrid message with no body")
return False
resp_body = plistlib.loads(resp_body)
if "P" not in resp_body:
# logger.debug(f"Rejecting madrid message with no payload : {resp_body}")
return False
return True
payload = self.connection.incoming_queue.pop_find(check_response)
if payload is None:
return None
id = apns._get_field(payload[1], 4)
return payload
def _parse_payload(payload: bytes) -> tuple[bytes, bytes]:
payload = BytesIO(payload)
tag = payload.read(1)
#print("TAG", tag)
# print("TAG", tag)
body_length = int.from_bytes(payload.read(2), "big")
body = payload.read(body_length)
@ -415,7 +404,9 @@ class iMessageUser:
logger.warning("Unable to find the public key of the sender, cannot verify")
return False
identity_keys = ids.identity.IDSIdentity.decode(self.KEY_CACHE[sender_token]["com.apple.madrid"][0])
identity_keys = ids.identity.IDSIdentity.decode(
self.KEY_CACHE[sender_token]["com.apple.madrid"][0]
)
sender_ec_key = ids._helpers.parse_key(identity_keys.signing_public_key)
payload = iMessageUser._parse_payload(payload)
@ -435,24 +426,22 @@ class iMessageUser:
"""
Will return the next iMessage in the queue, or None if there are no messages
"""
raw = self._get_raw_message()
if raw is None:
body = self._receive_raw(100, "com.apple.madrid")
if body is None:
return None
body = apns._get_field(raw[1], 3)
body = plistlib.loads(body)
#print(f"Got body message {body}")
payload = body["P"]
if not self._verify_payload(payload, body['sP'], body["t"]):
if not self._verify_payload(payload, body["sP"], body["t"]):
raise Exception("Failed to verify payload")
logger.debug(f"Encrypted body : {body}")
decrypted = self._decrypt_payload(payload)
#logger.debug(f"Decrypted payload : {plistlib.loads(decrypted)}")
# logger.debug(f"Decrypted payload : {plistlib.loads(decrypted)}")
return iMessage.from_raw(decrypted, body['sP'])
return iMessage.from_raw(decrypted, body["sP"])
KEY_CACHE_HANDLE: str = ""
KEY_CACHE: dict[bytes, dict[str, tuple[bytes, bytes]]] = {}
@ -474,6 +463,11 @@ class iMessageUser:
# Look up the public keys for the participants, and cache a token : public key mapping
lookup = self.user.lookup(participants, topic=topic)
logger.debug(f"Lookup response : {lookup}")
for key, participant in lookup.items():
if len(participant["identities"]) == 0:
logger.warning(f"Participant {key} has no identities, this is probably not a real account")
for key, participant in lookup.items():
if not key in self.USER_CACHE:
self.USER_CACHE[key] = []
@ -500,11 +494,18 @@ class iMessageUser:
identity["session-token"],
)
def _encode_multiple(self, participants: list[str], sender: str, topic="com.apple.madrid", to_encrypt: bytes | None = None,) -> list[dict]:
def _send_raw(
self,
type: int,
participants: list[str],
topic: str,
payload: bytes | None = None,
id: uuid.UUID | None = None,
extra: dict = {},
):
self._cache_keys(participants, topic)
out = []
dtl = []
for participant in participants:
for push_token in self.USER_CACHE[participant]:
if push_token == self.connection.token:
@ -514,94 +515,83 @@ class iMessageUser:
self.KEY_CACHE[push_token][topic][0]
)
if to_encrypt != None:
payload = self._encrypt_sign_payload(identity_keys, to_encrypt)
else:
payload = None
p = {
"tP": participant,
"D": not participant == sender,
"D": not participant == self.user.current_handle,
"sT": self.KEY_CACHE[push_token][topic][1],
"t": push_token,
}
if payload is not None:
p["P"] = payload
p["P"] = self._encrypt_sign_payload(identity_keys, payload)
logger.debug(f"Encoded payload : {p}")
out.append(p)
dtl.append(p)
return out
message_id = random.randbytes(4)
_received_activation_message: bool = False
if id is None:
id = uuid.uuid4()
def _activate_sms(self) -> bool:
# Check if we have received an SMS forwarding activation message
if not self._received_activation_message:
# Check if it is in the queue
def check(x):
body = {
"c": type,
"fcn": 1,
"v": 8,
"i": int.from_bytes(message_id, "big"),
"U": id.bytes,
"dtl": dtl,
"sP": self.user.current_handle,
}
body.update(extra)
body = plistlib.dumps(body, fmt=plistlib.FMT_BINARY)
self.connection.send_message(topic, body, message_id)
def _receive_raw(self, type: int, topic: str) -> dict | None:
def check_response(x):
if x[0] != 0x0A:
return False
if apns._get_field(x[1], 2) != sha1("com.apple.private.alloy.sms".encode()).digest():
if apns._get_field(x[1], 2) != sha1(topic.encode()).digest():
return False
resp_body = apns._get_field(x[1], 3)
if resp_body is None:
return False
resp_body = plistlib.loads(resp_body)
if "c" not in resp_body or resp_body["c"] != 145:
if "c" not in resp_body or resp_body["c"] != type:
return False
return True
payload = self.connection.incoming_queue.pop_find(check)
payload = self.connection.incoming_queue.pop_find(check_response)
if payload is None:
return None
body = apns._get_field(payload[1], 3)
body = plistlib.loads(body)
return body
_received_activation_message: bool = False
def activate_sms(self) -> bool:
"""
Try to activate SMS forwarding
Returns True if we are able to perform SMS forwarding, False otherwise
Call repeatedly until it returns True
"""
act_message = self._receive_raw(145, "com.apple.private.alloy.sms")
if act_message is None:
return False
payload = apns._get_field(payload[1], 3)
payload = plistlib.loads(payload)
dec = self._decrypt_payload(payload['P'])
# Try gzip decompression
try:
dec = gzip.decompress(dec)
except:
pass
dec = plistlib.loads(dec)
logger.debug(f"Think we got an SMS forwarding payload : {payload}")
logger.debug(f"Decrypted : {dec}")
self._received_activation_message = True
# Send out the activation message
msg_id = random.randbytes(4)
body = {
"fcn": 1,
"c": 147,
"ua": "[macOS,13.4.1,22F82,MacBookPro18,3]",
"U": uuid.uuid4().bytes,
"v": 8,
"i": int.from_bytes(msg_id, "big"),
"dtl": self._encode_multiple([self.user.current_handle], self.user.current_handle),
"nr": 1,
"sP": self.user.current_handle,
self._send_raw(
147,
[self.user.current_handle],
"com.apple.private.alloy.sms",
extra={
"nr": 1
}
logger.debug(f"Sending activation message : {body}")
body = plistlib.dumps(body, fmt=plistlib.FMT_BINARY)
self.connection.send_message("com.apple.private.alloy.sms", body, msg_id)
#logger.debug(f"Sent activation message : {body}")
return True
)
def send(self, message: iMessage):
# Set the sender, if it isn't already
@ -611,51 +601,40 @@ class iMessageUser:
message.sanity_check() # Sanity check MUST be called before caching keys, so that the sender is added to the list of participants
self._cache_keys(message.participants, "com.apple.madrid")
bundled_payloads = self._encode_multiple(message.participants, message.sender, message.to_raw())
msg_id = random.randbytes(4)
body = {
"fcn": 1,
"c": 100,
self._send_raw(
100,
message.participants,
"com.apple.madrid",
message.to_raw(),
message.id,
{
"E": "pair",
"ua": "[macOS,13.4.1,22F82,MacBookPro18,3]",
"v": 8,
"i": int.from_bytes(msg_id, "big"),
"U": message.id.bytes,
"dtl": bundled_payloads,
"sP": message.sender,
}
)
body = plistlib.dumps(body, fmt=plistlib.FMT_BINARY)
# Check for delivery
count = 0
total = 0
self.connection.send_message("com.apple.madrid", body, msg_id)
import time
start = time.time()
# This code can check to make sure we got a success response, but waiting for the response is annoying,
# so for now we just YOLO it and assume it worked
for p in message.participants:
for t in self.USER_CACHE[p]:
if t == self.connection.token:
continue
total += 1
# def check_response(x):
# if x[0] != 0x0A:
# return False
# if apns._get_field(x[1], 2) != sha1("com.apple.madrid".encode()).digest():
# return False
# resp_body = apns._get_field(x[1], 3)
# if resp_body is None:
# return False
# resp_body = plistlib.loads(resp_body)
# if "c" not in resp_body or resp_body["c"] != 255:
# return False
# return True
while count < total and time.time() - start < 2:
resp = self._receive_raw(255, "com.apple.madrid")
if resp is None:
continue
count += 1
logger.debug(f"Received response : {resp}")
# num_recv = 0
# while True:
# if num_recv == len(bundled_payloads):
# break
# payload = self.connection.incoming_queue.wait_pop_find(check_response)
# if payload is None:
# continue
if resp["s"] != 0:
logger.warning(f"Message delivery to {base64.b64encode(resp['t']).decode()} failed : {resp['s']}")
# resp_body = apns._get_field(payload[1], 3)
# resp_body = plistlib.loads(resp_body)
# logger.error(resp_body)
# num_recv += 1
if count < total:
logger.error(f"Unable to deliver message to all devices (got {count} of {total})")