Merge pull request #9 from Dadoum/main

Add basic skeleton to parse attachments
This commit is contained in:
JJTech 2023-07-31 19:54:10 -04:00 committed by GitHub
commit 45e8741f95
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 120 additions and 8 deletions

4
.gitignore vendored
View file

@ -160,4 +160,6 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear # and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ .idea/
attachments/

14
demo.py
View file

@ -1,5 +1,6 @@
import json import json
import logging import logging
import os
import threading import threading
import time import time
from base64 import b64decode, b64encode from base64 import b64decode, b64encode
@ -151,8 +152,21 @@ current_effect = None
while True: while True:
msg = im.receive() msg = im.receive()
if msg is not None: if msg is not None:
# print(f'[{msg.sender}] {msg.text}')
print(msg.to_string()) print(msg.to_string())
attachments = msg.attachments()
if len(attachments) > 0:
attachments_path = f"attachments/{msg.id}/"
os.makedirs(attachments_path, exist_ok=True)
for attachment in attachments:
with open(attachments_path + attachment.name, "wb") as attachment_file:
attachment_file.write(attachment.versions[0].data())
print(f"({len(attachments)} attachment{'s have' if len(attachments) != 1 else ' has'} been downloaded and put "
f"in {attachments_path})")
if len(INPUT_QUEUE) > 0: if len(INPUT_QUEUE) > 0:
msg = INPUT_QUEUE.pop() msg = INPUT_QUEUE.pop()
if msg == '': continue if msg == '': continue

View file

