Merge pull request #3 from JJTech0130/jelly-integration

This commit is contained in:
JJTech 2023-07-23 19:00:56 -04:00 committed by GitHub
commit 26f05c8713
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 3142 additions and 2 deletions

1
.gitignore vendored
View file

@ -1,4 +1,5 @@
config.json config.json
IMDAppleServices
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

17
demo.py
View file

@ -5,6 +5,18 @@ from base64 import b64decode
import apns import apns
import ids import ids
import logging
from rich.logging import RichHandler
FORMAT = "%(message)s"
logging.basicConfig(
level="NOTSET", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()]
)
# Set sane log levels
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("jelly").setLevel(logging.INFO)
logging.getLogger("nac").setLevel(logging.INFO)
def input_multiline(prompt): def input_multiline(prompt):
print(prompt) print(prompt)
@ -79,7 +91,10 @@ if CONFIG.get("id", {}).get("cert") is not None:
id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"]) id_keypair = ids._helpers.KeyPair(CONFIG["id"]["key"], CONFIG["id"]["cert"])
user.restore_identity(id_keypair) user.restore_identity(id_keypair)
else: else:
vd = input_multiline("Enter validation data: ") #vd = input_multiline("Enter validation data: ")
import emulated.nac
vd = emulated.nac.generate_validation_data()
vd = b64encode(vd).decode()
user.register(vd) user.register(vd)
print(user.lookup(["mailto:textgpt@icloud.com"])) print(user.lookup(["mailto:textgpt@icloud.com"]))

55
emulated/data.plist Normal file
View file

@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>iokit</key>
<dict>
<key>4D1EDE05-38C7-4A6A-9CC6-4BCCA8B38C14:MLB</key>
<data>
QzAyNzIzMjA3QTVIV1ZRQVU=
</data>
<key>4D1EDE05-38C7-4A6A-9CC6-4BCCA8B38C14:ROM</key>
<data>
+AN3alom
</data>
<key>Fyp98tpgj</key>
<data>
U0TE/KnCc/AGEUYBuDpD8TQ=
</data>
<key>Gq3489ugfi</key>
<data>
S7OU9/LJ3K6TUgIzaNHU5kI=
</data>
<key>IOMACAddress</key>
<data>
3KkEh8Ol
</data>
<key>IOPlatformSerialNumber</key>
<string>C02TV034HV29</string>
<key>IOPlatformUUID</key>
<string>0E7049C5-6CC5-57E4-9353-EF54C4332A99</string>
<key>abKPld1EcMni</key>
<data>
uqHToZ+DNmm75/jSPMzB1ZQ=
</data>
<key>board-id</key>
<data>
TWFjLUI0ODMxQ0VCRDUyQTBDNEMA
</data>
<key>kbjfrfpoJU</key>
<data>
WnM3jhjelH3+jt4jJ2OqfiQ=
</data>
<key>oycqAZloTNDm</key>
<data>
BNdC9rh4Wxif7S9NA8W1864=
</data>
<key>product-name</key>
<data>
TWFjQm9va1BybzE0LDEA
</data>
</dict>
<key>root_disk_uuid</key>
<string>3D6822B6-A26E-358C-BD6F-EFF407645F34</string>
</dict>
</plist>

356
emulated/jelly.py Normal file
View file

