mirror of
https://github.com/Sneed-Group/pypush-plus-plus
synced 2024-12-23 11:22:42 -06:00
improve logging to make it less messy
This commit is contained in:
parent
053c776b38
commit
0f3b6de3ab
4 changed files with 51 additions and 32 deletions
13
demo.py
13
demo.py
|
@ -5,6 +5,18 @@ from base64 import b64decode
|
|||
import apns
|
||||
import ids
|
||||
|
||||
import logging
|
||||
from rich.logging import RichHandler
|
||||
|
||||
FORMAT = "%(message)s"
|
||||
logging.basicConfig(
|
||||
level="NOTSET", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()]
|
||||
)
|
||||
|
||||
# Set sane log levels
|
||||
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
||||
logging.getLogger("jelly").setLevel(logging.INFO)
|
||||
logging.getLogger("nac").setLevel(logging.INFO)
|
||||
|
||||
def input_multiline(prompt):
|
||||
print(prompt)
|
||||
|
@ -83,6 +95,7 @@ else:
|
|||
import emulated.nac
|
||||
vd = emulated.nac.generate_validation_data()
|
||||
vd = b64encode(vd).decode()
|
||||
raise Exception("No")
|
||||
user.register(vd)
|
||||
|
||||
print(user.lookup(["mailto:textgpt@icloud.com"]))
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
from io import BytesIO
|
||||
import unicorn
|
||||
from . import mparser as macholibre
|
||||
|
||||
print = lambda *args, **kwargs: None
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger("jelly")
|
||||
|
||||
STOP_ADDRESS = 0x00900000 # Used as a return address when calling functions
|
||||
|
||||
|
@ -38,7 +37,7 @@ class VirtualInstructions:
|
|||
|
||||
|
||||
def call(self, address: int, args: list[int] = []):
|
||||
print(f"Calling {hex(address)} with args {args}")
|
||||
logger.debug(f"Calling {hex(address)} with args {args}")
|
||||
self.push(STOP_ADDRESS)
|
||||
self._set_args(args)
|
||||
self.uc.emu_start(address, STOP_ADDRESS)
|
||||
|
@ -105,7 +104,7 @@ class Jelly:
|
|||
self.uc.mem_write(self.HEAP_BASE, b"\x00" * self.HEAP_SIZE)
|
||||
|
||||
def debug_registers(self):
|
||||
print(f"""
|
||||
logger.debug(f"""
|
||||
RAX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RAX))}
|
||||
RBX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RBX))}
|
||||
RCX: {hex(self.uc.reg_read(unicorn.x86_const.UC_X86_REG_RCX))}
|
||||
|
@ -132,11 +131,9 @@ class Jelly:
|
|||
args.append(self.instr.pop())
|
||||
#print(ARG_REGISTERS[1])
|
||||
#self.debug_registers()
|
||||
print(f"calling {func.__name__}", end="")
|
||||
logger.debug(f"calling {func.__name__}")
|
||||
if args != []:
|
||||
print(f" with args: {args}")
|
||||
else:
|
||||
print()
|
||||
logger.debug(f" with args: {args}")
|
||||
ret = func(self, *args)
|
||||
if ret is not None:
|
||||
self.uc.reg_write(unicorn.x86_const.UC_X86_REG_RAX, ret)
|
||||
|
@ -157,7 +154,7 @@ class Jelly:
|
|||
def _resolve_hook(uc: unicorn.Uc, address: int, size: int, self: 'Jelly'):
|
||||
for name, addr in self._resolved_hooks.items():
|
||||
if addr == address:
|
||||
print(f"{name}: ", end="")
|
||||
logger.debug(f"{name}: ")
|
||||
self._hooks[name](self)
|
||||
|
||||
def _setup_hooks(self):
|
||||
|
@ -207,7 +204,7 @@ class Jelly:
|
|||
raise NotImplementedError(f"Unknown bind type {type}")
|
||||
|
||||
def _parse_lazy_binds(self, mu: unicorn.Uc, indirect_offset, section, dysimtab, strtab, symtab):
|
||||
print(f"Doing binds for {section['name']}")
|
||||
logger.debug(f"Doing binds for {section['name']}")
|
||||
for i in range(0, int(section['size']/8)):
|
||||
# Parse into proper list?
|
||||
dysym = dysimtab[(indirect_offset + i)*4:(indirect_offset + i)*4+4]
|
||||
|
@ -241,7 +238,7 @@ class Jelly:
|
|||
#print(f"{hex(offset)}: {hex(opcode)} {hex(immediate)}")
|
||||
|
||||
if opcode == BIND_OPCODE_DONE:
|
||||
print("BIND_OPCODE_DONE")
|
||||
logger.debug("BIND_OPCODE_DONE")
|
||||
break
|
||||
elif opcode == BIND_OPCODE_SET_DYLIB_ORDINAL_IMM:
|
||||
ordinal = immediate
|
||||
|
@ -307,7 +304,7 @@ class Jelly:
|
|||
# }
|
||||
#raise NotImplementedError("BIND_OPCODE_DO_BIND_ULEB_TIMES_SKIPPING_ULEB")
|
||||
else:
|
||||
print(f"Unknown bind opcode {opcode}")
|
||||
logger.error(f"Unknown bind opcode {opcode}")
|
||||
|
||||
# Mach-O defines
|
||||
BIND_OPCODE_DONE = 0x00
|
||||
|
|
|
@ -23,6 +23,9 @@ from plistlib import loads
|
|||
|
||||
from io import BytesIO
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger("jelly")
|
||||
|
||||
|
||||
class Parser():
|
||||
"""Main object containing all the necessary functions to parse
|
||||
|
@ -1686,7 +1689,7 @@ class Parser():
|
|||
|
||||
if self.__file.read(4) != b'\xca\xfe\xba\xbe':
|
||||
# Throw a fit
|
||||
print("NOT A UNI MACHO???")
|
||||
logger.critical("Wrong magic for universal binary?")
|
||||
|
||||
n_machos = self.get_int(ignore_endian=True)
|
||||
|
||||
|
@ -1712,7 +1715,7 @@ class Parser():
|
|||
if subtype in mdictionary.cputypes[cputype]:
|
||||
subtype = mdictionary.cputypes[cputype][subtype]
|
||||
else:
|
||||
print("UNKNOWN CPU TYPE: " + str(cputype))
|
||||
logger.debug("UNKNOWN CPU TYPE: " + str(cputype))
|
||||
|
||||
cputype = mdictionary.cputypes[cputype][-2]
|
||||
|
||||
|
|
|
@ -2,6 +2,8 @@ import hashlib
|
|||
from . import mparser as macholibre
|
||||
from .jelly import Jelly
|
||||
import plistlib
|
||||
import logging
|
||||
logger = logging.getLogger("nac")
|
||||
|
||||
BINARY_HASH = "e1181ccad82e6629d52c6a006645ad87ee59bd13"
|
||||
BINARY_PATH = "emulated/IMDAppleServices"
|
||||
|
@ -15,12 +17,13 @@ def load_binary() -> bytes:
|
|||
# Download the binary if it doesn't exist
|
||||
import os, requests
|
||||
if not os.path.exists(BINARY_PATH):
|
||||
print("Downloading binary...")
|
||||
logger.info("Downloading IMDAppleServices")
|
||||
resp = requests.get(BINARY_URL)
|
||||
b = resp.content
|
||||
# Save the binary
|
||||
open(BINARY_PATH, "wb").write(b)
|
||||
else:
|
||||
logger.debug("Using already downloaded IMDAppleServices")
|
||||
b = open(BINARY_PATH, "rb").read()
|
||||
if hashlib.sha1(b).hexdigest() != BINARY_HASH:
|
||||
raise Exception("Hashes don't match")
|
||||
|
@ -73,7 +76,7 @@ def nac_init(j: Jelly, cert: bytes):
|
|||
request_bytes_addr = int.from_bytes(request_bytes_addr, 'little')
|
||||
request_len = int.from_bytes(request_len, 'little')
|
||||
|
||||
print(f"Request @ {hex(request_bytes_addr)} : {hex(request_len)}")
|
||||
logger.debug(f"Request @ {hex(request_bytes_addr)} : {hex(request_len)}")
|
||||
|
||||
request = j.uc.mem_read(request_bytes_addr, request_len)
|
||||
|
||||
|
@ -133,7 +136,7 @@ def nac_generate(j: Jelly, validation_ctx: int):
|
|||
|
||||
|
||||
def hook_code(uc, address: int, size: int, user_data):
|
||||
print(">>> Tracing instruction at 0x%x, instruction size = 0x%x" % (address, size))
|
||||
logger.debug(">>> Tracing instruction at 0x%x, instruction size = 0x%x" % (address, size))
|
||||
|
||||
|
||||
def malloc(j: Jelly, len: int) -> int:
|
||||
|
@ -144,7 +147,7 @@ def malloc(j: Jelly, len: int) -> int:
|
|||
|
||||
|
||||
def memset_chk(j: Jelly, dest: int, c: int, len: int, destlen: int):
|
||||
print(
|
||||
logger.debug(
|
||||
"memset_chk called with dest = 0x%x, c = 0x%x, len = 0x%x, destlen = 0x%x"
|
||||
% (dest, c, len, destlen)
|
||||
)
|
||||
|
@ -157,7 +160,7 @@ def sysctlbyname(j: Jelly):
|
|||
|
||||
|
||||
def memcpy(j: Jelly, dest: int, src: int, len: int):
|
||||
print("memcpy called with dest = 0x%x, src = 0x%x, len = 0x%x" % (dest, src, len))
|
||||
logger.debug("memcpy called with dest = 0x%x, src = 0x%x, len = 0x%x" % (dest, src, len))
|
||||
orig = j.uc.mem_read(src, len)
|
||||
j.uc.mem_write(dest, bytes(orig))
|
||||
return 0
|
||||
|
@ -187,12 +190,12 @@ def IORegistryEntryCreateCFProperty(j: Jelly, entry: int, key: int, allocator: i
|
|||
key_str = _parse_cfstr_ptr(j, key)
|
||||
if key_str in FAKE_DATA["iokit"]:
|
||||
fake = FAKE_DATA["iokit"][key_str]
|
||||
print(f"IOKit Entry: {key_str} -> {fake}")
|
||||
logger.debug(f"IOKit Entry: {key_str} -> {fake}")
|
||||
# Return the index of the fake data in CF_OBJECTS
|
||||
CF_OBJECTS.append(fake)
|
||||
return len(CF_OBJECTS) # NOTE: We will have to subtract 1 from this later, can't return 0 here since that means NULL
|
||||
else:
|
||||
print(f"IOKit Entry: {key_str} -> None")
|
||||
logger.debug(f"IOKit Entry: {key_str} -> None")
|
||||
return 0
|
||||
|
||||
def CFGetTypeID(j: Jelly, obj: int):
|
||||
|
@ -216,7 +219,7 @@ def CFDataGetBytes(j: Jelly, obj: int, range_start: int, range_end: int, buf: in
|
|||
if isinstance(obj, bytes):
|
||||
data = obj[range_start:range_end]
|
||||
j.uc.mem_write(buf, data)
|
||||
print(f"CFDataGetBytes: {hex(range_start)}-{hex(range_end)} -> {hex(buf)}")
|
||||
logger.debug(f"CFDataGetBytes: {hex(range_start)}-{hex(range_end)} -> {hex(buf)}")
|
||||
return len(data)
|
||||
else:
|
||||
raise Exception("Unknown CF object type")
|
||||
|
@ -238,7 +241,7 @@ def maybe_object_maybe_string(j: Jelly, obj: int):
|
|||
return CF_OBJECTS[obj - 1]
|
||||
|
||||
def CFDictionaryGetValue(j: Jelly, d: int, key: int) -> int:
|
||||
print(f"CFDictionaryGetValue: {d} {hex(key)}")
|
||||
logger.debug(f"CFDictionaryGetValue: {d} {hex(key)}")
|
||||
d = CF_OBJECTS[d - 1]
|
||||
if key == 0xc3c3c3c3c3c3c3c3:
|
||||
key = "DADiskDescriptionVolumeUUIDKey" # Weirdness, this is a hack
|
||||
|
@ -246,7 +249,7 @@ def CFDictionaryGetValue(j: Jelly, d: int, key: int) -> int:
|
|||
if isinstance(d, dict):
|
||||
if key in d:
|
||||
val = d[key]
|
||||
print(f"CFDictionaryGetValue: {key} -> {val}")
|
||||
logger.debug(f"CFDictionaryGetValue: {key} -> {val}")
|
||||
CF_OBJECTS.append(val)
|
||||
return len(CF_OBJECTS)
|
||||
else:
|
||||
|
@ -285,7 +288,7 @@ def CFStringGetCString(j: Jelly, string: int, buf: int, buf_len: int, encoding:
|
|||
if isinstance(string, str):
|
||||
data = string.encode("utf-8")
|
||||
j.uc.mem_write(buf, data)
|
||||
print(f"CFStringGetCString: {string} -> {hex(buf)}")
|
||||
logger.debug(f"CFStringGetCString: {string} -> {hex(buf)}")
|
||||
return len(data)
|
||||
else:
|
||||
raise Exception("Unknown CF object type")
|
||||
|
@ -293,7 +296,7 @@ def CFStringGetCString(j: Jelly, string: int, buf: int, buf_len: int, encoding:
|
|||
def IOServiceMatching(j: Jelly, name: int) -> int:
|
||||
# Read the raw c string pointed to by name
|
||||
name = _parse_cstr_ptr(j, name)
|
||||
print(f"IOServiceMatching: {name}")
|
||||
logger.debug(f"IOServiceMatching: {name}")
|
||||
# Create a CFString from the name
|
||||
name = CFStringCreate(j, name)
|
||||
# Create a dictionary
|
||||
|
@ -400,18 +403,21 @@ def load_nac() -> Jelly:
|
|||
return j
|
||||
|
||||
def generate_validation_data() -> bytes:
|
||||
logger.info("Generating validation data")
|
||||
j = load_nac()
|
||||
logger.debug("Loaded NAC library")
|
||||
val_ctx, req = nac_init(j,get_cert())
|
||||
logger.debug("Initialized NAC")
|
||||
session_info = get_session_info(req)
|
||||
logger.debug("Got session info")
|
||||
nac_submit(j, val_ctx, session_info)
|
||||
logger.debug("Submitted session info")
|
||||
val_data = nac_generate(j, val_ctx)
|
||||
logger.info("Generated validation data")
|
||||
return bytes(val_data)
|
||||
|
||||
if __name__ == "__main__":
|
||||
from base64 import b64encode
|
||||
val_data = generate_validation_data()
|
||||
print(f"Validation Data: {b64encode(val_data).decode()}")
|
||||
logger.info(f"Validation Data: {b64encode(val_data).decode()}")
|
||||
#main()
|
||||
else:
|
||||
# lazy hack: Disable print so that it's clean when not debugging
|
||||
print = lambda *args, **kwargs: None
|
Loading…
Reference in a new issue