@ -4,6 +4,7 @@
## HAVE ANOTHER FILE TO SETUP EVERYTHING AUTOMATICALLY, etc ## HAVE ANOTHER FILE TO SETUP EVERYTHING AUTOMATICALLY, etc
# JSON parsing of keys, don't pass around strs?? # JSON parsing of keys, don't pass around strs??
import base64
import gzip import gzip
import logging import logging
import plistlib import plistlib
@ -17,6 +18,8 @@ from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, padding from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from xml.etree import ElementTree
import apns import apns
import ids import ids
@ -35,6 +38,93 @@ class BalloonBody:
# TODO : Register handlers based on type id # TODO : Register handlers based on type id
class AttachmentFile:
def data(self) -> bytes:
raise NotImplementedError()
@dataclass
class MMCSFile(AttachmentFile):
url: str | None = None
size: int | None = None
owner: str | None = None
signature: bytes | None = None
decryption_key: bytes | None = None
def data(self) -> bytes:
import requests
logger.info(requests.get(
url=self.url,
headers={
"User-Agent": f"IMTransferAgent/900 CFNetwork/596.2.3 Darwin/12.2.0 (x86_64) (Macmini5,1)",
# "MMCS-Url": self.url,
# "MMCS-Signature": str(base64.encodebytes(self.signature)),
# "MMCS-Owner": self.owner
},
).headers)
return b""
@dataclass
class InlineFile(AttachmentFile):
_data: bytes
def data(self) -> bytes:
return self._data
@dataclass
class Attachment:
name: str
mime_type: str
versions: list[AttachmentFile]
def __init__(self, message_raw_content: dict, xml_element: ElementTree.Element):
attrs = xml_element.attrib
self.name = attrs["name"] if "name" in attrs else None
self.mime_type = attrs["mime-type"] if "mime-type" in attrs else None
if "inline-attachment" in attrs:
# just grab the inline attachment !
self.versions = [InlineFile(message_raw_content[attrs["inline-attachment"]])]
else:
# suffer
versions = []
for attribute in attrs:
if attribute.startswith("mmcs") or \
attribute.startswith("decryption-key") or \
attribute.startswith("file-size"):
segments = attribute.split('-')
if segments[-1].isnumeric():
index = int(segments[-1])
attribute_name = segments[:-1]
else:
index = 0
attribute_name = attribute
while index >= len(versions):
versions.append(MMCSFile())
val = attrs[attribute_name]
match attribute_name:
case "mmcs-url":
versions[index].url = val
case "mmcs-owner":
versions[index].owner = val
case "mmcs-signature-hex":
versions[index].signature = base64.b16decode(val)
case "file-size":
versions[index].size = int(val)
case "decryption-key":
versions[index].decryption_key = base64.b16decode(val)[1:]
self.versions = versions
def __repr__(self):
return f'<Attachment name="{self.name}" type="{self.mime_type}">'
@dataclass @dataclass
class iMessage: class iMessage:
"""Represents an iMessage""" """Represents an iMessage"""
@ -47,7 +137,7 @@ class iMessage:
"""List of participants in the message, including the sender""" """List of participants in the message, including the sender"""
sender: str | None = None sender: str | None = None
"""Sender of the message""" """Sender of the message"""
_id: uuid.UUID | None = None id: uuid.UUID | None = None
"""ID of the message, will be randomly generated if not provided""" """ID of the message, will be randomly generated if not provided"""
group_id: uuid.UUID | None = None group_id: uuid.UUID | None = None
"""Group ID of the message, will be randomly generated if not provided""" """Group ID of the message, will be randomly generated if not provided"""
@ -62,10 +152,16 @@ class iMessage:
_raw: dict | None = None _raw: dict | None = None
"""Internal property representing the original raw message, may be None""" """Internal property representing the original raw message, may be None"""
def attachments(self) -> list[Attachment]:
if self.xml is not None:
return [Attachment(self._raw, elem) for elem in ElementTree.fromstring(self.xml)[0] if elem.tag == "FILE"]
else:
return []
def sanity_check(self): def sanity_check(self):
"""Corrects any missing fields""" """Corrects any missing fields"""
if self._id is None: if self.id is None:
self._id = uuid.uuid4() self.id = uuid.uuid4()
if self.group_id is None: if self.group_id is None:
self.group_id = uuid.uuid4() self.group_id = uuid.uuid4()
@ -103,9 +199,9 @@ class iMessage:
xml=message.get("x"), xml=message.get("x"),
participants=message.get("p", []), participants=message.get("p", []),
sender=sender if sender is not None else message.get("p", [])[-1] if "p" in message else None, sender=sender if sender is not None else message.get("p", [])[-1] if "p" in message else None,
_id=uuid.UUID(message.get("r")) if "r" in message else None, id=uuid.UUID(message.get("r")) if "r" in message else None,
group_id=uuid.UUID(message.get("gid")) if "gid" in message else None, group_id=uuid.UUID(message.get("gid")) if "gid" in message else None,
body=BalloonBody(message["bid"], message["b"]) if "bid" in message else None, body=BalloonBody(message["bid"], message["b"]) if "bid" in message and "b" in message else None,
effect=message["iid"] if "iid" in message else None, effect=message["iid"] if "iid" in message else None,
_compressed=compressed, _compressed=compressed,
_raw=message, _raw=message,
@ -120,7 +216,7 @@ class iMessage:
"t": self.text, "t": self.text,
"x": self.xml, "x": self.xml,
"p": self.participants, "p": self.participants,
"r": str(self._id).upper(), "r": str(self.id).upper(),
"gid": str(self.group_id).upper(), "gid": str(self.group_id).upper(),
"pv": 0, "pv": 0,
"gv": "8", "gv": "8",
@ -428,7 +524,7 @@ class iMessageUser:
"ua": "[macOS,13.4.1,22F82,MacBookPro18,3]", "ua": "[macOS,13.4.1,22F82,MacBookPro18,3]",
"v": 8, "v": 8,
"i": int.from_bytes(msg_id, "big"), "i": int.from_bytes(msg_id, "big"),
"U": message._id.bytes, "U": message.id.bytes,
"dtl": bundled_payloads, "dtl": bundled_payloads,
"sP": message.sender, "sP": message.sender,
} }