mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2024-12-25 20:32:29 -06:00
f80acd2e09
- extract idsuser authentication functionality to separate function so that it can be reused - add more typing hints to make lsp happier - access dictionary values more safely with walrus operator - simplify some list comprehension and iteration - print proxy errors in more detail so they're easier to debug - store apnsconnection in proxy so that we can use it to make a user and decrypt payloads if needed
356 lines
14 KiB
Python
356 lines
14 KiB
Python
from io import BytesIO
|
|
import unicorn
|
|
from . import mparser as macholibre
|
|
import logging
|
|
logger = logging.getLogger("jelly")
|
|
|
|
STOP_ADDRESS = 0x00900000 # Used as a return address when calling functions
|
|
|
|
ARG_REGISTERS = [
|
|
unicorn.x86_const.UC_X86_REG_RDI,
|
|
unicorn.x86_const.UC_X86_REG_RSI,
|
|
unicorn.x86_const.UC_X86_REG_RDX,
|
|
unicorn.x86_const.UC_X86_REG_RCX,
|
|
unicorn.x86_const.UC_X86_REG_R8,
|
|
unicorn.x86_const.UC_X86_REG_R9
|
|
]
|
|
|
|
class VirtualInstructions:
|
|
def __init__(self, uc: unicorn.Uc):
|
|
self.uc = uc
|
|
|
|
def push(self, value: int):
|
|
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP) - 8)
|
|
self.uc.mem_write(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP), value.to_bytes(8, byteorder='little'))
|
|
|
|
def pop(self) -> int:
|
|
value = int.from_bytes(self.uc.mem_read(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP), 8), byteorder='little')
|
|
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP) + 8)
|
|
return value
|
|
|
|
def _set_args(self, args: list[int]):
|
|
for i in range(len(args)):
|
|
if i < 6:
|
|
self.uc.reg_write(ARG_REGISTERS[i], args[i])
|
|
else:
|
|
self.push(args[i])
|
|
|
|
|
|
def call(self, address: int, args: list[int] = []):
|
|
logger.debug(f"Calling {hex(address)} with args {args}")
|
|
self.push(STOP_ADDRESS)
|
|
self._set_args(args)
|
|
self.uc.emu_start(address, STOP_ADDRESS)
|
|
return self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RAX)
|
|
|
|
|
|
class Jelly:
|
|
# Constants
|
|
UC_ARCH = unicorn.UC_ARCH_X86
|
|
UC_MODE = unicorn.UC_MODE_64
|
|
|
|
BINARY_BASE = 0x0
|
|
|
|
HOOK_BASE = 0xD00000
|
|
HOOK_SIZE = 0x1000
|
|
|
|
STACK_BASE = 0x00300000
|
|
STACK_SIZE = 0x00100000
|
|
|
|
HEAP_BASE = 0x00400000
|
|
HEAP_SIZE = 0x00100000
|
|
|
|
STOP_ADDRESS = 0x00900000
|
|
|
|
# Public variables
|
|
_hooks: dict[str, callable] = {}
|
|
"""Symbol name to hook function mapping"""
|
|
|
|
instr: VirtualInstructions = None
|
|
|
|
uc: unicorn.Uc = None
|
|
|
|
# Private variables
|
|
_binary: bytes = b""
|
|
|
|
_heap_use: int = 0
|
|
|
|
def __init__(self, binary: bytes):
|
|
self._binary = binary
|
|
|
|
def setup(self, hooks: dict[str, callable] = {}):
|
|
self._hooks = hooks
|
|
self._setup_unicorn()
|
|
self.instr = VirtualInstructions(self.uc)
|
|
self._setup_hooks()
|
|
self._map_binary()
|
|
self._setup_stack()
|
|
self._setup_heap()
|
|
self._setup_stop()
|
|
|
|
|
|
def _setup_unicorn(self):
|
|
self.uc = unicorn.Uc(self.UC_ARCH, self.UC_MODE)
|
|
|
|
def _setup_stack(self):
|
|
self.uc.mem_map(self.STACK_BASE, self.STACK_SIZE)
|
|
self.uc.mem_write(self.STACK_BASE, b"\x00" * self.STACK_SIZE)
|
|
|
|
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.STACK_BASE + self.STACK_SIZE)
|
|
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_EBP, self.STACK_BASE + self.STACK_SIZE)
|
|
|
|
def _setup_heap(self):
|
|
self.uc.mem_map(self.HEAP_BASE, self.HEAP_SIZE)
|
|
self.uc.mem_write(self.HEAP_BASE, b"\x00" * self.HEAP_SIZE)
|
|
|
|
def debug_registers(self):
|
|
logger.debug(f"""
|
|
RAX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RAX))}
|
|
RBX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RBX))}
|
|
RCX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RCX))}
|
|
RDX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RDX))}
|
|
RSI: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RSI))}
|
|
RDI: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RDI))}
|
|
RSP: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RSP))}
|
|
RBP: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RBP))}
|
|
RIP: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RIP))}
|
|
R8: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_R8))}
|
|
R9: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_R9))}
|
|
""")
|
|
def wrap_hook(self, func: callable) -> callable:
|
|
# Get the number of arguments the function takes
|
|
arg_count = func.__code__.co_argcount
|
|
#print(f"Wrapping {arg_count} argument function {func.__name__}")
|
|
# Create a wrapper function that reads the arguments from registers and the stack
|
|
def wrapper(self: 'Jelly'):
|
|
args = []
|
|
for i in range(1, arg_count):
|
|
if i < 6:
|
|
args.append(self.uc.reg_read(ARG_REGISTERS[i-1]))
|
|
else:
|
|
args.append(self.instr.pop())
|
|
#print(ARG_REGISTERS[1])
|
|
#self.debug_registers()
|
|
logger.debug(f"calling {func.__name__}")
|
|
if args != []:
|
|
logger.debug(f" with args: {args}")
|
|
ret = func(self, *args)
|
|
if ret is not None:
|
|
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_RAX, ret)
|
|
return
|
|
return wrapper
|
|
|
|
|
|
def malloc(self, size: int) -> int:
|
|
# Very naive malloc implementation
|
|
addr = self.HEAP_BASE + self._heap_use
|
|
self._heap_use += size
|
|
return addr
|
|
|
|
def _setup_stop(self):
|
|
self.uc.mem_map(self.STOP_ADDRESS, 0x1000)
|
|
self.uc.mem_write(self.STOP_ADDRESS, b"\xc3" * 0x1000)
|
|
|
|
def _resolve_hook(uc: unicorn.Uc, address: int, size: int, self: 'Jelly'):
|
|
for name, addr in self._resolved_hooks.items():
|
|
if addr == address:
|
|
logger.debug(f"{name}: ")
|
|
self._hooks[name](self)
|
|
|
|
def _setup_hooks(self):
|
|
# Wrap all hooks
|
|
for name, func in self._hooks.items():
|
|
self._hooks[name] = self.wrap_hook(func)
|
|
|
|
self.uc.mem_map(self.HOOK_BASE, self.HOOK_SIZE)
|
|
# Write 'ret' instruction to all hook addresses
|
|
self.uc.mem_write(self.HOOK_BASE, b"\xc3" * self.HOOK_SIZE)
|
|
# Assign address in hook space to each hook
|
|
current_address = self.HOOK_BASE
|
|
self._resolved_hooks = {}
|
|
for hook in self._hooks:
|
|
self._resolved_hooks[hook] = current_address
|
|
current_address += 1
|
|
# Add unicorn instruction hook to entire hook space
|
|
self.uc.hook_add(unicorn.UC_HOOK_CODE, Jelly._resolve_hook, begin=self.HOOK_BASE, end=self.HOOK_BASE + self.HOOK_SIZE, user_data=self)
|
|
|
|
def _map_binary(self):
|
|
self.uc.mem_map(self.BINARY_BASE, round_to_page_size(len(self._binary), self.uc.ctl_get_page_size()))
|
|
self.uc.mem_write(self.BINARY_BASE, self._binary)
|
|
|
|
# Unmap the first page so we can catch NULL derefs
|
|
self.uc.mem_unmap(0x0, self.uc.ctl_get_page_size())
|
|
|
|
# Parse the binary so we can process binds
|
|
p = macholibre.Parser(self._binary)
|
|
p.parse()
|
|
|
|
for seg in p.segments:
|
|
for section in seg['sects']:
|
|
if section['type'] == 'LAZY_SYMBOL_POINTERS' or section['type'] == 'NON_LAZY_SYMBOL_POINTERS':
|
|
self._parse_lazy_binds(self.uc, section['r1'], section, self._binary[p.dysymtab['indirectsymoff']:], self._binary[p.symtab['stroff']:], self._binary[p.symtab['symoff']:])
|
|
|
|
self._parse_binds(self.uc, self._binary[p.dyld_info['bind_off']:p.dyld_info['bind_off']+p.dyld_info['bind_size']], p.segments)
|
|
|
|
def _do_bind(self, mu: unicorn.Uc, type, location, name):
|
|
if type == 1: # BIND_TYPE_POINTER
|
|
if name in self._hooks:
|
|
#print(f"Hooking {name} at {hex(location)}")
|
|
mu.mem_write(location, self._resolved_hooks[name].to_bytes(8, byteorder='little'))
|
|
else:
|
|
#print(f"Unknown symbol {name}")
|
|
pass
|
|
else:
|
|
raise NotImplementedError(f"Unknown bind type {type}")
|
|
|
|
def _parse_lazy_binds(self, mu: unicorn.Uc, indirect_offset, section, dysimtab, strtab, symtab):
|
|
logger.debug(f"Doing binds for {section['name']}")
|
|
for i in range(0, int(section['size']/8)):
|
|
# Parse into proper list?
|
|
dysym = dysimtab[(indirect_offset + i)*4:(indirect_offset + i)*4+4]
|
|
dysym = int.from_bytes(dysym, 'little')
|
|
index = dysym & 0x3fffffff
|
|
|
|
# Proper list too?
|
|
symbol = symtab[index * 16:(index * 16) + 4]
|
|
strx = int.from_bytes(symbol, 'little')
|
|
|
|
name = c_string(strtab, strx) # Remove _ at beginning
|
|
#print(f"Lazy bind for {hex(section['offset'] + (i * 8))} : {name}")
|
|
self._do_bind(mu, 1, section['offset'] + (i * 8), name)
|
|
|
|
def _parse_binds(self, mu: unicorn.Uc, binds: bytes, segments):
|
|
blen = len(binds)
|
|
binds: BytesIO = BytesIO(binds)
|
|
|
|
ordinal = 0
|
|
symbolName = ''
|
|
type = BIND_TYPE_POINTER
|
|
addend = 0
|
|
segIndex = 0
|
|
segOffset = 0
|
|
|
|
while binds.tell() < blen:
|
|
current = binds.read(1)[0]
|
|
opcode = current & BIND_OPCODE_MASK
|
|
immediate = current & BIND_IMMEDIATE_MASK
|
|
|
|
#print(f"{hex(offset)}: {hex(opcode)} {hex(immediate)}")
|
|
|
|
if opcode == BIND_OPCODE_DONE:
|
|
logger.debug("BIND_OPCODE_DONE")
|
|
break
|
|
elif opcode == BIND_OPCODE_SET_DYLIB_ORDINAL_IMM:
|
|
ordinal = immediate
|
|
elif opcode == BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB:
|
|
#ordinal = uLEB128(&p);
|
|
ordinal = decodeULEB128(binds)
|
|
#raise NotImplementedError("BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB")
|
|
elif opcode == BIND_OPCODE_SET_DYLIB_SPECIAL_IMM:
|
|
if (immediate == 0):
|
|
ordinal = 0
|
|
else:
|
|
ordinal = BIND_OPCODE_MASK | immediate
|
|
elif opcode == BIND_OPCODE_SET_SYMBOL_TRAILING_FLAGS_IMM:
|
|
# Parse string until null terminator
|
|
symbolName = ''
|
|
while True:
|
|
b = binds.read(1)[0]
|
|
if b == 0:
|
|
break
|
|
symbolName += chr(b)
|
|
#while binds[offset] != 0:
|
|
# symbolName += chr(binds[offset])
|
|
# offset += 1
|
|
#offset += 1
|
|
#print(f"Symbol name: {symbolName}")
|
|
elif opcode == BIND_OPCODE_SET_TYPE_IMM:
|
|
type = immediate
|
|
elif opcode == BIND_OPCODE_SET_ADDEND_SLEB:
|
|
#addend = sLEB128(&p);
|
|
raise NotImplementedError("BIND_OPCODE_SET_ADDEND_SLEB")
|
|
elif opcode == BIND_OPCODE_SET_SEGMENT_AND_OFFSET_ULEB:
|
|
segIndex = immediate
|
|
segOffset = decodeULEB128(binds)
|
|
#raise NotImplementedError("BIND_OPCODE_SET_SEGMENT_AND_OFFSET_ULEB")
|
|
elif opcode == BIND_OPCODE_ADD_ADDR_ULEB:
|
|
segOffset += decodeULEB128(binds)
|
|
#segOffset += uLEB128(&p);
|
|
#raise NotImplementedError("BIND_OPCODE_ADD_ADDR_ULEB")
|
|
elif opcode == BIND_OPCODE_DO_BIND:
|
|
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
|
|
segOffset += 8
|
|
elif opcode == BIND_OPCODE_DO_BIND_ADD_ADDR_ULEB:
|
|
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
|
|
segOffset += decodeULEB128(binds) + 8
|
|
#bind(type, (cast(void**) &segments[segIndex][segOffset]), symbolName, addend, generateFallback);
|
|
#segOffset += uLEB128(&p) + size_t.sizeof;
|
|
#raise NotImplementedError("BIND_OPCODE_DO_BIND_ADD_ADDR_ULEB")
|
|
elif opcode == BIND_OPCODE_DO_BIND_ADD_ADDR_IMM_SCALED:
|
|
#bind(type, (cast(void**) &segments[segIndex][segOffset]), symbolName, addend, generateFallback);
|
|
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
|
|
segOffset += immediate * 8 + 8
|
|
elif opcode == BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB:
|
|
count = decodeULEB128(binds)
|
|
skip = decodeULEB128(binds)
|
|
for i in range(count):
|
|
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
|
|
segOffset += skip + 8
|
|
# uint64_t count = uLEB128(&p);
|
|
# uint64_t skip = uLEB128(&p);
|
|
# for (uint64_t i = 0; i < count; i++) {
|
|
# bind(type, (cast(void**) &segments[segIndex][segOffset]), symbolName, addend, generateFallback);
|
|
# segOffset += skip + size_t.sizeof;
|
|
# }
|
|
#raise NotImplementedError("BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB")
|
|
else:
|
|
logger.error(f"Unknown bind opcode {opcode}")
|
|
|
|
# Mach-O defines
|
|
BIND_OPCODE_DONE = 0x00
|
|
BIND_OPCODE_SET_DYLIB_ORDINAL_IMM = 0x10
|
|
BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB = 0x20
|
|
BIND_OPCODE_SET_DYLIB_SPECIAL_IMM = 0x30
|
|
BIND_OPCODE_SET_SYMBOL_TRAILING_FLAGS_IMM = 0x40
|
|
BIND_OPCODE_SET_TYPE_IMM = 0x50
|
|
BIND_OPCODE_SET_ADDEND_SLEB = 0x60
|
|
BIND_OPCODE_SET_SEGMENT_AND_OFFSET_ULEB = 0x70
|
|
BIND_OPCODE_ADD_ADDR_ULEB = 0x80
|
|
BIND_OPCODE_DO_BIND = 0x90
|
|
BIND_OPCODE_DO_BIND_ADD_ADDR_ULEB = 0xA0
|
|
BIND_OPCODE_DO_BIND_ADD_ADDR_IMM_SCALED = 0xB0
|
|
BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB = 0xC0
|
|
BIND_OPCODE_THREADED = 0xD0
|
|
|
|
BIND_TYPE_POINTER = 1
|
|
|
|
BIND_OPCODE_MASK = 0xF0
|
|
BIND_IMMEDIATE_MASK = 0x0F
|
|
|
|
# Helper functions
|
|
def round_to_page_size(size: int, page_size: int) -> int:
|
|
return (size + page_size - 1) & ~(page_size - 1)
|
|
|
|
def decodeULEB128(bytes: BytesIO) -> int:
|
|
result = 0
|
|
shift = 0
|
|
while True:
|
|
b = bytes.read(1)[0]
|
|
result |= (b & 0x7F) << shift
|
|
if (b & 0x80) == 0:
|
|
break
|
|
shift += 7
|
|
return result
|
|
|
|
def c_string(bytes, start: int = 0) -> str:
|
|
out = ''
|
|
i = start
|
|
|
|
while True:
|
|
if i > len(bytes) or bytes[i] == 0:
|
|
break
|
|
out += chr(bytes[i])
|
|
#print(start)
|
|
#print(chr(bytes[i]))
|
|
i += 1
|
|
return out
|