- Remove eol whitespace

- Add retry for writing to stream since it's busy sometimes
- add a few more type hints
- add back demo.py tui commands
- add sending with effects
This commit is contained in:
itsjunetime 2023-08-22 11:13:31 -06:00
parent f80acd2e09
commit 2960b7c63c
15 changed files with 166 additions and 84 deletions

View file

@ -46,7 +46,7 @@ def _generate_csr(private_key: rsa.RSAPrivateKey) -> str:
def generate_push_cert() -> tuple[str, str]:
"""
Generates an APNs push certificate by talking to Albert.
Returns [private key PEM, certificate PEM]
"""
private_key = rsa.generate_private_key(
@ -67,7 +67,7 @@ def generate_push_cert() -> tuple[str, str]:
}
logger.debug(f"Generated activation info (with UUID: {activation_info['UniqueDeviceID']})")
activation_info = plistlib.dumps(activation_info)
# Load the private key

30
apns.py
View file

@ -88,9 +88,19 @@ class APNSConnection:
async def _send(self, payload: APNSPayload):
"""Sends a payload to the APNs server"""
await payload.write_to_stream(self.sock)
while True:
try:
await payload.write_to_stream(self.sock)
return
except trio.BusyResourceError:
print("Can't send payload, stream is busy; trying again in 0.2")
await trio.sleep(0.2)
continue
except Exception as e:
print(f"Can't send payload: {e}")
return
async def _receive(self, id: int, filter: Callable | None = None):
async def _receive(self, id: int, filter: Callable[[APNSPayload], bool] | None = None):
"""
Waits for a payload with the given id to be added to the queue, then returns it.
If filter is not None, it will be called with the payload as an argument, and if it returns False,
@ -175,7 +185,7 @@ class APNSConnection:
context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
context.set_alpn_protocols(["apns-security-v3"])
# Turn off certificate verification, for the proxy
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
@ -237,7 +247,7 @@ class APNSConnection:
],
)
if token != b"" and token is not None:
if token:
payload.fields.insert(0, APNSField(0x1, token))
await self._send(payload)
@ -315,7 +325,7 @@ class APNSConnection:
return True
r = await self._receive(0xA, f)
#await self._send_ack(r.fields_with_id(4)[0].value)
# await self._send_ack(r.fields_with_id(4)[0].value)
return r
async def set_state(self, state: int):
@ -342,7 +352,7 @@ class APNSConnection:
APNSField(8, b"\x00"),
],
)
await payload.write_to_stream(self.sock)
await self._send(payload)
@dataclass
@ -375,13 +385,11 @@ class APNSPayload:
@staticmethod
async def read_from_stream(stream: trio.abc.Stream) -> APNSPayload:
"""Reads a payload from the given stream"""
id = await stream.receive_some(1)
if id is None or id == b"":
if not (id_bytes := await stream.receive_some(1)):
raise Exception("Unable to read payload id from stream")
id = int.from_bytes(id, "big")
id: int = int.from_bytes(id_bytes, "big")
length = await stream.receive_some(4)
if length is None:
if (length := await stream.receive_some(4)) is None:
raise Exception("Unable to read payload length from stream")
length = int.from_bytes(length, "big")

View file

@ -10,7 +10,7 @@ def apns_init_bag_old():
if OLD_APNS_BAG is not None:
return OLD_APNS_BAG
r = requests.get("https://init.push.apple.com/bag", verify=False)
if r.status_code != 200:
raise Exception("Failed to get APNs init bag")
@ -32,7 +32,7 @@ def apns_init_bag():
if APNS_BAG is not None:
return APNS_BAG
r = requests.get("http://init-p01st.push.apple.com/bag", verify=False)
if r.status_code != 200:
raise Exception("Failed to get APNs init bag 2")
@ -53,7 +53,7 @@ def ids_bag():
if IDS_BAG is not None:
return IDS_BAG
r = requests.get(
"https://init.ess.apple.com/WebObjects/VCInit.woa/wa/getBag?ix=3", verify=False
)

77
demo.py
View file

@ -104,10 +104,81 @@ async def main():
nursery.start_soon(output_task, im)
async def input_task(im: imessage.iMessageUser):
current_effect: str | None = None
current_participants: list[str] = []
def is_cmd(cmd_str: str, name: str) -> bool:
return cmd_str in [name, name[0]] or cmd_str.startswith(f"{name} ") or cmd_str.startswith(f"{name[0]} ")
def get_parameters(cmd: str, err_msg: str) -> list[str] | None:
sections: list[str] = cmd.split(" ")
if len(sections) < 2 or len(sections[1]) == 0:
print(err_msg)
return None
else:
return sections[1:]
def fixup_handle(handle: str) -> str:
if handle.startswith("tel:+") or handle.startswith("mailto:"):
return handle
elif handle.startswith("tel:"):
return "tel:+" + handle[4:]
elif handle.startswith("+"):
return "tel:" + handle
elif handle[0].isdigit():
# if the handle is 10 digits, assume it's a US number
if len(handle) == 10:
return "tel:+1" + handle
# otherwise just assume it's a full phone number
return "tel:+" + handle
else: # assume it's an email
return "mailto:" + handle
while True:
cmd = await trio.to_thread.run_sync(input, "> ", cancellable=True)
if cmd != "":
await im.send(imessage.iMessage.create(im, cmd, [im.user.current_handle]))
if (cmd := await trio.to_thread.run_sync(input, ">> ", cancellable=True)) == "":
continue
if is_cmd(cmd, "help"):
print("help (h): show this message")
print("quit (q): quit")
print("filter (f) [recipient]: set the current chat")
print("note: recipient must start with tel: or mailto: and include the country code")
print("effect (e): adds an iMessage effect to the next sent message")
print("handle [handle]: set the current handle (for sending messages with)")
print("\\: escape commands (will be removed from message)")
print("all other commands will be treated as message text to be sent")
elif is_cmd(cmd, "quit"):
exit(0)
elif is_cmd(cmd, "effect"):
if (effect := get_parameters(cmd, "effect [effect namespace]")) is not None:
print(f"next message will be sent with [{effect[0]}]")
current_effect = effect[0]
elif is_cmd(cmd, "filter"):
# set the current chat
if (participants := get_parameters(cmd, "filter [recipients]")) is not None:
fixed_participants: list[str] = list(map(fixup_handle, participants))
print(f"Filtering to {fixed_participants}")
current_participants = fixed_participants
elif is_cmd(cmd, "handle"):
handles: list[str] = im.user.handles
av_handles: str = "\n".join([f"\t{h}{' (current)' if h == im.user.current_handle else ''}" for h in handles])
err_str: str = f"handle [handle]\nAvailable handles:\n{av_handles}"
if (input_handles := get_parameters(cmd, err_str)) is not None:
handle = fixup_handle(input_handles[0])
if handle in handles:
print(f"Using {handle} as handle")
im.user.current_handle = handle
else:
print(f"Handle {handle} not found")
elif len(current_participants) > 0:
if cmd.startswith("\\"):
cmd = cmd[1:]
await im.send(imessage.iMessage.create(im, cmd, current_participants, current_effect))
current_effect = None
else:
print("No chat selected")
async def output_task(im: imessage.iMessageUser):
while True:

View file

@ -27,7 +27,7 @@ class VirtualInstructions:
value = int.from_bytes(self.uc.mem_read(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP), 8), byteorder='little')
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP) + 8)
return value
def _set_args(self, args: list[int]):
for i in range(len(args)):
if i < 6:
@ -35,7 +35,6 @@ class VirtualInstructions:
else:
self.push(args[i])
def call(self, address: int, args: list[int] = []):
logger.debug(f"Calling {hex(address)} with args {args}")
self.push(STOP_ADDRESS)
@ -43,7 +42,6 @@ class VirtualInstructions:
self.uc.emu_start(address, STOP_ADDRESS)
return self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RAX)
class Jelly:
# Constants
UC_ARCH = unicorn.UC_ARCH_X86
@ -69,7 +67,7 @@ class Jelly:
instr: VirtualInstructions = None
uc: unicorn.Uc = None
# Private variables
_binary: bytes = b""
@ -92,10 +90,10 @@ class Jelly:
def _setup_unicorn(self):
self.uc = unicorn.Uc(self.UC_ARCH, self.UC_MODE)
def _setup_stack(self):
def _setup_stack(self):
self.uc.mem_map(self.STACK_BASE, self.STACK_SIZE)
self.uc.mem_write(self.STACK_BASE, b"\x00" * self.STACK_SIZE)
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.STACK_BASE + self.STACK_SIZE)
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_EBP, self.STACK_BASE + self.STACK_SIZE)
@ -156,12 +154,12 @@ class Jelly:
if addr == address:
logger.debug(f"{name}: ")
self._hooks[name](self)
def _setup_hooks(self):
# Wrap all hooks
for name, func in self._hooks.items():
self._hooks[name] = self.wrap_hook(func)
self.uc.mem_map(self.HOOK_BASE, self.HOOK_SIZE)
# Write 'ret' instruction to all hook addresses
self.uc.mem_write(self.HOOK_BASE, b"\xc3" * self.HOOK_SIZE)
@ -184,7 +182,7 @@ class Jelly:
# Parse the binary so we can process binds
p = macholibre.Parser(self._binary)
p.parse()
for seg in p.segments:
for section in seg['sects']:
if section['type'] == 'LAZY_SYMBOL_POINTERS' or section['type'] == 'NON_LAZY_SYMBOL_POINTERS':
@ -202,11 +200,11 @@ class Jelly:
pass
else:
raise NotImplementedError(f"Unknown bind type {type}")
def _parse_lazy_binds(self, mu: unicorn.Uc, indirect_offset, section, dysimtab, strtab, symtab):
logger.debug(f"Doing binds for {section['name']}")
for i in range(0, int(section['size']/8)):
# Parse into proper list?
for i in range(0, int(section['size']/8)):
# Parse into proper list?
dysym = dysimtab[(indirect_offset + i)*4:(indirect_offset + i)*4+4]
dysym = int.from_bytes(dysym, 'little')
index = dysym & 0x3fffffff
@ -218,7 +216,7 @@ class Jelly:
name = c_string(strtab, strx) # Remove _ at beginning
#print(f"Lazy bind for {hex(section['offset'] + (i * 8))} : {name}")
self._do_bind(mu, 1, section['offset'] + (i * 8), name)
def _parse_binds(self, mu: unicorn.Uc, binds: bytes, segments):
blen = len(binds)
binds: BytesIO = BytesIO(binds)
@ -241,7 +239,7 @@ class Jelly:
logger.debug("BIND_OPCODE_DONE")
break
elif opcode == BIND_OPCODE_SET_DYLIB_ORDINAL_IMM:
ordinal = immediate
ordinal = immediate
elif opcode == BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB:
#ordinal = uLEB128(&p);
ordinal = decodeULEB128(binds)
@ -345,7 +343,7 @@ def decodeULEB128(bytes: BytesIO) -> int:
def c_string(bytes, start: int = 0) -> str:
out = ''
i = start
while True:
if i > len(bytes) or bytes[i] == 0:
break

View file

@ -808,7 +808,7 @@ class Parser():
if cmd == 'SEGMENT' or cmd == 'SEGMENT_64':
#self.segments.append((offset, size, cmd, cmd_size))
#self.__macho['lcs'].append(
parsed = self.parse_segment(offset, size, cmd, cmd_size)
self.__macho['lcs'].append(parsed)
self.segments.append(parsed)

View file

@ -67,7 +67,7 @@ def nac_init(j: Jelly, cert: bytes):
n = ret & 0xffffffff
n = (n ^ 0x80000000) - 0x80000000
raise Exception(f"Error calling nac_init: {n}")
# Get the outputs
validation_ctx_addr = j.uc.mem_read(out_validation_ctx_addr, 8)
request_bytes_addr = j.uc.mem_read(out_request_bytes_addr, 8)
@ -79,7 +79,7 @@ def nac_init(j: Jelly, cert: bytes):
logger.debug(f"Request @ {hex(request_bytes_addr)} : {hex(request_len)}")
request = j.uc.mem_read(request_bytes_addr, request_len)
validation_ctx_addr = int.from_bytes(validation_ctx_addr, 'little')
return validation_ctx_addr, request
@ -100,11 +100,11 @@ def nac_key_establishment(j: Jelly, validation_ctx: int, response: bytes):
n = ret & 0xffffffff
n = (n ^ 0x80000000) - 0x80000000
raise Exception(f"Error calling nac_submit: {n}")
def nac_sign(j: Jelly, validation_ctx: int):
#void *validation_ctx, void *unk_bytes, int unk_len,
# void **validation_data, int *validation_data_len
out_validation_data_addr = j.malloc(8)
out_validation_data_len_addr = j.malloc(8)
@ -123,7 +123,7 @@ def nac_sign(j: Jelly, validation_ctx: int):
n = ret & 0xffffffff
n = (n ^ 0x80000000) - 0x80000000
raise Exception(f"Error calling nac_generate: {n}")
validation_data_addr = j.uc.mem_read(out_validation_data_addr, 8)
validation_data_len = j.uc.mem_read(out_validation_data_len_addr, 8)
@ -197,7 +197,7 @@ def IORegistryEntryCreateCFProperty(j: Jelly, entry: int, key: int, allocator: i
else:
logger.debug(f"IOKit Entry: {key_str} -> None")
return 0
def CFGetTypeID(j: Jelly, obj: int):
obj = CF_OBJECTS[obj - 1]
if isinstance(obj, bytes):
@ -206,14 +206,14 @@ def CFGetTypeID(j: Jelly, obj: int):
return 2
else:
raise Exception("Unknown CF object type")
def CFDataGetLength(j: Jelly, obj: int):
obj = CF_OBJECTS[obj - 1]
if isinstance(obj, bytes):
return len(obj)
else:
raise Exception("Unknown CF object type")
def CFDataGetBytes(j: Jelly, obj: int, range_start: int, range_end: int, buf: int):
obj = CF_OBJECTS[obj - 1]
if isinstance(obj, bytes):
@ -223,7 +223,7 @@ def CFDataGetBytes(j: Jelly, obj: int, range_start: int, range_end: int, buf: in
return len(data)
else:
raise Exception("Unknown CF object type")
def CFDictionaryCreateMutable(j: Jelly) -> int:
CF_OBJECTS.append({})
return len(CF_OBJECTS)
@ -257,7 +257,7 @@ def CFDictionaryGetValue(j: Jelly, d: int, key: int) -> int:
return 0
else:
raise Exception("Unknown CF object type")
def CFDictionarySetValue(j: Jelly, d: int, key: int, val: int):
d = CF_OBJECTS[d - 1]
key = maybe_object_maybe_string(j, key)
@ -270,7 +270,7 @@ def CFDictionarySetValue(j: Jelly, d: int, key: int, val: int):
def DADiskCopyDescription(j: Jelly) -> int:
description = CFDictionaryCreateMutable(j)
CFDictionarySetValue(j, description, "DADiskDescriptionVolumeUUIDKey", FAKE_DATA["root_disk_uuid"])
return description
return description
def CFStringCreate(j: Jelly, string: str) -> int:
CF_OBJECTS.append(string)
@ -292,7 +292,7 @@ def CFStringGetCString(j: Jelly, string: int, buf: int, buf_len: int, encoding:
return len(data)
else:
raise Exception("Unknown CF object type")
def IOServiceMatching(j: Jelly, name: int) -> int:
# Read the raw c string pointed to by name
name = _parse_cstr_ptr(j, name)
@ -305,7 +305,7 @@ def IOServiceMatching(j: Jelly, name: int) -> int:
CFDictionarySetValue(j, d, "IOProviderClass", name)
# Return the dictionary
return d
def IOServiceGetMatchingService(j: Jelly) -> int:
return 92
@ -324,7 +324,7 @@ def IOIteratorNext(j: Jelly, iterator: int) -> int:
return 94
else:
return 0
def bzero(j: Jelly, ptr: int, len: int):
j.uc.mem_write(ptr, bytes([0]) * len)
return 0
@ -419,4 +419,4 @@ if __name__ == "__main__":
from base64 import b64encode
val_data = generate_validation_data()
logger.info(f"Validation Data: {b64encode(val_data).decode()}")
#main()
#main()

View file

@ -54,12 +54,10 @@ class IDSUser:
# Uses an existing authentication keypair
def restore_authentication(
self, auth_keypair: _helpers.KeyPair, user_id: str, handles: dict
):
def restore_authentication(self, auth_keypair: _helpers.KeyPair, user_id: str, handles: list[str]):
self._auth_keypair = auth_keypair
self.user_id = user_id
self.handles = handles
self.handles = handles
self.current_handle = self.handles[0]
# This is a separate call so that the user can make sure the first part succeeds before asking for validation data
@ -126,4 +124,4 @@ class IDSUser:
async def lookup(self, uris: list[str], topic: str = "com.apple.madrid") -> Any:
return await query.lookup(self.push_connection, self.current_handle, self._id_keypair, uris, topic)

View file

@ -36,4 +36,3 @@ def serialize_key(key) -> str:
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("utf-8").strip()

View file

@ -33,7 +33,7 @@ class IDSIdentity:
# Generate a new key
self.signing_key = serialize_key(ec.generate_private_key(ec.SECP256R1()))
self.signing_public_key = serialize_key(parse_key(self.signing_key).public_key())# type: ignore
if encryption_key is not None:
self.encryption_key = encryption_key
self.encryption_public_key = serialize_key(parse_key(encryption_key).public_key())# type: ignore
@ -43,7 +43,7 @@ class IDSIdentity:
else:
self.encryption_key = serialize_key(rsa.generate_private_key(65537, 1280))
self.encryption_public_key = serialize_key(parse_key(self.encryption_key).public_key())# type: ignore
@classmethod
def decode(cls, inp: bytes) -> Self:
input = BytesIO(inp)
@ -96,7 +96,7 @@ class IDSIdentity:
output.write(raw_rsa.getvalue())
return output.getvalue()
def register(
push_token, handles, user_id, auth_key: KeyPair, push_key: KeyPair, identity: IDSIdentity, validation_data
):

View file

@ -60,7 +60,7 @@ def get_auth_token(
result = _auth_token_request(username, password)
if result["status"] != 0:
raise Exception(f"Error: {result}")
auth_token = result["auth-token"]
realm_user_id = result["profile-id"]
# else:
@ -146,7 +146,7 @@ def get_auth_cert(user_id, token) -> KeyPair:
)
def get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair):
def get_handles(push_token, user_id: str, auth_key: KeyPair, push_key: KeyPair) -> list[str]:
BAG_KEY = "id-get-handles"
headers = {

View file

@ -48,7 +48,7 @@ async def lookup(
}
await conn.send_notification(topic, plistlib.dumps(req, fmt=plistlib.FMT_BINARY))
def check(payload: apns.APNSPayload):
body = payload.fields_with_id(3)[0].value
if body is None:

View file

@ -143,18 +143,18 @@ class Message:
"""Internal property representing whether the message should be compressed"""
xml: str | None = None
"""XML portion of message, may be None"""
@staticmethod
def from_raw(message: bytes, sender: str | None = None) -> "Message":
"""Create a `Message` from raw message bytes"""
raise NotImplementedError()
def to_raw(self) -> bytes:
"""Convert a `Message` to raw message bytes"""
raise NotImplementedError()
def __str__(self):
raise NotImplementedError()
@ -186,8 +186,8 @@ class SMSReflectedMessage(Message):
def to_raw(self) -> bytes:
# {'re': [{'id': '+14155086773', 'uID': '4155086773', 'n': 'us'}], 'ic': 0, 'mD': {'handle': '+14155086773', 'guid': imessage.py:201
# '35694E24-E265-4D5C-8CA7-9499E35D0402', 'replyToGUID': '4F9BC76B-B09C-2A60-B312-9029D529706B', 'plain-body': 'Test sms', 'service':
# 'SMS', 'sV': '1'}, 'fR': True, 'chat-style': 'im'}
# '35694E24-E265-4D5C-8CA7-9499E35D0402', 'replyToGUID': '4F9BC76B-B09C-2A60-B312-9029D529706B', 'plain-body': 'Test sms', 'service':
# 'SMS', 'sV': '1'}, 'fR': True, 'chat-style': 'im'}
#pass
# Strip tel: from participants, making sure they are all phone numbers
#participants = [p.replace("tel:", "") for p in self.participants]
@ -250,7 +250,7 @@ class SMSIncomingMessage(Message):
def __str__(self):
return f"[SMS {self.sender}] '{self.text}'"
@dataclass
class SMSIncomingImage(Message):
@staticmethod
@ -262,10 +262,17 @@ class SMSIncomingImage(Message):
@dataclass
class iMessage(Message):
"""
An iMessage
Description of payload keys:
t: The sender token. Normally just a big thing of data/bytes, doesn't need to be decoded in any way
U: the id of the message (uuid) as bytes
"""
effect: str | None = None
@staticmethod
def create(user: "iMessageUser", text: str, participants: list[str]) -> "iMessage":
def create(user: "iMessageUser", text: str, participants: list[str], effect: str | None) -> "iMessage":
"""Creates a basic outgoing `iMessage` from the given text and participants"""
sender = user.user.current_handle
@ -277,8 +284,9 @@ class iMessage(Message):
sender=sender,
participants=participants,
id=uuid.uuid4(),
effect=effect
)
@staticmethod
def from_raw(message: bytes, sender: str | None = None) -> "iMessage":
"""Create a `iMessage` from raw message bytes"""
@ -304,7 +312,7 @@ class iMessage(Message):
_compressed=compressed,
effect=message["iid"] if "iid" in message else None, # type: ignore
)
def to_raw(self) -> bytes:
"""Convert an `iMessage` to raw message bytes"""
@ -330,7 +338,7 @@ class iMessage(Message):
d = gzip.compress(d, mtime=0)
return d
def __str__(self):
return f"[iMessage {self.sender}] '{self.text}'"
@ -506,7 +514,7 @@ class iMessageUser:
Will return the next iMessage in the queue, or None if there are no messages
"""
body: dict[str, Any] = await self._receive_raw(list(MESSAGE_TYPES.keys()), [t[0] for t in MESSAGE_TYPES.values()])
t: type[Message] = MESSAGE_TYPES[body["c"]][1]
t: type[Message] = MESSAGE_TYPES[body["c"]][1]
if not await self._verify_payload(body["P"], body["sP"], body["t"]):
raise Exception("Failed to verify payload")
@ -642,7 +650,7 @@ class iMessageUser:
elif isinstance(c, list) and body["c"] not in c:
return False
return True
payload = await self.connection.expect_notification(topics, check)
body_bytes: bytes = payload.fields_with_id(3)[0].value
@ -657,7 +665,7 @@ class iMessageUser:
"""
act_message: dict[str, Any] = await self._receive_raw(145, "com.apple.private.alloy.sms")
logger.info(f"Received SMS activation message : {act_message}")
# Decrypt the payload
act_message_bytes: bytes = self._decrypt_payload(act_message["P"])
@ -667,7 +675,7 @@ class iMessageUser:
logger.info("SMS forwarding activated, sending response")
else:
logger.info("SMS forwarding de-activated, sending response")
await self._send_raw(
147,
[self.user.current_handle],
@ -684,7 +692,7 @@ class iMessageUser:
break
else:
raise Exception("Unknown message type")
send_to = message.participants if isinstance(message, iMessage) else [self.user.current_handle]
await self._cache_keys(send_to, topic)
@ -698,7 +706,7 @@ class iMessageUser:
{
"E": "pair", # TODO: Do we need the nr field for SMS?
}
)
)
# Check for delivery
count = 0

View file

@ -33,7 +33,7 @@ async def main():
# Set the certificate and private key
parent_dir: str = os.path.dirname(os.path.realpath(__file__))
context.load_cert_chain(os.path.join(parent_dir, "push_certificate_chain.pem"), os.path.join(parent_dir, "push_key.pem"))
await trio.serve_ssl_over_tcp(handle_proxy, 5223, context)
async def handle_proxy(stream: trio.SocketStream):
@ -67,7 +67,7 @@ class APNSProxy:
logging.error("Unable to start proxy, trying again...")
await trio.sleep(1)
async def proxy(self, to_server: bool):
if to_server:
@ -89,7 +89,7 @@ class APNSProxy:
# logging.info(f"-> {payload}")
# else:
# logging.info(f"<- {payload}")
def tamper(self, payload: apns.APNSPayload, to_server) -> apns.APNSPayload:
#if not to_server:
# payload = self.tamper_lookup_keys(payload)
@ -110,7 +110,7 @@ class APNSProxy:
for identity in result["identities"]:
if "client-data" in identity:
identity["client-data"]["public-message-identity-key"] = b"REDACTED"
resp = gzip.compress(plistlib.dumps(resp, fmt=plistlib.FMT_BINARY), mtime=0)
body["b"] = resp
body = plistlib.dumps(body, fmt=plistlib.FMT_BINARY)

View file

@ -41,7 +41,7 @@ python3 "${proxy_dir}/hosts.py" >> "$hosts_proxy"
echo -e "\e[32m[?]\e[0;1m ${hosts_proxy}\e[0m must be copied over to /etc/hosts. Would you like us to do that for you? [y/n]"
read -rn1 answer
if [[ "${answer,,}" == "y" ]]
if [[ "${answer,,}" == "y" ]]
then
inf "Backing up /etc/hosts to /etc/hosts.bak and copying ${hosts_proxy} to /etc/hosts"
sudo cp /etc/hosts /etc/hosts.bak
@ -96,4 +96,4 @@ inf "Restarting wifi to force apsd to reconnect..."
networksetup -setairportpower en0 off
networksetup -setairportpower en0 on
while true; do read -rn1 _; done
while true; do read -rn1 _; done