shadowbrokers-exploits/windows/Resources/Ops/PyScripts/scansweep/monitorengine/arpcache.py
2017-04-14 11:45:07 +02:00

123 lines
No EOL
3.9 KiB
Python

import ops
import os.path
import imp
import re
import glob
import os
import sys
from xml.etree.ElementTree import ElementTree
import dsz
import ops.cmd
from monitorengine import monitor
DSZ_NS = '{urn:mca:db00db84-8b5b-2141-a632b5980175d3c6}'
COMMANDDATA_TAG = ('%sCommandData' % DSZ_NS)
COMMAND_TAG = ('%sCommand' % DSZ_NS)
ARPCACHE_PATTERN = 'Data/*arp*xml'
ARPTASKING_PATTERN = 'Tasking/*arp*xml'
ARGUMENT_TAG = ('%sArgument' % DSZ_NS)
ARPENTRY_TAG = ('%sArpEntry' % DSZ_NS)
IP_TAG = ('%sIP' % DSZ_NS)
PHYSICAL_TAG = ('%sPhysical' % DSZ_NS)
MACADDRESS_TAG = ('%sMacAddress' % DSZ_NS)
IPV4_TAG = ('%sIPv4Address' % DSZ_NS)
IPV6_TAG = ('%sIPv6Address' % DSZ_NS)
def _whats_your_job():
return 'arpcache'
def _whats_your_name():
return 'arpcache'
class arpcacheObj(object, ):
def __init__(self):
self.adapter = ''
self.type = ''
self.state = ''
self.ip = ''
self.iptype = ''
self.macaddress = ''
class arpcache(monitor, ):
def __init__(self, job):
monitor.__init__(self, job, ARPCACHE_PATTERN)
def parse_arptasking(self, file_to_read):
if (not file_to_read):
return
f = open(file_to_read, 'r')
tasking = f.readlines()
f.close()
for line in tasking:
if (re.search('monitor', line) is not None):
return True
return False
def parse_data(self, file_to_read):
if (not file_to_read):
return
tasking_files = self.find_tasking(file_to_read)
for file in tasking_files:
if (self.parse_arptasking(file) is False):
return (True, [])
data_path = '/'.join([COMMANDDATA_TAG, '*', ARPENTRY_TAG])
tree = ElementTree()
try:
tree.parse(file_to_read)
except:
dsz.ui.Echo(("Couldn't parse XML file: %s" % file_to_read), dsz.WARNING)
return (False, None)
arpentry_list = tree.findall(data_path)
arp_list = []
for arpentry in arpentry_list:
this_arp = arpcacheObj()
mac_tag = arpentry.find('/'.join([PHYSICAL_TAG, MACADDRESS_TAG]))
if (mac_tag is None):
continue
this_arp.macaddress = mac_tag.text.strip()
this_arp.adapter = arpentry.get('adapter').strip()
if (arpentry.get('type') is not None):
this_arp.type = arpentry.get('type').strip()
else:
this_arp.type = ''
if (arpentry.get('state') is not None):
this_arp.state = arpentry.get('state').strip()
else:
this_arp.state = ''
ipv4_tag = arpentry.find('/'.join([IP_TAG, IPV4_TAG]))
ipv6_tag = arpentry.find('/'.join([IP_TAG, IPV6_TAG]))
if ((ipv4_tag is None) and (ipv6_tag is None)):
continue
if (ipv6_tag is not None):
this_arp.ip = ipv6_tag.text.strip()
arpcache.iptype = 'ipv6'
this_arp.target = this_arp.ip
if (ipv4_tag is not None):
this_arp.ip = ipv4_tag.text.strip()
arpcache.iptype = 'ipv4'
this_arp.target = this_arp.ip
arp_list.append(this_arp)
return (True, arp_list)
def check_escalation(self, escalation_rule, arpentry):
arpcache = arpentry
try:
if eval(escalation_rule):
return True
else:
return False
except:
return False
def verify_escalation(self, escalation_rule):
arpcache = arpcacheObj()
try:
eval_res = eval(escalation_rule)
if ((eval_res == True) or (eval_res == False)):
return True
else:
return False
except:
return False