@ -0,0 +1,356 @@
from io import BytesIO
import unicorn
from . import mparser as macholibre
import logging
logger = logging.getLogger("jelly")
STOP_ADDRESS = 0x00900000 # Used as a return address when calling functions
ARG_REGISTERS = [
unicorn.x86_const.UC_X86_REG_RDI,
unicorn.x86_const.UC_X86_REG_RSI,
unicorn.x86_const.UC_X86_REG_RDX,
unicorn.x86_const.UC_X86_REG_RCX,
unicorn.x86_const.UC_X86_REG_R8,
unicorn.x86_const.UC_X86_REG_R9
]
class VirtualInstructions:
def __init__(self, uc: unicorn.Uc):
self.uc = uc
def push(self, value: int):
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP) - 8)
self.uc.mem_write(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP), value.to_bytes(8, byteorder='little'))
def pop(self) -> int:
value = int.from_bytes(self.uc.mem_read(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP), 8), byteorder='little')
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.uc.reg_read(unicorn.x86_const.UC_X86_REG_ESP) + 8)
return value
def _set_args(self, args: list[int]):
for i in range(len(args)):
if i < 6:
self.uc.reg_write(ARG_REGISTERS[i], args[i])
else:
self.push(args[i])
def call(self, address: int, args: list[int] = []):
logger.debug(f"Calling {hex(address)} with args {args}")
self.push(STOP_ADDRESS)
self._set_args(args)
self.uc.emu_start(address, STOP_ADDRESS)
return self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RAX)
class Jelly:
# Constants
UC_ARCH = unicorn.UC_ARCH_X86
UC_MODE = unicorn.UC_MODE_64
BINARY_BASE = 0x0
HOOK_BASE = 0xD00000
HOOK_SIZE = 0x1000
STACK_BASE = 0x00300000
STACK_SIZE = 0x00100000
HEAP_BASE = 0x00400000
HEAP_SIZE = 0x00100000
STOP_ADDRESS = 0x00900000
# Public variables
_hooks: dict[str, callable] = {}
"""Symbol name to hook function mapping"""
instr: VirtualInstructions = None
uc: unicorn.Uc = None
# Private variables
_binary: bytes = b""
_heap_use: int = 0
def __init__(self, binary: bytes):
self._binary = binary
def setup(self, hooks: dict[str, callable] = {}):
self._hooks = hooks
self._setup_unicorn()
self.instr = VirtualInstructions(self.uc)
self._setup_hooks()
self._map_binary()
self._setup_stack()
self._setup_heap()
self._setup_stop()
def _setup_unicorn(self):
self.uc = unicorn.Uc(self.UC_ARCH, self.UC_MODE)
def _setup_stack(self):
self.uc.mem_map(self.STACK_BASE, self.STACK_SIZE)
self.uc.mem_write(self.STACK_BASE, b"\x00" * self.STACK_SIZE)
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_ESP, self.STACK_BASE + self.STACK_SIZE)
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_EBP, self.STACK_BASE + self.STACK_SIZE)
def _setup_heap(self):
self.uc.mem_map(self.HEAP_BASE, self.HEAP_SIZE)
self.uc.mem_write(self.HEAP_BASE, b"\x00" * self.HEAP_SIZE)
def debug_registers(self):
logger.debug(f"""
RAX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RAX))}
RBX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RBX))}
RCX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RCX))}
RDX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RDX))}
RSI: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RSI))}
RDI: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RDI))}
RSP: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RSP))}
RBP: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RBP))}
RIP: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RIP))}
R8: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_R8))}
R9: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_R9))}
""")
def wrap_hook(self, func: callable) -> callable:
# Get the number of arguments the function takes
arg_count = func.__code__.co_argcount
#print(f"Wrapping {arg_count} argument function {func.__name__}")
# Create a wrapper function that reads the arguments from registers and the stack
def wrapper(self: 'Jelly'):
args = []
for i in range(1, arg_count):
if i < 6:
args.append(self.uc.reg_read(ARG_REGISTERS[i-1]))
else:
args.append(self.instr.pop())
#print(ARG_REGISTERS[1])
#self.debug_registers()
logger.debug(f"calling {func.__name__}")
if args != []:
logger.debug(f" with args: {args}")
ret = func(self, *args)
if ret is not None:
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_RAX, ret)
return
return wrapper
def malloc(self, size: int) -> int:
# Very naive malloc implementation
addr = self.HEAP_BASE + self._heap_use
self._heap_use += size
return addr
def _setup_stop(self):
self.uc.mem_map(self.STOP_ADDRESS, 0x1000)
self.uc.mem_write(self.STOP_ADDRESS, b"\xc3" * 0x1000)
def _resolve_hook(uc: unicorn.Uc, address: int, size: int, self: 'Jelly'):
for name, addr in self._resolved_hooks.items():
if addr == address:
logger.debug(f"{name}: ")
self._hooks[name](self)
def _setup_hooks(self):
# Wrap all hooks
for name, func in self._hooks.items():
self._hooks[name] = self.wrap_hook(func)
self.uc.mem_map(self.HOOK_BASE, self.HOOK_SIZE)
# Write 'ret' instruction to all hook addresses
self.uc.mem_write(self.HOOK_BASE, b"\xc3" * self.HOOK_SIZE)
# Assign address in hook space to each hook
current_address = self.HOOK_BASE
self._resolved_hooks = {}
for hook in self._hooks:
self._resolved_hooks[hook] = current_address
current_address += 1
# Add unicorn instruction hook to entire hook space
self.uc.hook_add(unicorn.UC_HOOK_CODE, Jelly._resolve_hook, begin=self.HOOK_BASE, end=self.HOOK_BASE + self.HOOK_SIZE, user_data=self)
def _map_binary(self):
self.uc.mem_map(self.BINARY_BASE, round_to_page_size(len(self._binary), self.uc.ctl_get_page_size()))
self.uc.mem_write(self.BINARY_BASE, self._binary)
# Unmap the first page so we can catch NULL derefs
self.uc.mem_unmap(0x0, self.uc.ctl_get_page_size())
# Parse the binary so we can process binds
p = macholibre.Parser(self._binary)
p.parse()
for seg in p.segments:
for section in seg['sects']:
if section['type'] == 'LAZY_SYMBOL_POINTERS' or section['type'] == 'NON_LAZY_SYMBOL_POINTERS':
self._parse_lazy_binds(self.uc, section['r1'], section, self._binary[p.dysymtab['indirectsymoff']:], self._binary[p.symtab['stroff']:], self._binary[p.symtab['symoff']:])
self._parse_binds(self.uc, self._binary[p.dyld_info['bind_off']:p.dyld_info['bind_off']+p.dyld_info['bind_size']], p.segments)
def _do_bind(self, mu: unicorn.Uc, type, location, name):
if type == 1: # BIND_TYPE_POINTER
if name in self._hooks:
#print(f"Hooking {name} at {hex(location)}")
mu.mem_write(location, self._resolved_hooks[name].to_bytes(8, byteorder='little'))
else:
#print(f"Unknown symbol {name}")
pass
else:
raise NotImplementedError(f"Unknown bind type {type}")
def _parse_lazy_binds(self, mu: unicorn.Uc, indirect_offset, section, dysimtab, strtab, symtab):
logger.debug(f"Doing binds for {section['name']}")
for i in range(0, int(section['size']/8)):
# Parse into proper list?
dysym = dysimtab[(indirect_offset + i)*4:(indirect_offset + i)*4+4]
dysym = int.from_bytes(dysym, 'little')
index = dysym & 0x3fffffff
# Proper list too?
symbol = symtab[index * 16:(index * 16) + 4]
strx = int.from_bytes(symbol, 'little')
name = c_string(strtab, strx) # Remove _ at beginning
#print(f"Lazy bind for {hex(section['offset'] + (i * 8))} : {name}")
self._do_bind(mu, 1, section['offset'] + (i * 8), name)
def _parse_binds(self, mu: unicorn.Uc, binds: bytes, segments):
blen = len(binds)
binds: BytesIO = BytesIO(binds)
ordinal = 0
symbolName = ''
type = BIND_TYPE_POINTER
addend = 0
segIndex = 0
segOffset = 0
while binds.tell() < blen:
current = binds.read(1)[0]
opcode = current & BIND_OPCODE_MASK
immediate = current & BIND_IMMEDIATE_MASK
#print(f"{hex(offset)}: {hex(opcode)} {hex(immediate)}")
if opcode == BIND_OPCODE_DONE:
logger.debug("BIND_OPCODE_DONE")
break
elif opcode == BIND_OPCODE_SET_DYLIB_ORDINAL_IMM:
ordinal = immediate
elif opcode == BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB:
#ordinal = uLEB128(&p);
ordinal = decodeULEB128(binds)
#raise NotImplementedError("BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB")
elif opcode == BIND_OPCODE_SET_DYLIB_SPECIAL_IMM:
if (immediate == 0):
ordinal = 0
else:
ordinal = BIND_OPCODE_MASK | immediate
elif opcode == BIND_OPCODE_SET_SYMBOL_TRAILING_FLAGS_IMM:
# Parse string until null terminator
symbolName = ''
while True:
b = binds.read(1)[0]
if b == 0:
break
symbolName += chr(b)
#while binds[offset] != 0:
# symbolName += chr(binds[offset])
# offset += 1
#offset += 1
#print(f"Symbol name: {symbolName}")
elif opcode == BIND_OPCODE_SET_TYPE_IMM:
type = immediate
elif opcode == BIND_OPCODE_SET_ADDEND_SLEB:
#addend = sLEB128(&p);
raise NotImplementedError("BIND_OPCODE_SET_ADDEND_SLEB")
elif opcode == BIND_OPCODE_SET_SEGMENT_AND_OFFSET_ULEB:
segIndex = immediate
segOffset = decodeULEB128(binds)
#raise NotImplementedError("BIND_OPCODE_SET_SEGMENT_AND_OFFSET_ULEB")
elif opcode == BIND_OPCODE_ADD_ADDR_ULEB:
segOffset += decodeULEB128(binds)
#segOffset += uLEB128(&p);
#raise NotImplementedError("BIND_OPCODE_ADD_ADDR_ULEB")
elif opcode == BIND_OPCODE_DO_BIND:
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
segOffset += 8
elif opcode == BIND_OPCODE_DO_BIND_ADD_ADDR_ULEB:
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
segOffset += decodeULEB128(binds) + 8
#bind(type, (cast(void**) &segments[segIndex][segOffset]), symbolName, addend, generateFallback);
#segOffset += uLEB128(&p) + size_t.sizeof;
#raise NotImplementedError("BIND_OPCODE_DO_BIND_ADD_ADDR_ULEB")
elif opcode == BIND_OPCODE_DO_BIND_ADD_ADDR_IMM_SCALED:
#bind(type, (cast(void**) &segments[segIndex][segOffset]), symbolName, addend, generateFallback);
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
segOffset += immediate * 8 + 8
elif opcode == BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB:
count = decodeULEB128(binds)
skip = decodeULEB128(binds)
for i in range(count):
self._do_bind(mu, type, segments[segIndex]['offset'] + segOffset, symbolName)
segOffset += skip + 8
# uint64_t count = uLEB128(&p);
# uint64_t skip = uLEB128(&p);
# for (uint64_t i = 0; i < count; i++) {
# bind(type, (cast(void**) &segments[segIndex][segOffset]), symbolName, addend, generateFallback);
# segOffset += skip + size_t.sizeof;
# }
#raise NotImplementedError("BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB")
else:
logger.error(f"Unknown bind opcode {opcode}")
# Mach-O defines
BIND_OPCODE_DONE = 0x00
BIND_OPCODE_SET_DYLIB_ORDINAL_IMM = 0x10
BIND_OPCODE_SET_DYLIB_ORDINAL_ULEB = 0x20
BIND_OPCODE_SET_DYLIB_SPECIAL_IMM = 0x30
BIND_OPCODE_SET_SYMBOL_TRAILING_FLAGS_IMM = 0x40
BIND_OPCODE_SET_TYPE_IMM = 0x50
BIND_OPCODE_SET_ADDEND_SLEB = 0x60
BIND_OPCODE_SET_SEGMENT_AND_OFFSET_ULEB = 0x70
BIND_OPCODE_ADD_ADDR_ULEB = 0x80
BIND_OPCODE_DO_BIND = 0x90
BIND_OPCODE_DO_BIND_ADD_ADDR_ULEB = 0xA0
BIND_OPCODE_DO_BIND_ADD_ADDR_IMM_SCALED = 0xB0
BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB = 0xC0
BIND_OPCODE_THREADED = 0xD0
BIND_TYPE_POINTER = 1
BIND_OPCODE_MASK = 0xF0
BIND_IMMEDIATE_MASK = 0x0F
# Helper functions
def round_to_page_size(size: int, page_size: int) -> int:
return (size + page_size - 1) & ~(page_size - 1)
def decodeULEB128(bytes: BytesIO) -> int:
result = 0
shift = 0
while True:
b = bytes.read(1)[0]
result |= (b & 0x7F) << shift
if (b & 0x80) == 0:
break
shift += 7
return result
def c_string(bytes, start: int = 0) -> str:
out = ''
i = start
while True:
if i > len(bytes) or bytes[i] == 0:
break
out += chr(bytes[i])
#print(start)
#print(chr(bytes[i]))
i += 1
return out

