mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2025-01-09 17:33:47 +00:00
commit jelly files as they are now
This commit is contained in:
parent
5d4f0bcf4a
commit
c4caa08bf7
3 changed files with 3056 additions and 0 deletions
356
emulated/jelly.py
Normal file
356
emulated/jelly.py
Normal file
|
@ -0,0 +1,356 @@
|
||||||
|
from io import BytesIO
|
||||||
|
import unicorn
|
||||||
|
import mparser as macholibre
|
||||||
|
|
||||||
|
STOP_ADDRESS = 0x00900000 # Used as a return address when calling functions
|
||||||
|
|
||||||
|
ARG_REGISTERS = [
|
||||||
|
unicorn.x86_const.UC_X86_REG_RDI,
|
||||||
|
unicorn.x86_const.UC_X86_REG_RSI,
|
||||||
|
unicorn.x86_const.UC_X86_REG_RDX,
|
||||||
|
unicorn.x86_const.UC_X86_REG_RCX,
|
||||||
|
unicorn.x86_const.UC_X86_REG_R8,
|
||||||
|
unicorn.x86_const.UC_X86_REG_R9
|
||||||
|
]
|
||||||
|
|
||||||
|
class VirtualInstructions:
|
||||||
|
def __init__(self, uc: unicorn.Uc):
|
||||||
|
self.uc = uc
|
||||||
|
|
||||||
|
def push(self, value: int):
|
||||||
|
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP) - 8)
|
||||||
|
self.uc.mem_write(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP), value.to_bytes(8, byteorder='little'))
|
||||||
|
|
||||||
|
def pop(self) -> int:
|
||||||
|
value = int.from_bytes(self.uc.mem_read(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP), 8), byteorder='little')
|
||||||
|
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP) + 8)
|
||||||
|
return value
|
||||||
|
|
||||||
|
def _set_args(self, args: list[int]):
|
||||||
|
for i in range(len(args)):
|
||||||
|
if i < 6:
|
||||||
|
self.uc.reg_write(ARG_REGISTERS[i], args[i])
|
||||||
|
else:
|
||||||
|
self.push(args[i])
|
||||||
|
|
||||||
|
|
||||||
|
def call(self, address: int, args: list[int] = []):
|
||||||
|
print(f"Calling {hex(address)} with args {args}")
|
||||||
|
self.push(STOP_ADDRESS)
|
||||||
|
self._set_args(args)
|
||||||
|
self.uc.emu_start(address, STOP_ADDRESS)
|
||||||
|
return self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RAX)
|
||||||
|
|
||||||
|
|
||||||
|
class Jelly:
|
||||||
|
# Constants
|
||||||
|
UC_ARCH = unicorn.UC_ARCH_X86
|
||||||
|
UC_MODE = unicorn.UC_MODE_64
|
||||||
|
|
||||||
|
BINARY_BASE = 0x0
|
||||||
|
|
||||||
|
HOOK_BASE = 0xD00000
|
||||||
|
HOOK_SIZE = 0x1000
|
||||||
|
|
||||||
|
STACK_BASE = 0x00300000
|
||||||
|
STACK_SIZE = 0x00100000
|
||||||
|
|
||||||
|
HEAP_BASE = 0x00400000
|
||||||
|
HEAP_SIZE = 0x00100000
|
||||||
|
|
||||||
|
STOP_ADDRESS = 0x00900000
|
||||||
|
|
||||||
|
# Public variables
|
||||||
|
_hooks: dict[str, callable] = {}
|
||||||
|
"""Symbol name to hook function mapping"""
|
||||||
|
|
||||||
|
instr: VirtualInstructions = None
|
||||||
|
|
||||||
|
uc: unicorn.Uc = None
|
||||||
|
|
||||||
|
# Private variables
|
||||||
|
_binary: bytes = b""
|
||||||
|
|
||||||
|
_heap_use: int = 0
|
||||||
|
|
||||||
|
def __init__(self, binary: bytes):
|
||||||
|
self._binary = binary
|
||||||
|
|
||||||
|
def setup(self, hooks: dict[str, callable] = {}):
|
||||||
|
self._hooks = hooks
|
||||||
|
self._setup_unicorn()
|
||||||
|
self.instr = VirtualInstructions(self.uc)
|
||||||
|
self._setup_hooks()
|
||||||
|
self._map_binary()
|
||||||
|
self._setup_stack()
|
||||||
|
self._setup_heap()
|
||||||
|
self._setup_stop()
|
||||||
|
|
||||||
|
|
||||||
|
def _setup_unicorn(self):
|
||||||
|
self.uc = unicorn.Uc(self.UC_ARCH, self.UC_MODE)
|
||||||
|
|
||||||
|
def _setup_stack(self):
|
||||||
|
self.uc.mem_map(self.STACK_BASE, self.STACK_SIZE)
|
||||||
|
self.uc.mem_write(self.STACK_BASE, b"\x00" * self.STACK_SIZE)
|
||||||
|
|
||||||
|
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.STACK_BASE + self.STACK_SIZE)
|
||||||
|
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_EBP, self.STACK_BASE + self.STACK_SIZE)
|
||||||
|
|
||||||
|
def _setup_heap(self):
|
||||||
|
self.uc.mem_map(self.HEAP_BASE, self.HEAP_SIZE)
|
||||||
|
self.uc.mem_write(self.HEAP_BASE, b"\x00" * self.HEAP_SIZE)
|
||||||
|
|
||||||
|
def debug_registers(self):
|
||||||
|
print(f"""
|
||||||
|
RAX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RAX))}
|
||||||
|
RBX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RBX))}
|
||||||
|
RCX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RCX))}
|
||||||
|
RDX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RDX))}
|
||||||
|
RSI: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RSI))}
|
||||||
|
RDI: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RDI))}
|
||||||
|
RSP: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RSP))}
|
||||||
|
RBP: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RBP))}
|
||||||
|
RIP: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RIP))}
|
||||||
|
R8: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_R8))}
|
||||||
|
R9: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_R9))}
|
||||||
|
""")
|
||||||
|
def wrap_hook(self, func: callable) -> callable:
|
||||||
|
# Get the number of arguments the function takes
|
||||||
|
arg_count = func.__code__.co_argcount
|
||||||
|
#print(f"Wrapping {arg_count} argument function {func.__name__}")
|
||||||
|
# Create a wrapper function that reads the arguments from registers and the stack
|
||||||
|
def wrapper(self: 'Jelly'):
|
||||||
|
args = []
|
||||||
|
for i in range(1, arg_count):
|
||||||
|
if i < 6:
|
||||||
|
args.append(self.uc.reg_read(ARG_REGISTERS[i-1]))
|
||||||
|
else:
|
||||||
|
args.append(self.instr.pop())
|
||||||
|
#print(ARG_REGISTERS[1])
|
||||||
|
#self.debug_registers()
|
||||||
|
print(f"calling {func.__name__}", end="")
|
||||||
|
if args != []:
|
||||||
|
print(f" with args: {args}")
|
||||||
|
else:
|
||||||
|
print()
|
||||||
|
ret = func(self, *args)
|
||||||
|
if ret is not None:
|
||||||
|
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_RAX, ret)
|
||||||
|
return
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def malloc(self, size: int) -> int:
|
||||||
|
# Very naive malloc implementation
|
||||||
|
addr = self.HEAP_BASE + self._heap_use
|
||||||
|
self._heap_use += size
|
||||||
|
return addr
|
||||||
|
|
||||||
|
def _setup_stop(self):
|
||||||
|
self.uc.mem_map(self.STOP_ADDRESS, 0x1000)
|
||||||
|
self.uc.mem_write(self.STOP_ADDRESS, b"\xc3" * 0x1000)
|
||||||
|
|
||||||
|
def _resolve_hook(uc: unicorn.Uc, address: int, size: int, self: 'Jelly'):
|
||||||
|
for name, addr in self._resolved_hooks.items():
|
||||||
|
if addr == address:
|
||||||
|
print(f"{name}: ", end="")
|
||||||
|
self._hooks[name](self)
|
||||||
|
|
||||||
|
def _setup_hooks(self):
|
||||||
|
# Wrap all hooks
|
||||||
|
for name, func in self._hooks.items():
|
||||||
|
self._hooks[name] = self.wrap_hook(func)
|
||||||
|
|
||||||
|
self.uc.mem_map(self.HOOK_BASE, self.HOOK_SIZE)
|
||||||
|
# Write 'ret' instruction to all hook addresses
|
||||||
|
self.uc.mem_write(self.HOOK_BASE, b"\xc3" * self.HOOK_SIZE)
|
||||||
|
# Assign address in hook space to each hook
|
||||||
|
current_address = self.HOOK_BASE
|
||||||
|
self._resolved_hooks = {}
|
||||||
|
for hook in self._hooks:
|
||||||
|
self._resolved_hooks[hook] = current_address
|
||||||
|
current_address += 1
|
||||||
|
# Add unicorn instruction hook to entire hook space
|
||||||
|
self.uc.hook_add(unicorn.UC_HOOK_CODE, Jelly._resolve_hook, begin=self.HOOK_BASE, end=self.HOOK_BASE + self.HOOK_SIZE, user_data=self)
|
||||||
|
|
||||||
|
def _map_binary(self):
|
||||||
|
self.uc.mem_map(self.BINARY_BASE, round_to_page_size(len(self._binary), self.uc.ctl_get_page_size()))
|
||||||
|
self.uc.mem_write(self.BINARY_BASE, self._binary)
|
||||||
|
|
||||||
|
# Unmap the first page so we can catch NULL derefs
|
||||||
|
self.uc.mem_unmap(0x0, self.uc.ctl_get_page_size())
|
||||||
|
|
||||||
|
# Parse the binary so we can process binds
|
||||||
|
p = macholibre.Parser(self._binary)
|
||||||
|
p.parse()
|
||||||
|
|
||||||
|
for seg in p.segments:
|
||||||
|
for section in seg['sects']:
|
||||||
|
if section['type'] == 'LAZY_SYMBOL_POINTERS' or section['type'] == 'NON_LAZY_SYMBOL_POINTERS':
|
||||||
|
self._parse_lazy_binds(self.uc, section['r1'], section, self._binary[p.dysymtab['indirectsymoff']:], self._binary[p.symtab['stroff']:], self._binary[p.symtab['symoff']:])
|
||||||
|
|
||||||
|
self._parse_binds(self.uc, self._binary[p.dyld_info['bind_off']:p.dyld_info['bind_off']+p.dyld_info['bind_size']], p.segments)
|
||||||
|
|
||||||
|
def _do_bind(self, mu: unicorn.Uc, type, location, name):
|
||||||
|
if type == 1: # BIND_TYPE_POINTER
|
||||||
|
if name in self._hooks:
|
||||||
|
#print(f"Hooking {name} at {hex(location)}")
|
||||||
|
mu.mem_write(location, self._resolved_hooks[name].to_bytes(8, byteorder='little'))
|
||||||
|
else:
|
||||||
|
#print(f"Unknown symbol {name}")
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise NotImplementedError(f"Unknown bind type {type}")
|
||||||
|
|
||||||
|
def _parse_lazy_binds(self, mu: unicorn.Uc, indirect_offset, section, dysimtab, strtab, symtab):
|
||||||
|
print(f"Doing binds for {section['name']}")
|
||||||
|
for i in range(0, int(section['size']/8)):
|
||||||
|
# Parse into proper list?
|
||||||
|
dysym = dysimtab[(indirect_offset + i)*4:(indirect_offset + i)*4+4]
|
||||||
|
dysym = int.from_bytes(dysym, 'little')
|
||||||
|
index = dysym & 0x3fffffff
|
||||||
|
|
||||||
|
# Proper list too?
|
||||||
|
symbol = symtab[index * 16:(index * 16) + 4]
|
||||||
|
strx = int.from_bytes(symbol, 'little')
|
||||||
|
|
||||||
|
name = c_string(strtab, strx) # Remove _ at beginning
|
||||||
|
#print(f"Lazy bind for {hex(section['offset'] + (i * 8))} : {name}")
|
||||||
|
self._do_bind(mu, 1, section['offset'] + (i * 8), name)
|
||||||
|
|
||||||
|
def _parse_binds(self, mu: unicorn.Uc, binds: bytes, segments):
|
||||||
|
blen = len(binds)
|
||||||
|
binds: BytesIO = BytesIO(binds)
|
||||||
|
|
||||||
|
ordinal = 0
|
||||||
|
symbolName = ''
|
||||||
|
type = BIND_TYPE_POINTER
|
||||||
|
addend = 0
|
||||||
|
segIndex = 0
|
||||||
|
segOffset = 0
|
||||||
|
|
||||||
|
while binds.tell() < blen:
|
||||||
|
current = binds.read(1)[0]
|
||||||
|
opcode = current & BIND_OPCODE_MASK
|
||||||
|
immediate = current & BIND_IMMEDIATE_MASK
|
||||||
|
|
||||||
|
#print(f"{hex(offset)}: {hex(opcode)} {hex(immediate)}")
|
||||||
|
|
||||||
|
if opcode == BIND_OPCODE_DONE:
|
||||||
|
print("BIND_OPCODE_DONE")
|
||||||
|
break
|
||||||
|
elif opcode == BIND_OPCODE_SET_DYLIB_ORDINAL_IMM:
|
||||||
|
ordinal = immediate
|
||||||
|
elif opcode == BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB:
|
||||||
|
#ordinal = uLEB128(&p);
|
||||||
|
ordinal = decodeULEB128(binds)
|
||||||
|
#raise NotImplementedError("BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB")
|
||||||
|
elif opcode == BIND_OPCODE_SET_DYLIB_SPECIAL_IMM:
|
||||||
|
if (immediate == 0):
|
||||||
|
ordinal = 0
|
||||||
|
else:
|
||||||
|
ordinal = BIND_OPCODE_MASK | immediate
|
||||||
|
elif opcode == BIND_OPCODE_SET_SYMBOL_TRAILING_FLAGS_IMM:
|
||||||
|
# Parse string until null terminator
|
||||||
|
symbolName = ''
|
||||||
|
while True:
|
||||||
|
b = binds.read(1)[0]
|
||||||
|
if b == 0:
|
||||||
|
break
|
||||||
|
symbolName += chr(b)
|
||||||
|
#while binds[offset] != 0:
|
||||||
|
# symbolName += chr(binds[offset])
|
||||||
|
# offset += 1
|
||||||
|
#offset += 1
|
||||||
|
#print(f"Symbol name: {symbolName}")
|
||||||
|
elif opcode == BIND_OPCODE_SET_TYPE_IMM:
|
||||||
|
type = immediate
|
||||||
|
elif opcode == BIND_OPCODE_SET_ADDEND_SLEB:
|
||||||
|
#addend = sLEB128(&p);
|
||||||
|
raise NotImplementedError("BIND_OPCODE_SET_ADDEND_SLEB")
|
||||||
|
elif opcode == BIND_OPCODE_SET_SEGMENT_AND_OFFSET_ULEB:
|
||||||
|
segIndex = immediate
|
||||||
|
segOffset = decodeULEB128(binds)
|
||||||
|
#raise NotImplementedError("BIND_OPCODE_SET_SEGMENT_AND_OFFSET_ULEB")
|
||||||
|
elif opcode == BIND_OPCODE_ADD_ADDR_ULEB:
|
||||||
|
segOffset += decodeULEB128(binds)
|
||||||
|
#segOffset += uLEB128(&p);
|
||||||
|
#raise NotImplementedError("BIND_OPCODE_ADD_ADDR_ULEB")
|
||||||
|
elif opcode == BIND_OPCODE_DO_BIND:
|
||||||
|
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
|
||||||
|
segOffset += 8
|
||||||
|
elif opcode == BIND_OPCODE_DO_BIND_ADD_ADDR_ULEB:
|
||||||
|
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
|
||||||
|
segOffset += decodeULEB128(binds) + 8
|
||||||
|
#bind(type, (cast(void**) &segments[segIndex][segOffset]), symbolName, addend, generateFallback);
|
||||||
|
#segOffset += uLEB128(&p) + size_t.sizeof;
|
||||||
|
#raise NotImplementedError("BIND_OPCODE_DO_BIND_ADD_ADDR_ULEB")
|
||||||
|
elif opcode == BIND_OPCODE_DO_BIND_ADD_ADDR_IMM_SCALED:
|
||||||
|
#bind(type, (cast(void**) &segments[segIndex][segOffset]), symbolName, addend, generateFallback);
|
||||||
|
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
|
||||||
|
segOffset += immediate * 8 + 8
|
||||||
|
elif opcode == BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB:
|
||||||
|
count = decodeULEB128(binds)
|
||||||
|
skip = decodeULEB128(binds)
|
||||||
|
for i in range(count):
|
||||||
|
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
|
||||||
|
segOffset += skip + 8
|
||||||
|
# uint64_t count = uLEB128(&p);
|
||||||
|
# uint64_t skip = uLEB128(&p);
|
||||||
|
# for (uint64_t i = 0; i < count; i++) {
|
||||||
|
# bind(type, (cast(void**) &segments[segIndex][segOffset]), symbolName, addend, generateFallback);
|
||||||
|
# segOffset += skip + size_t.sizeof;
|
||||||
|
# }
|
||||||
|
#raise NotImplementedError("BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB")
|
||||||
|
else:
|
||||||
|
print(f"Unknown bind opcode {opcode}")
|
||||||
|
|
||||||
|
# Mach-O defines
|
||||||
|
BIND_OPCODE_DONE = 0x00
|
||||||
|
BIND_OPCODE_SET_DYLIB_ORDINAL_IMM = 0x10
|
||||||
|
BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB = 0x20
|
||||||
|
BIND_OPCODE_SET_DYLIB_SPECIAL_IMM = 0x30
|
||||||
|
BIND_OPCODE_SET_SYMBOL_TRAILING_FLAGS_IMM = 0x40
|
||||||
|
BIND_OPCODE_SET_TYPE_IMM = 0x50
|
||||||
|
BIND_OPCODE_SET_ADDEND_SLEB = 0x60
|
||||||
|
BIND_OPCODE_SET_SEGMENT_AND_OFFSET_ULEB = 0x70
|
||||||
|
BIND_OPCODE_ADD_ADDR_ULEB = 0x80
|
||||||
|
BIND_OPCODE_DO_BIND = 0x90
|
||||||
|
BIND_OPCODE_DO_BIND_ADD_ADDR_ULEB = 0xA0
|
||||||
|
BIND_OPCODE_DO_BIND_ADD_ADDR_IMM_SCALED = 0xB0
|
||||||
|
BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB = 0xC0
|
||||||
|
BIND_OPCODE_THREADED = 0xD0
|
||||||
|
|
||||||
|
BIND_TYPE_POINTER = 1
|
||||||
|
|
||||||
|
BIND_OPCODE_MASK = 0xF0
|
||||||
|
BIND_IMMEDIATE_MASK = 0x0F
|
||||||
|
|
||||||
|
# Helper functions
|
||||||
|
def round_to_page_size(size: int, page_size: int) -> int:
|
||||||
|
return (size + page_size - 1) & ~(page_size - 1)
|
||||||
|
|
||||||
|
def decodeULEB128(bytes: BytesIO) -> int:
|
||||||
|
result = 0
|
||||||
|
shift = 0
|
||||||
|
while True:
|
||||||
|
b = bytes.read(1)[0]
|
||||||
|
result |= (b & 0x7F) << shift
|
||||||
|
if (b & 0x80) == 0:
|
||||||
|
break
|
||||||
|
shift += 7
|
||||||
|
return result
|
||||||
|
|
||||||
|
def c_string(bytes, start: int = 0) -> str:
|
||||||
|
out = ''
|
||||||
|
i = start
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if i > len(bytes) or bytes[i] == 0:
|
||||||
|
break
|
||||||
|
out += chr(bytes[i])
|
||||||
|
#print(start)
|
||||||
|
#print(chr(bytes[i]))
|
||||||
|
i += 1
|
||||||
|
return out
|
2286
emulated/mparser.py
Normal file
2286
emulated/mparser.py
Normal file
File diff suppressed because it is too large
Load diff
414
emulated/test.py
Normal file
414
emulated/test.py
Normal file
|
@ -0,0 +1,414 @@
|
||||||
|
import hashlib
|
||||||
|
import mparser as macholibre
|
||||||
|
from jelly import Jelly
|
||||||
|
import plistlib
|
||||||
|
|
||||||
|
BINARY_HASH = "e1181ccad82e6629d52c6a006645ad87ee59bd13"
|
||||||
|
BINARY_PATH = "/Users/jjtech/Downloads/IMDAppleServices"
|
||||||
|
BINARY_URL = "https://github.com/JJTech0130/nacserver/raw/main/IMDAppleServices"
|
||||||
|
|
||||||
|
FAKE_DATA = plistlib.load(open("data.plist", "rb"))
|
||||||
|
|
||||||
|
def load_binary() -> bytes:
|
||||||
|
# Open the file at BINARY_PATH, check the hash, and return the binary
|
||||||
|
# If the hash doesn't match, raise an exception
|
||||||
|
# Download the binary if it doesn't exist
|
||||||
|
import os, requests
|
||||||
|
if not os.path.exists(BINARY_PATH):
|
||||||
|
print("Downloading binary...")
|
||||||
|
resp = requests.get(BINARY_URL)
|
||||||
|
b = resp.content
|
||||||
|
else:
|
||||||
|
b = open(BINARY_PATH, "rb").read()
|
||||||
|
if hashlib.sha1(b).hexdigest() != BINARY_HASH:
|
||||||
|
raise Exception("Hashes don't match")
|
||||||
|
return b
|
||||||
|
|
||||||
|
|
||||||
|
def get_x64_slice(binary: bytes) -> bytes:
|
||||||
|
# Get the x64 slice of the binary
|
||||||
|
# If there is no x64 slice, raise an exception
|
||||||
|
p = macholibre.Parser(binary)
|
||||||
|
# Parse the binary to find the x64 slice
|
||||||
|
off, size = p.u_get_offset(cpu_type="X86_64")
|
||||||
|
return binary[off : off + size]
|
||||||
|
|
||||||
|
|
||||||
|
def nac_init(j: Jelly, cert: bytes):
|
||||||
|
# Allocate memory for the cert
|
||||||
|
cert_addr = j.malloc(len(cert))
|
||||||
|
j.uc.mem_write(cert_addr, cert)
|
||||||
|
|
||||||
|
# Allocate memory for the outputs
|
||||||
|
out_validation_ctx_addr = j.malloc(8)
|
||||||
|
out_request_bytes_addr = j.malloc(8)
|
||||||
|
out_request_len_addr = j.malloc(8)
|
||||||
|
|
||||||
|
# Call the function
|
||||||
|
ret = j.instr.call(
|
||||||
|
0xB1DB0,
|
||||||
|
[
|
||||||
|
cert_addr,
|
||||||
|
len(cert),
|
||||||
|
out_validation_ctx_addr,
|
||||||
|
out_request_bytes_addr,
|
||||||
|
out_request_len_addr,
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
#print(hex(ret))
|
||||||
|
|
||||||
|
if ret != 0:
|
||||||
|
n = ret & 0xffffffff
|
||||||
|
n = (n ^ 0x80000000) - 0x80000000
|
||||||
|
raise Exception(f"Error calling nac_init: {n}")
|
||||||
|
|
||||||
|
# Get the outputs
|
||||||
|
validation_ctx_addr = j.uc.mem_read(out_validation_ctx_addr, 8)
|
||||||
|
request_bytes_addr = j.uc.mem_read(out_request_bytes_addr, 8)
|
||||||
|
request_len = j.uc.mem_read(out_request_len_addr, 8)
|
||||||
|
|
||||||
|
request_bytes_addr = int.from_bytes(request_bytes_addr, 'little')
|
||||||
|
request_len = int.from_bytes(request_len, 'little')
|
||||||
|
|
||||||
|
print(f"Request @ {hex(request_bytes_addr)} : {hex(request_len)}")
|
||||||
|
|
||||||
|
request = j.uc.mem_read(request_bytes_addr, request_len)
|
||||||
|
|
||||||
|
validation_ctx_addr = int.from_bytes(validation_ctx_addr, 'little')
|
||||||
|
return validation_ctx_addr, request
|
||||||
|
|
||||||
|
def nac_submit(j: Jelly, validation_ctx: int, response: bytes):
|
||||||
|
response_addr = j.malloc(len(response))
|
||||||
|
j.uc.mem_write(response_addr, response)
|
||||||
|
|
||||||
|
ret = j.instr.call(
|
||||||
|
0xB1DD0,
|
||||||
|
[
|
||||||
|
validation_ctx,
|
||||||
|
response_addr,
|
||||||
|
len(response),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
if ret != 0:
|
||||||
|
n = ret & 0xffffffff
|
||||||
|
n = (n ^ 0x80000000) - 0x80000000
|
||||||
|
raise Exception(f"Error calling nac_submit: {n}")
|
||||||
|
|
||||||
|
def nac_generate(j: Jelly, validation_ctx: int):
|
||||||
|
#void *validation_ctx, void *unk_bytes, int unk_len,
|
||||||
|
# void **validation_data, int *validation_data_len
|
||||||
|
|
||||||
|
out_validation_data_addr = j.malloc(8)
|
||||||
|
out_validation_data_len_addr = j.malloc(8)
|
||||||
|
|
||||||
|
ret = j.instr.call(
|
||||||
|
0xB1DF0,
|
||||||
|
[
|
||||||
|
validation_ctx,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
out_validation_data_addr,
|
||||||
|
out_validation_data_len_addr,
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
if ret != 0:
|
||||||
|
n = ret & 0xffffffff
|
||||||
|
n = (n ^ 0x80000000) - 0x80000000
|
||||||
|
raise Exception(f"Error calling nac_generate: {n}")
|
||||||
|
|
||||||
|
validation_data_addr = j.uc.mem_read(out_validation_data_addr, 8)
|
||||||
|
validation_data_len = j.uc.mem_read(out_validation_data_len_addr, 8)
|
||||||
|
|
||||||
|
validation_data_addr = int.from_bytes(validation_data_addr, 'little')
|
||||||
|
validation_data_len = int.from_bytes(validation_data_len, 'little')
|
||||||
|
|
||||||
|
validation_data = j.uc.mem_read(validation_data_addr, validation_data_len)
|
||||||
|
|
||||||
|
return validation_data
|
||||||
|
|
||||||
|
|
||||||
|
def hook_code(uc, address: int, size: int, user_data):
|
||||||
|
print(">>> Tracing instruction at 0x%x, instruction size = 0x%x" % (address, size))
|
||||||
|
|
||||||
|
|
||||||
|
def malloc(j: Jelly, len: int) -> int:
|
||||||
|
# Hook malloc
|
||||||
|
# Return the address of the allocated memory
|
||||||
|
#print("malloc hook called with len = %d" % len)
|
||||||
|
return j.malloc(len)
|
||||||
|
|
||||||
|
|
||||||
|
def memset_chk(j: Jelly, dest: int, c: int, len: int, destlen: int):
|
||||||
|
print(
|
||||||
|
"memset_chk called with dest = 0x%x, c = 0x%x, len = 0x%x, destlen = 0x%x"
|
||||||
|
% (dest, c, len, destlen)
|
||||||
|
)
|
||||||
|
j.uc.mem_write(dest, bytes([c]) * len)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def sysctlbyname(j: Jelly):
|
||||||
|
return 0 # The output is not checked
|
||||||
|
|
||||||
|
|
||||||
|
def memcpy(j: Jelly, dest: int, src: int, len: int):
|
||||||
|
print("memcpy called with dest = 0x%x, src = 0x%x, len = 0x%x" % (dest, src, len))
|
||||||
|
orig = j.uc.mem_read(src, len)
|
||||||
|
j.uc.mem_write(dest, bytes(orig))
|
||||||
|
return 0
|
||||||
|
|
||||||
|
CF_OBJECTS = []
|
||||||
|
|
||||||
|
# struct __builtin_CFString {
|
||||||
|
# int *isa; // point to __CFConstantStringClassReference
|
||||||
|
# int flags;
|
||||||
|
# const char *str;
|
||||||
|
# long length;
|
||||||
|
# }
|
||||||
|
import struct
|
||||||
|
|
||||||
|
def _parse_cfstr_ptr(j: Jelly, ptr: int) -> str:
|
||||||
|
size = struct.calcsize("<QQQQ")
|
||||||
|
data = j.uc.mem_read(ptr, size)
|
||||||
|
isa, flags, str_ptr, length = struct.unpack("<QQQQ", data)
|
||||||
|
str_data = j.uc.mem_read(str_ptr, length)
|
||||||
|
return str_data.decode("utf-8")
|
||||||
|
|
||||||
|
def _parse_cstr_ptr(j: Jelly, ptr: int) -> str:
|
||||||
|
data = j.uc.mem_read(ptr, 256) # Lazy way to do it
|
||||||
|
return data.split(b"\x00")[0].decode("utf-8")
|
||||||
|
|
||||||
|
def IORegistryEntryCreateCFProperty(j: Jelly, entry: int, key: int, allocator: int, options: int):
|
||||||
|
key_str = _parse_cfstr_ptr(j, key)
|
||||||
|
if key_str in FAKE_DATA["iokit"]:
|
||||||
|
fake = FAKE_DATA["iokit"][key_str]
|
||||||
|
print(f"IOKit Entry: {key_str} -> {fake}")
|
||||||
|
# Return the index of the fake data in CF_OBJECTS
|
||||||
|
CF_OBJECTS.append(fake)
|
||||||
|
return len(CF_OBJECTS) # NOTE: We will have to subtract 1 from this later, can't return 0 here since that means NULL
|
||||||
|
else:
|
||||||
|
print(f"IOKit Entry: {key_str} -> None")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def CFGetTypeID(j: Jelly, obj: int):
|
||||||
|
obj = CF_OBJECTS[obj - 1]
|
||||||
|
if isinstance(obj, bytes):
|
||||||
|
return 1
|
||||||
|
elif isinstance(obj, str):
|
||||||
|
return 2
|
||||||
|
else:
|
||||||
|
raise Exception("Unknown CF object type")
|
||||||
|
|
||||||
|
def CFDataGetLength(j: Jelly, obj: int):
|
||||||
|
obj = CF_OBJECTS[obj - 1]
|
||||||
|
if isinstance(obj, bytes):
|
||||||
|
return len(obj)
|
||||||
|
else:
|
||||||
|
raise Exception("Unknown CF object type")
|
||||||
|
|
||||||
|
def CFDataGetBytes(j: Jelly, obj: int, range_start: int, range_end: int, buf: int):
|
||||||
|
obj = CF_OBJECTS[obj - 1]
|
||||||
|
if isinstance(obj, bytes):
|
||||||
|
data = obj[range_start:range_end]
|
||||||
|
j.uc.mem_write(buf, data)
|
||||||
|
print(f"CFDataGetBytes: {hex(range_start)}-{hex(range_end)} -> {hex(buf)}")
|
||||||
|
return len(data)
|
||||||
|
else:
|
||||||
|
raise Exception("Unknown CF object type")
|
||||||
|
|
||||||
|
def CFDictionaryCreateMutable(j: Jelly) -> int:
|
||||||
|
CF_OBJECTS.append({})
|
||||||
|
return len(CF_OBJECTS)
|
||||||
|
|
||||||
|
def maybe_object_maybe_string(j: Jelly, obj: int):
|
||||||
|
# If it's already a str
|
||||||
|
if isinstance(obj, str):
|
||||||
|
return obj
|
||||||
|
elif obj > len(CF_OBJECTS):
|
||||||
|
return obj
|
||||||
|
#raise Exception(f"WTF: {hex(obj)}")
|
||||||
|
# This is probably a CFString
|
||||||
|
# return _parse_cfstr_ptr(j, obj)
|
||||||
|
else:
|
||||||
|
return CF_OBJECTS[obj - 1]
|
||||||
|
|
||||||
|
def CFDictionaryGetValue(j: Jelly, d: int, key: int) -> int:
|
||||||
|
print(f"CFDictionaryGetValue: {d} {hex(key)}")
|
||||||
|
d = CF_OBJECTS[d - 1]
|
||||||
|
if key == 0xc3c3c3c3c3c3c3c3:
|
||||||
|
key = "DADiskDescriptionVolumeUUIDKey" # Weirdness, this is a hack
|
||||||
|
key = maybe_object_maybe_string(j, key)
|
||||||
|
if isinstance(d, dict):
|
||||||
|
if key in d:
|
||||||
|
val = d[key]
|
||||||
|
print(f"CFDictionaryGetValue: {key} -> {val}")
|
||||||
|
CF_OBJECTS.append(val)
|
||||||
|
return len(CF_OBJECTS)
|
||||||
|
else:
|
||||||
|
raise Exception("Key not found")
|
||||||
|
return 0
|
||||||
|
else:
|
||||||
|
raise Exception("Unknown CF object type")
|
||||||
|
|
||||||
|
def CFDictionarySetValue(j: Jelly, d: int, key: int, val: int):
|
||||||
|
d = CF_OBJECTS[d - 1]
|
||||||
|
key = maybe_object_maybe_string(j, key)
|
||||||
|
val = maybe_object_maybe_string(j, val)
|
||||||
|
if isinstance(d, dict):
|
||||||
|
d[key] = val
|
||||||
|
else:
|
||||||
|
raise Exception("Unknown CF object type")
|
||||||
|
|
||||||
|
def DADiskCopyDescription(j: Jelly) -> int:
|
||||||
|
description = CFDictionaryCreateMutable(j)
|
||||||
|
CFDictionarySetValue(j, description, "DADiskDescriptionVolumeUUIDKey", FAKE_DATA["root_disk_uuid"])
|
||||||
|
return description
|
||||||
|
|
||||||
|
def CFStringCreate(j: Jelly, string: str) -> int:
|
||||||
|
CF_OBJECTS.append(string)
|
||||||
|
return len(CF_OBJECTS)
|
||||||
|
|
||||||
|
def CFStringGetLength(j: Jelly, string: int) -> int:
|
||||||
|
string = CF_OBJECTS[string - 1]
|
||||||
|
if isinstance(string, str):
|
||||||
|
return len(string)
|
||||||
|
else:
|
||||||
|
raise Exception("Unknown CF object type")
|
||||||
|
|
||||||
|
def CFStringGetCString(j: Jelly, string: int, buf: int, buf_len: int, encoding: int) -> int:
|
||||||
|
string = CF_OBJECTS[string - 1]
|
||||||
|
if isinstance(string, str):
|
||||||
|
data = string.encode("utf-8")
|
||||||
|
j.uc.mem_write(buf, data)
|
||||||
|
print(f"CFStringGetCString: {string} -> {hex(buf)}")
|
||||||
|
return len(data)
|
||||||
|
else:
|
||||||
|
raise Exception("Unknown CF object type")
|
||||||
|
|
||||||
|
def IOServiceMatching(j: Jelly, name: int) -> int:
|
||||||
|
# Read the raw c string pointed to by name
|
||||||
|
name = _parse_cstr_ptr(j, name)
|
||||||
|
print(f"IOServiceMatching: {name}")
|
||||||
|
# Create a CFString from the name
|
||||||
|
name = CFStringCreate(j, name)
|
||||||
|
# Create a dictionary
|
||||||
|
d = CFDictionaryCreateMutable(j)
|
||||||
|
# Set the key "IOProviderClass" to the name
|
||||||
|
CFDictionarySetValue(j, d, "IOProviderClass", name)
|
||||||
|
# Return the dictionary
|
||||||
|
return d
|
||||||
|
|
||||||
|
def IOServiceGetMatchingService(j: Jelly) -> int:
|
||||||
|
return 92
|
||||||
|
|
||||||
|
ETH_ITERATOR_HACK = False
|
||||||
|
def IOServiceGetMatchingServices(j: Jelly, port, match, existing) -> int:
|
||||||
|
global ETH_ITERATOR_HACK
|
||||||
|
ETH_ITERATOR_HACK = True
|
||||||
|
# Write 93 to existing
|
||||||
|
j.uc.mem_write(existing, bytes([93]))
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def IOIteratorNext(j: Jelly, iterator: int) -> int:
|
||||||
|
global ETH_ITERATOR_HACK
|
||||||
|
if ETH_ITERATOR_HACK:
|
||||||
|
ETH_ITERATOR_HACK = False
|
||||||
|
return 94
|
||||||
|
else:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def bzero(j: Jelly, ptr: int, len: int):
|
||||||
|
j.uc.mem_write(ptr, bytes([0]) * len)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def IORegistryEntryGetParentEntry(j: Jelly, entry: int, _, parent: int) -> int:
|
||||||
|
j.uc.mem_write(parent, bytes([entry + 100]))
|
||||||
|
return 0
|
||||||
|
|
||||||
|
import requests, plistlib
|
||||||
|
def get_cert():
|
||||||
|
resp = requests.get("http://static.ess.apple.com/identity/validation/cert-1.0.plist")
|
||||||
|
resp = plistlib.loads(resp.content)
|
||||||
|
return resp["cert"]
|
||||||
|
|
||||||
|
def get_session_info(req: bytes) -> bytes:
|
||||||
|
body = {
|
||||||
|
'session-info-request': req,
|
||||||
|
}
|
||||||
|
body = plistlib.dumps(body)
|
||||||
|
resp = requests.post("https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/initializeValidation", data=body, verify=False)
|
||||||
|
resp = plistlib.loads(resp.content)
|
||||||
|
return resp["session-info"]
|
||||||
|
|
||||||
|
def arc4random(j: Jelly) -> int:
|
||||||
|
import random
|
||||||
|
return random.randint(0, 0xFFFFFFFF)
|
||||||
|
#return 0
|
||||||
|
|
||||||
|
def main():
|
||||||
|
binary = load_binary()
|
||||||
|
binary = get_x64_slice(binary)
|
||||||
|
# Create a Jelly object from the binary
|
||||||
|
j = Jelly(binary)
|
||||||
|
hooks = {
|
||||||
|
"_malloc": malloc,
|
||||||
|
"___stack_chk_guard": lambda: 0,
|
||||||
|
"___memset_chk": memset_chk,
|
||||||
|
"_sysctlbyname": lambda _: 0,
|
||||||
|
"_memcpy": memcpy,
|
||||||
|
"_kIOMasterPortDefault": lambda: 0,
|
||||||
|
"_IORegistryEntryFromPath": lambda _: 1,
|
||||||
|
"_kCFAllocatorDefault": lambda: 0,
|
||||||
|
"_IORegistryEntryCreateCFProperty": IORegistryEntryCreateCFProperty,
|
||||||
|
"_CFGetTypeID": CFGetTypeID,
|
||||||
|
"_CFStringGetTypeID": lambda _: 2,
|
||||||
|
"_CFDataGetTypeID": lambda _: 1,
|
||||||
|
"_CFDataGetLength": CFDataGetLength,
|
||||||
|
"_CFDataGetBytes": CFDataGetBytes,
|
||||||
|
"_CFRelease": lambda _: 0,
|
||||||
|
"_IOObjectRelease": lambda _: 0,
|
||||||
|
"_statfs$INODE64": lambda _: 0,
|
||||||
|
"_DASessionCreate": lambda _: 201,
|
||||||
|
"_DADiskCreateFromBSDName": lambda _: 202,
|
||||||
|
"_kDADiskDescriptionVolumeUUIDKey": lambda: 0,
|
||||||
|
"_DADiskCopyDescription": DADiskCopyDescription,
|
||||||
|
"_CFDictionaryGetValue": CFDictionaryGetValue,
|
||||||
|
"_CFUUIDCreateString": lambda _, __, uuid: uuid,
|
||||||
|
"_CFStringGetLength": CFStringGetLength,
|
||||||
|
"_CFStringGetMaximumSizeForEncoding": lambda _, length, __: length,
|
||||||
|
"_CFStringGetCString": CFStringGetCString,
|
||||||
|
"_free": lambda _: 0,
|
||||||
|
"_IOServiceMatching": IOServiceMatching,
|
||||||
|
"_IOServiceGetMatchingService": IOServiceGetMatchingService,
|
||||||
|
"_CFDictionaryCreateMutable": CFDictionaryCreateMutable,
|
||||||
|
"_kCFBooleanTrue": lambda: 0,
|
||||||
|
"_CFDictionarySetValue": CFDictionarySetValue,
|
||||||
|
"_IOServiceGetMatchingServices": IOServiceGetMatchingServices,
|
||||||
|
"_IOIteratorNext": IOIteratorNext,
|
||||||
|
"___bzero": bzero,
|
||||||
|
"_IORegistryEntryGetParentEntry": IORegistryEntryGetParentEntry,
|
||||||
|
"_arc4random": arc4random
|
||||||
|
}
|
||||||
|
j.setup(hooks)
|
||||||
|
#j.uc.hook_add(unicorn.UC_HOOK_CODE, hook_code)
|
||||||
|
|
||||||
|
from base64 import b64encode
|
||||||
|
cert = get_cert()
|
||||||
|
val_ctx, req = nac_init(j,cert)
|
||||||
|
print(f"Validation Context: {hex(val_ctx)}")
|
||||||
|
print(f"Request: {b64encode(req).decode()}")
|
||||||
|
|
||||||
|
session_info = get_session_info(req)
|
||||||
|
print(f"Session Info: {b64encode(session_info).decode()}")
|
||||||
|
|
||||||
|
nac_submit(j, val_ctx, session_info)
|
||||||
|
|
||||||
|
val_data = nac_generate(j, val_ctx)
|
||||||
|
|
||||||
|
print(f"Validation Data: {b64encode(val_data).decode()}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
Loading…
Reference in a new issue