add basic tui

This commit is contained in:
JJTech0130 2023-07-31 13:08:57 -04:00
parent 565dfb32ec
commit 9838294cf0
No known key found for this signature in database
GPG key ID: 23C92EBCCF8F93D6
2 changed files with 65 additions and 79 deletions

88
demo.py
View file

@ -19,10 +19,11 @@ logging.basicConfig(
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("jelly").setLevel(logging.INFO)
logging.getLogger("nac").setLevel(logging.INFO)
logging.getLogger("apns").setLevel(logging.DEBUG)
logging.getLogger("apns").setLevel(logging.INFO)
logging.getLogger("albert").setLevel(logging.INFO)
logging.getLogger("ids").setLevel(logging.DEBUG)
logging.getLogger("bags").setLevel(logging.DEBUG)
logging.getLogger("bags").setLevel(logging.INFO)
logging.getLogger("imessage").setLevel(logging.DEBUG)
# Try and load config.json
try:
@ -108,61 +109,42 @@ with open("config.json", "w") as f:
im = imessage.iMessageUser(conn, user)
# while True:
# i = im.receive()
# if i is not None:
# print(f"Got message {i}")
imsg = imessage.iMessage()
imsg.sender = user.handles[0]
imsg.participants = ["mailto:jjtech@jjtech.dev"]
imsg.text = "Hello world!"
INPUT_QUEUE = apns.IncomingQueue()
time.sleep(1)
im.send(imsg)
def input_thread():
while True:
from prompt_toolkit import prompt
for i in range(10):
z = im.receive()
if z is not None:
print(f"Got message {z}")
time.sleep(1)
# # Create a thread to take user input
# INPUT_QUEUE = apns.IncomingQueue()
try:
msg = prompt('>> ')
except:
msg = 'quit'
INPUT_QUEUE.append(msg)
# def input_thread():
# while True:
# from prompt_toolkit import prompt
# try:
# msg = prompt('>> ')
# except:
# msg = 'quit'
# INPUT_QUEUE.append(msg)
# threading.Thread(target=input_thread, daemon=True).start()
threading.Thread(target=input_thread, daemon=True).start()
# while True:
# msg = im.receive()
# if msg is not None:
# print(f"Got message {msg}")
while True:
msg = im.receive()
if msg is not None:
print(f"Got message {msg}")
# if len(INPUT_QUEUE) > 0:
# msg = INPUT_QUEUE.pop()
# if msg == 'help' or msg == 'h':
# print('help (h): show this message')
# print('quit (q): quit')
# print('send (s) [recipiant] [message]: send a message')
# elif msg == 'quit' or msg == 'q':
# break
# elif msg.startswith('send') or msg.startswith('s'):
# msg = msg.split(' ')
# if len(msg) < 3:
# print('send [recipiant] [message]')
# else:
# imsg = imessage.iMessage()
# imsg.text = ' '.join(msg[2:])
# imsg.participants = [msg[1], user.handles[0]]
# imsg.sender = user.handles[0]
# im.send(imsg)
if len(INPUT_QUEUE) > 0:
msg = INPUT_QUEUE.pop()
if msg == 'help' or msg == 'h':
print('help (h): show this message')
print('quit (q): quit')
print('send (s) [recipiant] [message]: send a message')
elif msg == 'quit' or msg == 'q':
break
elif msg.startswith('send') or msg.startswith('s'):
msg = msg.split(' ')
if len(msg) < 3:
print('send [recipiant] [message]')
else:
im.send(imessage.iMessage(
text=' '.join(msg[2:]),
participants=[msg[1], user.handles[0]],
#sender=user.handles[0]
))

View file

@ -179,7 +179,7 @@ class iMessageUser:
payload = BytesIO(payload)
tag = payload.read(1)
print("TAG", tag)
#print("TAG", tag)
body_length = int.from_bytes(payload.read(2), "big")
body = payload.read(body_length)
@ -325,7 +325,7 @@ class iMessageUser:
return None
body = apns._get_field(raw[1], 3)
body = plistlib.loads(body)
print(f"Got body message {body}")
#print(f"Got body message {body}")
payload = body["P"]
if not self._verify_payload(payload, body['sP'], body["t"]):
@ -419,28 +419,32 @@ class iMessageUser:
self.connection.send_message("com.apple.madrid", body, msg_id)
def check_response(x):
if x[0] != 0x0A:
return False
if apns._get_field(x[1], 2) != sha1("com.apple.madrid".encode()).digest():
return False
resp_body = apns._get_field(x[1], 3)
if resp_body is None:
return False
resp_body = plistlib.loads(resp_body)
if "c" not in resp_body or resp_body["c"] != 255:
return False
return True
# This code can check to make sure we got a success response, but waiting for the response is annoying,
# so for now we just YOLO it and assume it worked
num_recv = 0
while True:
if num_recv == len(bundled_payloads):
break
payload = self.connection.incoming_queue.wait_pop_find(check_response)
if payload is None:
continue
# def check_response(x):
# if x[0] != 0x0A:
# return False
# if apns._get_field(x[1], 2) != sha1("com.apple.madrid".encode()).digest():
# return False
# resp_body = apns._get_field(x[1], 3)
# if resp_body is None:
# return False
# resp_body = plistlib.loads(resp_body)
# if "c" not in resp_body or resp_body["c"] != 255:
# return False
# return True
resp_body = apns._get_field(payload[1], 3)
resp_body = plistlib.loads(resp_body)
logger.error(resp_body)
num_recv += 1
# num_recv = 0
# while True:
# if num_recv == len(bundled_payloads):
# break
# payload = self.connection.incoming_queue.wait_pop_find(check_response)
# if payload is None:
# continue
# resp_body = apns._get_field(payload[1], 3)
# resp_body = plistlib.loads(resp_body)
# logger.error(resp_body)
# num_recv += 1