2289
emulated/mparser.py Normal file

File diff suppressed because it is too large Load diff

423
emulated/nac.py Normal file
View file

@ -0,0 +1,423 @@
import hashlib
from . import mparser as macholibre
from .jelly import Jelly
import plistlib
import logging
logger = logging.getLogger("nac")
BINARY_HASH = "e1181ccad82e6629d52c6a006645ad87ee59bd13"
BINARY_PATH = "emulated/IMDAppleServices"
BINARY_URL = "https://github.com/JJTech0130/nacserver/raw/main/IMDAppleServices"
FAKE_DATA = plistlib.load(open("emulated/data.plist", "rb"))
def load_binary() -> bytes:
# Open the file at BINARY_PATH, check the hash, and return the binary
# If the hash doesn't match, raise an exception
# Download the binary if it doesn't exist
import os, requests
if not os.path.exists(BINARY_PATH):
logger.info("Downloading IMDAppleServices")
resp = requests.get(BINARY_URL)
b = resp.content
# Save the binary
open(BINARY_PATH, "wb").write(b)
else:
logger.debug("Using already downloaded IMDAppleServices")
b = open(BINARY_PATH, "rb").read()
if hashlib.sha1(b).hexdigest() != BINARY_HASH:
raise Exception("Hashes don't match")
return b
def get_x64_slice(binary: bytes) -> bytes:
# Get the x64 slice of the binary
# If there is no x64 slice, raise an exception
p = macholibre.Parser(binary)
# Parse the binary to find the x64 slice
off, size = p.u_get_offset(cpu_type="X86_64")
return binary[off : off + size]
def nac_init(j: Jelly, cert: bytes):
# Allocate memory for the cert
cert_addr = j.malloc(len(cert))
j.uc.mem_write(cert_addr, cert)
# Allocate memory for the outputs
out_validation_ctx_addr = j.malloc(8)
out_request_bytes_addr = j.malloc(8)
out_request_len_addr = j.malloc(8)
# Call the function
ret = j.instr.call(
0xB1DB0,
[
cert_addr,
len(cert),
out_validation_ctx_addr,
out_request_bytes_addr,
out_request_len_addr,
],
)
#print(hex(ret))
if ret != 0:
n = ret & 0xffffffff
n = (n ^ 0x80000000) - 0x80000000
raise Exception(f"Error calling nac_init: {n}")
# Get the outputs
validation_ctx_addr = j.uc.mem_read(out_validation_ctx_addr, 8)
request_bytes_addr = j.uc.mem_read(out_request_bytes_addr, 8)
request_len = j.uc.mem_read(out_request_len_addr, 8)
request_bytes_addr = int.from_bytes(request_bytes_addr, 'little')
request_len = int.from_bytes(request_len, 'little')
logger.debug(f"Request @ {hex(request_bytes_addr)} : {hex(request_len)}")
request = j.uc.mem_read(request_bytes_addr, request_len)
validation_ctx_addr = int.from_bytes(validation_ctx_addr, 'little')
return validation_ctx_addr, request
def nac_key_establishment(j: Jelly, validation_ctx: int, response: bytes):
response_addr = j.malloc(len(response))
j.uc.mem_write(response_addr, response)
ret = j.instr.call(
0xB1DD0,
[
validation_ctx,
response_addr,
len(response),
],
)
if ret != 0:
n = ret & 0xffffffff
n = (n ^ 0x80000000) - 0x80000000
raise Exception(f"Error calling nac_submit: {n}")
def nac_sign(j: Jelly, validation_ctx: int):
#void *validation_ctx, void *unk_bytes, int unk_len,
# void **validation_data, int *validation_data_len
out_validation_data_addr = j.malloc(8)
out_validation_data_len_addr = j.malloc(8)
ret = j.instr.call(
0xB1DF0,
[
validation_ctx,
0,
0,
out_validation_data_addr,
out_validation_data_len_addr,
],
)
if ret != 0:
n = ret & 0xffffffff
n = (n ^ 0x80000000) - 0x80000000
raise Exception(f"Error calling nac_generate: {n}")
validation_data_addr = j.uc.mem_read(out_validation_data_addr, 8)
validation_data_len = j.uc.mem_read(out_validation_data_len_addr, 8)
validation_data_addr = int.from_bytes(validation_data_addr, 'little')
validation_data_len = int.from_bytes(validation_data_len, 'little')
validation_data = j.uc.mem_read(validation_data_addr, validation_data_len)
return validation_data
def hook_code(uc, address: int, size: int, user_data):
logger.debug(">>> Tracing instruction at 0x%x, instruction size = 0x%x" % (address, size))
def malloc(j: Jelly, len: int) -> int:
# Hook malloc
# Return the address of the allocated memory
#print("malloc hook called with len = %d" % len)
return j.malloc(len)
def memset_chk(j: Jelly, dest: int, c: int, len: int, destlen: int):
logger.debug(
"memset_chk called with dest = 0x%x, c = 0x%x, len = 0x%x, destlen = 0x%x"
% (dest, c, len, destlen)
)
j.uc.mem_write(dest, bytes([c]) * len)
return 0
def sysctlbyname(j: Jelly):
return 0 # The output is not checked
def memcpy(j: Jelly, dest: int, src: int, len: int):
logger.debug("memcpy called with dest = 0x%x, src = 0x%x, len = 0x%x" % (dest, src, len))
orig = j.uc.mem_read(src, len)
j.uc.mem_write(dest, bytes(orig))
return 0
CF_OBJECTS = []
# struct __builtin_CFString {
# int *isa; // point to __CFConstantStringClassReference
# int flags;
# const char *str;
# long length;
# }
import struct
def _parse_cfstr_ptr(j: Jelly, ptr: int) -> str:
size = struct.calcsize("<QQQQ")
data = j.uc.mem_read(ptr, size)
isa, flags, str_ptr, length = struct.unpack("<QQQQ", data)
str_data = j.uc.mem_read(str_ptr, length)
return str_data.decode("utf-8")
def _parse_cstr_ptr(j: Jelly, ptr: int) -> str:
data = j.uc.mem_read(ptr, 256) # Lazy way to do it
return data.split(b"\x00")[0].decode("utf-8")
def IORegistryEntryCreateCFProperty(j: Jelly, entry: int, key: int, allocator: int, options: int):
key_str = _parse_cfstr_ptr(j, key)
if key_str in FAKE_DATA["iokit"]:
fake = FAKE_DATA["iokit"][key_str]
logger.debug(f"IOKit Entry: {key_str} -> {fake}")
# Return the index of the fake data in CF_OBJECTS
CF_OBJECTS.append(fake)
return len(CF_OBJECTS) # NOTE: We will have to subtract 1 from this later, can't return 0 here since that means NULL
else:
logger.debug(f"IOKit Entry: {key_str} -> None")
return 0
def CFGetTypeID(j: Jelly, obj: int):
obj = CF_OBJECTS[obj - 1]
if isinstance(obj, bytes):
return 1
elif isinstance(obj, str):
return 2
else:
raise Exception("Unknown CF object type")
def CFDataGetLength(j: Jelly, obj: int):
obj = CF_OBJECTS[obj - 1]
if isinstance(obj, bytes):
return len(obj)
else:
raise Exception("Unknown CF object type")
def CFDataGetBytes(j: Jelly, obj: int, range_start: int, range_end: int, buf: int):
obj = CF_OBJECTS[obj - 1]
if isinstance(obj, bytes):
data = obj[range_start:range_end]
j.uc.mem_write(buf, data)
logger.debug(f"CFDataGetBytes: {hex(range_start)}-{hex(range_end)} -> {hex(buf)}")
return len(data)
else:
raise Exception("Unknown CF object type")
def CFDictionaryCreateMutable(j: Jelly) -> int:
CF_OBJECTS.append({})
return len(CF_OBJECTS)
def maybe_object_maybe_string(j: Jelly, obj: int):
# If it's already a str
if isinstance(obj, str):
return obj
elif obj > len(CF_OBJECTS):
return obj
#raise Exception(f"WTF: {hex(obj)}")
# This is probably a CFString
# return _parse_cfstr_ptr(j, obj)
else:
return CF_OBJECTS[obj - 1]
def CFDictionaryGetValue(j: Jelly, d: int, key: int) -> int:
logger.debug(f"CFDictionaryGetValue: {d} {hex(key)}")
d = CF_OBJECTS[d - 1]
if key == 0xc3c3c3c3c3c3c3c3:
key = "DADiskDescriptionVolumeUUIDKey" # Weirdness, this is a hack
key = maybe_object_maybe_string(j, key)
if isinstance(d, dict):
if key in d:
val = d[key]
logger.debug(f"CFDictionaryGetValue: {key} -> {val}")
CF_OBJECTS.append(val)
return len(CF_OBJECTS)
else:
raise Exception("Key not found")
return 0
else:
raise Exception("Unknown CF object type")
def CFDictionarySetValue(j: Jelly, d: int, key: int, val: int):
d = CF_OBJECTS[d - 1]
key = maybe_object_maybe_string(j, key)
val = maybe_object_maybe_string(j, val)
if isinstance(d, dict):
d[key] = val
else:
raise Exception("Unknown CF object type")
def DADiskCopyDescription(j: Jelly) -> int:
description = CFDictionaryCreateMutable(j)
CFDictionarySetValue(j, description, "DADiskDescriptionVolumeUUIDKey", FAKE_DATA["root_disk_uuid"])
return description
def CFStringCreate(j: Jelly, string: str) -> int:
CF_OBJECTS.append(string)
return len(CF_OBJECTS)
def CFStringGetLength(j: Jelly, string: int) -> int:
string = CF_OBJECTS[string - 1]
if isinstance(string, str):
return len(string)
else:
raise Exception("Unknown CF object type")
def CFStringGetCString(j: Jelly, string: int, buf: int, buf_len: int, encoding: int) -> int:
string = CF_OBJECTS[string - 1]
if isinstance(string, str):
data = string.encode("utf-8")
j.uc.mem_write(buf, data)
logger.debug(f"CFStringGetCString: {string} -> {hex(buf)}")
return len(data)
else:
raise Exception("Unknown CF object type")
def IOServiceMatching(j: Jelly, name: int) -> int:
# Read the raw c string pointed to by name
name = _parse_cstr_ptr(j, name)
logger.debug(f"IOServiceMatching: {name}")
# Create a CFString from the name
name = CFStringCreate(j, name)
# Create a dictionary
d = CFDictionaryCreateMutable(j)
# Set the key "IOProviderClass" to the name
CFDictionarySetValue(j, d, "IOProviderClass", name)
# Return the dictionary
return d
def IOServiceGetMatchingService(j: Jelly) -> int:
return 92
ETH_ITERATOR_HACK = False
def IOServiceGetMatchingServices(j: Jelly, port, match, existing) -> int:
global ETH_ITERATOR_HACK
ETH_ITERATOR_HACK = True
# Write 93 to existing
j.uc.mem_write(existing, bytes([93]))
return 0
def IOIteratorNext(j: Jelly, iterator: int) -> int:
global ETH_ITERATOR_HACK
if ETH_ITERATOR_HACK:
ETH_ITERATOR_HACK = False
return 94
else:
return 0
def bzero(j: Jelly, ptr: int, len: int):
j.uc.mem_write(ptr, bytes([0]) * len)
return 0
def IORegistryEntryGetParentEntry(j: Jelly, entry: int, _, parent: int) -> int:
j.uc.mem_write(parent, bytes([entry + 100]))
return 0
import requests, plistlib
def get_cert():
resp = requests.get("http://static.ess.apple.com/identity/validation/cert-1.0.plist")
resp = plistlib.loads(resp.content)
return resp["cert"]
def get_session_info(req: bytes) -> bytes:
body = {
'session-info-request': req,
}
body = plistlib.dumps(body)
resp = requests.post("https://identity.ess.apple.com/WebObjects/TDIdentityService.woa/wa/initializeValidation", data=body, verify=False)
resp = plistlib.loads(resp.content)
return resp["session-info"]
def arc4random(j: Jelly) -> int:
import random
return random.randint(0, 0xFFFFFFFF)
#return 0
def load_nac() -> Jelly:
binary = load_binary()
binary = get_x64_slice(binary)
# Create a Jelly object from the binary
j = Jelly(binary)
hooks = {
"_malloc": malloc,
"___stack_chk_guard": lambda: 0,
"___memset_chk": memset_chk,
"_sysctlbyname": lambda _: 0,
"_memcpy": memcpy,
"_kIOMasterPortDefault": lambda: 0,
"_IORegistryEntryFromPath": lambda _: 1,
"_kCFAllocatorDefault": lambda: 0,
"_IORegistryEntryCreateCFProperty": IORegistryEntryCreateCFProperty,
"_CFGetTypeID": CFGetTypeID,
"_CFStringGetTypeID": lambda _: 2,
"_CFDataGetTypeID": lambda _: 1,
"_CFDataGetLength": CFDataGetLength,
"_CFDataGetBytes": CFDataGetBytes,
"_CFRelease": lambda _: 0,
"_IOObjectRelease": lambda _: 0,
"_statfs$INODE64": lambda _: 0,
"_DASessionCreate": lambda _: 201,
"_DADiskCreateFromBSDName": lambda _: 202,
"_kDADiskDescriptionVolumeUUIDKey": lambda: 0,
"_DADiskCopyDescription": DADiskCopyDescription,
"_CFDictionaryGetValue": CFDictionaryGetValue,
"_CFUUIDCreateString": lambda _, __, uuid: uuid,
"_CFStringGetLength": CFStringGetLength,
"_CFStringGetMaximumSizeForEncoding": lambda _, length, __: length,
"_CFStringGetCString": CFStringGetCString,
"_free": lambda _: 0,
"_IOServiceMatching": IOServiceMatching,
"_IOServiceGetMatchingService": IOServiceGetMatchingService,
"_CFDictionaryCreateMutable": CFDictionaryCreateMutable,
"_kCFBooleanTrue": lambda: 0,
"_CFDictionarySetValue": CFDictionarySetValue,
"_IOServiceGetMatchingServices": IOServiceGetMatchingServices,
"_IOIteratorNext": IOIteratorNext,
"___bzero": bzero,
"_IORegistryEntryGetParentEntry": IORegistryEntryGetParentEntry,
"_arc4random": arc4random
}
j.setup(hooks)
return j
def generate_validation_data() -> bytes:
logger.info("Generating validation data")
j = load_nac()
logger.debug("Loaded NAC library")
val_ctx, req = nac_init(j,get_cert())
logger.debug("Initialized NAC")
session_info = get_session_info(req)
logger.debug("Got session info")
nac_key_establishment(j, val_ctx, session_info)
logger.debug("Submitted session info")
val_data = nac_sign(j, val_ctx)
logger.info("Generated validation data")
return bytes(val_data)
if __name__ == "__main__":
from base64 import b64encode
val_data = generate_validation_data()
logger.info(f"Validation Data: {b64encode(val_data).decode()}")
#main()

View file

@ -3,4 +3,5 @@ cryptography
wheel wheel
tlslite-ng==0.8.0a43 tlslite-ng==0.8.0a43
srp srp
pbkdf2 pbkdf2
unicorn