pypush-plus-plus/pypush/emulated/jelly.py

355 lines
14 KiB
Python
Raw Normal View History

2023-07-23 14:31:18 -05:00
from io import BytesIO
import unicorn
2023-07-23 14:37:43 -05:00
from . import mparser as macholibre
2023-07-23 17:55:13 -05:00
import logging
logger = logging.getLogger("jelly")
2023-07-23 14:47:35 -05:00
2023-07-23 14:31:18 -05:00
STOP_ADDRESS = 0x00900000 # Used as a return address when calling functions
ARG_REGISTERS = [
unicorn.x86_const.UC_X86_REG_RDI,
unicorn.x86_const.UC_X86_REG_RSI,
unicorn.x86_const.UC_X86_REG_RDX,
unicorn.x86_const.UC_X86_REG_RCX,
unicorn.x86_const.UC_X86_REG_R8,
unicorn.x86_const.UC_X86_REG_R9
]
class VirtualInstructions:
def __init__(self, uc: unicorn.Uc):
self.uc = uc
def push(self, value: int):
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP) - 8)
self.uc.mem_write(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP), value.to_bytes(8, byteorder='little'))
def pop(self) -> int:
value = int.from_bytes(self.uc.mem_read(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP), 8), byteorder='little')
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP) + 8)
return value
2023-07-23 14:31:18 -05:00
def _set_args(self, args: list[int]):
for i in range(len(args)):
if i < 6:
self.uc.reg_write(ARG_REGISTERS[i], args[i])
else:
self.push(args[i])
def call(self, address: int, args: list[int] = []):
2023-07-23 17:55:13 -05:00
logger.debug(f"Calling {hex(address)} with args {args}")
2023-07-23 14:31:18 -05:00
self.push(STOP_ADDRESS)
self._set_args(args)
self.uc.emu_start(address, STOP_ADDRESS)
return self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RAX)
class Jelly:
# Constants
UC_ARCH = unicorn.UC_ARCH_X86
UC_MODE = unicorn.UC_MODE_64
BINARY_BASE = 0x0
HOOK_BASE = 0xD00000
HOOK_SIZE = 0x1000
STACK_BASE = 0x00300000
STACK_SIZE = 0x00100000
HEAP_BASE = 0x00400000
HEAP_SIZE = 0x00100000
STOP_ADDRESS = 0x00900000
# Public variables
_hooks: dict[str, callable] = {}
"""Symbol name to hook function mapping"""
instr: VirtualInstructions = None
uc: unicorn.Uc = None
2023-07-23 14:31:18 -05:00
# Private variables
_binary: bytes = b""
_heap_use: int = 0
def __init__(self, binary: bytes):
self._binary = binary
def setup(self, hooks: dict[str, callable] = {}):
self._hooks = hooks
self._setup_unicorn()
self.instr = VirtualInstructions(self.uc)
self._setup_hooks()
self._map_binary()
self._setup_stack()
self._setup_heap()
self._setup_stop()
def _setup_unicorn(self):
self.uc = unicorn.Uc(self.UC_ARCH, self.UC_MODE)
def _setup_stack(self):
2023-07-23 14:31:18 -05:00
self.uc.mem_map(self.STACK_BASE, self.STACK_SIZE)
self.uc.mem_write(self.STACK_BASE, b"\x00" * self.STACK_SIZE)
2023-07-23 14:31:18 -05:00
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.STACK_BASE + self.STACK_SIZE)
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_EBP, self.STACK_BASE + self.STACK_SIZE)
def _setup_heap(self):
self.uc.mem_map(self.HEAP_BASE, self.HEAP_SIZE)
self.uc.mem_write(self.HEAP_BASE, b"\x00" * self.HEAP_SIZE)
def debug_registers(self):
2023-07-23 17:55:13 -05:00
logger.debug(f"""
2023-07-23 14:31:18 -05:00
RAX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RAX))}
RBX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RBX))}
RCX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RCX))}
RDX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RDX))}
RSI: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RSI))}
RDI: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RDI))}
RSP: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RSP))}
RBP: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RBP))}
RIP: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RIP))}
R8: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_R8))}
R9: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_R9))}
""")
def wrap_hook(self, func: callable) -> callable:
# Get the number of arguments the function takes
arg_count = func.__code__.co_argcount
#print(f"Wrapping {arg_count} argument function {func.__name__}")
# Create a wrapper function that reads the arguments from registers and the stack
def wrapper(self: 'Jelly'):
args = []
for i in range(1, arg_count):
if i < 6:
args.append(self.uc.reg_read(ARG_REGISTERS[i-1]))
else:
args.append(self.instr.pop())
#print(ARG_REGISTERS[1])
#self.debug_registers()
2023-07-23 17:55:13 -05:00
logger.debug(f"calling {func.__name__}")
2023-07-23 14:31:18 -05:00
if args != []:
2023-07-23 17:55:13 -05:00
logger.debug(f" with args: {args}")
2023-07-23 14:31:18 -05:00
ret = func(self, *args)
if ret is not None:
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_RAX, ret)
return
return wrapper
def malloc(self, size: int) -> int:
# Very naive malloc implementation
addr = self.HEAP_BASE + self._heap_use
self._heap_use += size
return addr
def _setup_stop(self):
self.uc.mem_map(self.STOP_ADDRESS, 0x1000)
self.uc.mem_write(self.STOP_ADDRESS, b"\xc3" * 0x1000)
def _resolve_hook(uc: unicorn.Uc, address: int, size: int, self: 'Jelly'):
for name, addr in self._resolved_hooks.items():
if addr == address:
2023-07-23 17:55:13 -05:00
logger.debug(f"{name}: ")
2023-07-23 14:31:18 -05:00
self._hooks[name](self)
2023-07-23 14:31:18 -05:00
def _setup_hooks(self):
# Wrap all hooks
for name, func in self._hooks.items():
self._hooks[name] = self.wrap_hook(func)
2023-07-23 14:31:18 -05:00
self.uc.mem_map(self.HOOK_BASE, self.HOOK_SIZE)
# Write 'ret' instruction to all hook addresses
self.uc.mem_write(self.HOOK_BASE, b"\xc3" * self.HOOK_SIZE)
# Assign address in hook space to each hook
current_address = self.HOOK_BASE
self._resolved_hooks = {}
for hook in self._hooks:
self._resolved_hooks[hook] = current_address
current_address += 1
# Add unicorn instruction hook to entire hook space
self.uc.hook_add(unicorn.UC_HOOK_CODE, Jelly._resolve_hook, begin=self.HOOK_BASE, end=self.HOOK_BASE + self.HOOK_SIZE, user_data=self)
def _map_binary(self):
self.uc.mem_map(self.BINARY_BASE, round_to_page_size(len(self._binary), self.uc.ctl_get_page_size()))
self.uc.mem_write(self.BINARY_BASE, self._binary)
# Unmap the first page so we can catch NULL derefs
self.uc.mem_unmap(0x0, self.uc.ctl_get_page_size())
# Parse the binary so we can process binds
p = macholibre.Parser(self._binary)
p.parse()
2023-07-23 14:31:18 -05:00
for seg in p.segments:
for section in seg['sects']:
if section['type'] == 'LAZY_SYMBOL_POINTERS' or section['type'] == 'NON_LAZY_SYMBOL_POINTERS':
self._parse_lazy_binds(self.uc, section['r1'], section, self._binary[p.dysymtab['indirectsymoff']:], self._binary[p.symtab['stroff']:], self._binary[p.symtab['symoff']:])
self._parse_binds(self.uc, self._binary[p.dyld_info['bind_off']:p.dyld_info['bind_off']+p.dyld_info['bind_size']], p.segments)
def _do_bind(self, mu: unicorn.Uc, type, location, name):
if type == 1: # BIND_TYPE_POINTER
if name in self._hooks:
#print(f"Hooking {name} at {hex(location)}")
mu.mem_write(location, self._resolved_hooks[name].to_bytes(8, byteorder='little'))
else:
#print(f"Unknown symbol {name}")
pass
else:
raise NotImplementedError(f"Unknown bind type {type}")
2023-07-23 14:31:18 -05:00
def _parse_lazy_binds(self, mu: unicorn.Uc, indirect_offset, section, dysimtab, strtab, symtab):
2023-07-23 17:55:13 -05:00
logger.debug(f"Doing binds for {section['name']}")
for i in range(0, int(section['size']/8)):
# Parse into proper list?
2023-07-23 14:31:18 -05:00
dysym = dysimtab[(indirect_offset + i)*4:(indirect_offset + i)*4+4]
dysym = int.from_bytes(dysym, 'little')
index = dysym & 0x3fffffff
# Proper list too?
symbol = symtab[index * 16:(index * 16) + 4]
strx = int.from_bytes(symbol, 'little')
name = c_string(strtab, strx) # Remove _ at beginning
#print(f"Lazy bind for {hex(section['offset'] + (i * 8))} : {name}")
self._do_bind(mu, 1, section['offset'] + (i * 8), name)
2023-07-23 14:31:18 -05:00
def _parse_binds(self, mu: unicorn.Uc, binds: bytes, segments):
blen = len(binds)
binds: BytesIO = BytesIO(binds)
ordinal = 0
symbolName = ''
type = BIND_TYPE_POINTER
addend = 0
segIndex = 0
segOffset = 0
while binds.tell() < blen:
current = binds.read(1)[0]
opcode = current & BIND_OPCODE_MASK
immediate = current & BIND_IMMEDIATE_MASK
#print(f"{hex(offset)}: {hex(opcode)} {hex(immediate)}")
if opcode == BIND_OPCODE_DONE:
2023-07-23 17:55:13 -05:00
logger.debug("BIND_OPCODE_DONE")
2023-07-23 14:31:18 -05:00
break
elif opcode == BIND_OPCODE_SET_DYLIB_ORDINAL_IMM:
ordinal = immediate
2023-07-23 14:31:18 -05:00
elif opcode == BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB:
#ordinal = uLEB128(&p);
ordinal = decodeULEB128(binds)
#raise NotImplementedError("BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB")
elif opcode == BIND_OPCODE_SET_DYLIB_SPECIAL_IMM:
if (immediate == 0):
ordinal = 0
else:
ordinal = BIND_OPCODE_MASK | immediate
elif opcode == BIND_OPCODE_SET_SYMBOL_TRAILING_FLAGS_IMM:
# Parse string until null terminator
symbolName = ''
while True:
b = binds.read(1)[0]
if b == 0:
break
symbolName += chr(b)
#while binds[offset] != 0:
# symbolName += chr(binds[offset])
# offset += 1
#offset += 1
#print(f"Symbol name: {symbolName}")
elif opcode == BIND_OPCODE_SET_TYPE_IMM:
type = immediate
elif opcode == BIND_OPCODE_SET_ADDEND_SLEB:
#addend = sLEB128(&p);
raise NotImplementedError("BIND_OPCODE_SET_ADDEND_SLEB")
elif opcode == BIND_OPCODE_SET_SEGMENT_AND_OFFSET_ULEB:
segIndex = immediate
segOffset = decodeULEB128(binds)
#raise NotImplementedError("BIND_OPCODE_SET_SEGMENT_AND_OFFSET_ULEB")
elif opcode == BIND_OPCODE_ADD_ADDR_ULEB:
segOffset += decodeULEB128(binds)
#segOffset += uLEB128(&p);
#raise NotImplementedError("BIND_OPCODE_ADD_ADDR_ULEB")
elif opcode == BIND_OPCODE_DO_BIND:
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
segOffset += 8
elif opcode == BIND_OPCODE_DO_BIND_ADD_ADDR_ULEB:
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
segOffset += decodeULEB128(binds) + 8
#bind(type, (cast(void**) &segments[segIndex][segOffset]), symbolName, addend, generateFallback);
#segOffset += uLEB128(&p) + size_t.sizeof;
#raise NotImplementedError("BIND_OPCODE_DO_BIND_ADD_ADDR_ULEB")
elif opcode == BIND_OPCODE_DO_BIND_ADD_ADDR_IMM_SCALED:
#bind(type, (cast(void**) &segments[segIndex][segOffset]), symbolName, addend, generateFallback);
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
segOffset += immediate * 8 + 8
elif opcode == BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB:
count = decodeULEB128(binds)
skip = decodeULEB128(binds)
for i in range(count):
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
segOffset += skip + 8
# uint64_t count = uLEB128(&p);
# uint64_t skip = uLEB128(&p);
# for (uint64_t i = 0; i < count; i++) {
# bind(type, (cast(void**) &segments[segIndex][segOffset]), symbolName, addend, generateFallback);
# segOffset += skip + size_t.sizeof;
# }
#raise NotImplementedError("BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB")
else:
2023-07-23 17:55:13 -05:00
logger.error(f"Unknown bind opcode {opcode}")
2023-07-23 14:31:18 -05:00
# Mach-O defines
BIND_OPCODE_DONE = 0x00
BIND_OPCODE_SET_DYLIB_ORDINAL_IMM = 0x10
BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB = 0x20
BIND_OPCODE_SET_DYLIB_SPECIAL_IMM = 0x30
BIND_OPCODE_SET_SYMBOL_TRAILING_FLAGS_IMM = 0x40
BIND_OPCODE_SET_TYPE_IMM = 0x50
BIND_OPCODE_SET_ADDEND_SLEB = 0x60
BIND_OPCODE_SET_SEGMENT_AND_OFFSET_ULEB = 0x70
BIND_OPCODE_ADD_ADDR_ULEB = 0x80
BIND_OPCODE_DO_BIND = 0x90
BIND_OPCODE_DO_BIND_ADD_ADDR_ULEB = 0xA0
BIND_OPCODE_DO_BIND_ADD_ADDR_IMM_SCALED = 0xB0
BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB = 0xC0
BIND_OPCODE_THREADED = 0xD0
BIND_TYPE_POINTER = 1
BIND_OPCODE_MASK = 0xF0
BIND_IMMEDIATE_MASK = 0x0F
# Helper functions
def round_to_page_size(size: int, page_size: int) -> int:
return (size + page_size - 1) & ~(page_size - 1)
def decodeULEB128(bytes: BytesIO) -> int:
result = 0
shift = 0
while True:
b = bytes.read(1)[0]
result |= (b & 0x7F) << shift
if (b & 0x80) == 0:
break
shift += 7
return result
def c_string(bytes, start: int = 0) -> str:
out = ''
i = start
2023-07-23 14:31:18 -05:00
while True:
if i > len(bytes) or bytes[i] == 0:
break
out += chr(bytes[i])
#print(start)
#print(chr(bytes[i]))
i += 1
return out