"""Tanium Hygiene Assessment Tool (THAT) Main workflow module."""
from __future__ import absolute_import, division, print_function, unicode_literals
import atexit
import datetime
import imp
import os
import platform
import signal
import sys
import time
from . import LH_CON, LOG
from . import constants, options, plugin, pptx_builder, version
try:
import requests # TOOL_PATH/libs_external/any/requests/
import pytan # TOOL_PATH/libs_tanium/pytan/
import taniumpy # TOOL_PATH/libs_tanium/taniumpy/
except:
raise
try:
import tanium_kit # TOOL_PATH/libs_tanium/tanium_kit/
except:
raise
tanium_kit.log_tools.shutup_requests(loud=constants.DEBUG)
[docs]class Main(object):
"""Main workflow class for THAT.
Must call :meth:`tanium_hat.main.Main.start` in order to start the workflow.
Examples
--------
>>> # initialize the Main class
>>> main = that.main.Main()
>>> # use the default configuration file
>>> main.start()
>>> # or, use a specific configuration file
>>> main.start(config_file="my_config.ini")
Attributes
----------
CONFIG_FILE : :obj:`str`
* Path to configuration file for this instance
CONFIG : dict
* Unparsed configuration file in dict form, set by :meth:`tanium_hat.main.Main.load_config_file`
PCONFIG : dict
* Parsed configuration file in dict form, set by :meth:`tanium_hat.main.Main.parse_config_contents`
LOG : :obj:`logging.Logger`
* Logger for this instance
LH_CON : :obj:`logging.StreamHandler`
* Console log handler for this instance
LH_FILE : :obj:`logging.handlers.RotatingFileHandler`
* File log handler for this instance, set by :meth:`tanium_hat.main.Main.add_file_logging`
ERRORS : dict
* Used to track errors and where they occurred, managed by :meth:`tanium_hat.main.Main.err_handler`
EXCEPTIONS : dict
* Used to track exceptions and where they occurred, managed by :meth:`tanium_hat.main.Main.err_handler`
WEQUESTS : :obj:`tanium_kit.wequests.Wequests`
* Wequests object (wrapper for requests)
"""
[docs] def __init__(self):
"""Constructor."""
self.LOG = LOG
self.LH_CON = LH_CON
self.LH_FILE = None
self.CONFIG_FILE = ""
self.PCONFIG = tanium_kit.store.Store()
self.CONFIG = tanium_kit.store.Store()
self.ERRORS = {}
self.EXCEPTIONS = {}
self.WEQUESTS = None
[docs] def start(self, **kwargs):
"""Main workflow entry point for THAT.
Arguments
---------
config_file : :obj:`str`, optional
* default: :data:`tanium_hat.constants.CONFIG_FILE_PATH`
* path to configuration file to use for this instance
"""
self.CONFIG_FILE = kwargs.get("config_file", constants.CONFIG_FILE_PATH)
self.pre_flight()
self.flight_check()
self.takeoff()
self.land()
[docs] def signal_handler(self, signal, frame):
"""Signal handler for trapping Control-C (SIGINT)."""
print("\n")
m = "Interrupt received, exiting!"
self.LOG.error(m)
self.jump_ship()
[docs] def pre_flight(self):
"""Read configuration file and start file logging.
* Updates :attr:`tanium_hat.main.Main.CONFIG` with the dict parsed from :attr:`tanium_hat.main.Main.CONFIG_FILE`
* Uses :meth:`tanium_hat.main.Main.add_file_logging` to setup file logging
* Uses :meth:`tanium_hat.main.Main.parse_config_contents` to parse the configuration
"""
signal.signal(signal.SIGINT, self.signal_handler)
self.CONFIG = self.load_config_file(path=self.CONFIG_FILE)
self.add_file_logging()
self._sep1()
m = "Tanium Hygiene Assessment Tool (v{}) -- started running"
m = m.format(version.__version__)
self.LOG.info(m)
self._sep1()
self.parse_config_contents()
[docs] def add_file_logging(self):
"""Add a file log handler to the root logger that gets DEBUG and above log messages.
* Updates :attr:`tanium_hat.main.Main.LH_FILE` with the file log handler
"""
default_path = os.path.join(constants.TOOL_PATH, "data")
path = self.CONFIG.get("tanium_hat", {}).get("data_path", default_path)
if not os.path.isabs(path):
path = os.path.join(constants.TOOL_PATH, path)
self.LH_FILE = tanium_kit.log_tools.make_handler_file(
logger=LOG,
log_file_format=constants.LOG_FILE_FORMAT,
log_file_level="DEBUG",
log_file_handler_name="that_file",
log_file_name=constants.LOG_FILE,
log_file_dir=path,
log_file_count=constants.LOG_FILE_COUNT,
log_file_mb=constants.LOG_FILE_MB,
)
m = "Now logging to file: {}"
m = m.format(self.LH_FILE.stream.name)
self.LOG.info(m)
[docs] def flight_check(self):
"""Verify that at least one mode and one plugin was selected."""
modes = ["get_tanium_data", "get_internet_data", "analyze_data"]
any_mode = any([v for k, v in self.PCONFIG.items() if k in modes])
if not any_mode:
m = "Get Tanium Data, Get Internet Data, and Analyze data are all disabled"
self.LOG.error(m)
self.jump_ship()
any_plugins = self.PCONFIG.plugins
if not any_plugins:
self.LOG.error("No plugins enabled")
self.jump_ship()
[docs] def land(self):
"""Print out any errors that occurred and exit."""
self._sep1()
m = "Tanium Hygiene Assessment Tool -- finished running"
self.LOG.info(m)
self._sep1()
if self.ERRORS:
m = "Errors that occurred throughout this run:"
self.LOG.error(m)
for k, v in self.ERRORS.items():
for e in v:
m = "Error source: {} -- Error: {}"
m = m.format(k, e)
self.LOG.error(m)
self.jump_ship()
[docs] def jump_ship(self, lvl=99):
"""Method called to exit due to error.
Parameters
----------
lvl : :obj:`int`, optional
* Default: 99
* Exit code to end process with
"""
m = "Errors occurred, exiting..."
self.LOG.error(m)
sys.exit(lvl)
[docs] def register_zipper(self):
"""Create a zip file of the data_path when python script exits.
* Uses :func:`tanium_kit.zipper.mkzip` to create a zip file of the data_path
* Uses :func:`atexit.register` to register the zipper function to run when this script reaches the end of execution (due to error or not)
"""
dest_path = datetime.datetime.utcnow().strftime(constants.ZIP_DEST)
zip_file = constants.ZIP_FILE
src = "'ZIP_FILE' attribute in '{}'".format(constants.__file__)
zip_file = self.config_tmpl(zip_file, src)
zip_file = tanium_kit.tools.get_valid_filename(zip_file)
zip_path = os.path.join(self.PCONFIG.data_path, zip_file)
zipper_args = {
"zip_path": zip_path,
"src_path": self.PCONFIG.data_path,
"dest_path": dest_path,
"skips": constants.ZIP_SKIPS,
"remove_old": constants.ZIP_REMOVE_OLD,
}
atexit.register(tanium_kit.zipper.mkzip, **zipper_args)
[docs] def takeoff(self):
"""Run the enabled modes against the enabled plugins."""
try:
self.register_zipper()
if self.PCONFIG.get_tanium_data:
self.mode_get_tanium_data()
if self.PCONFIG.get_internet_data:
if self.PCONFIG.has_internet:
self.mode_get_internet_data()
else:
m = "No internet connection available, not getting internet data!"
self.LOG.warning(m)
if self.PCONFIG.analyze_data:
self.mode_analyze_data()
except SystemExit:
m = "Script exit received!!"
self.LOG.warning(m)
raise
except Exception as e:
m = "Error occurred while running: {} !!".format(str(e))
self.err_handler(m)
self.LOG.exception(m)
self.jump_ship()
[docs] def mode_get_tanium_data(self):
"""Run get_tanium_data for plugins and write results.
* Uses :meth:`tanium_hat.main.Main.get_handler` to get a PyTan handler connection to Tanium.
* Uses :meth:`tanium_hat.main.Main.run_plugins` to call :meth:`tanium_hat.plugin.Plugin.get_tanium_data` on every plugin
* Uses :meth:`tanium_hat.main.Main.collect_tanium_results` to get the results of get_tanium_data for all plugins
* Uses :meth:`tanium_hat.main.Main.write_csv` to write results to that_stats.csv in data_path
* Uses :meth:`tanium_hat.main.Main.write_json_files` to write json pages from Tanium.
"""
self.PCONFIG.handler = self.get_handler()
self.run_plugins("get_tanium_data")
results = self.collect_tanium_results()
filename = "that_stats.csv"
self.write_csv(results=results, path=filename)
self.write_json_files()
[docs] def mode_get_internet_data(self):
"""Run get_internet_data for plugins and write results.
* Uses :meth:`tanium_hat.main.Main.run_plugins` to call :meth:`tanium_hat.plugin.Plugin.get_internet_data` on every plugin
* Uses :meth:`tanium_hat.main.Main.collect_internet_results` to get the results of get_internet_data for all plugins
* Uses :meth:`tanium_hat.main.Main.write_csv` to write results to :data:`tanium_hat.constants.INTERNET_FILE_NAME` in data_path
"""
self.run_plugins("get_internet_data")
results = self.collect_internet_results()
self.write_csv(results=[results], path=constants.INTERNET_FILE_NAME)
[docs] def mode_analyze_data(self):
"""Run analyze_data for plugins, collect results, create PPTX file.
* Uses :meth:`tanium_hat.main.Main.run_plugins` to call :meth:`tanium_hat.plugin.Plugin.analyze_data` on every plugin
* Uses :meth:`tanium_hat.main.Main.collect_analyze_results` to get the results of analyze_data for all plugins
* Uses :class:`tanium_hat.pptx_builder.PptxBuilder` to create the PPTX file.
"""
self.run_plugins("analyze_data")
results = self.collect_analyze_results()
builder = self.PCONFIG.pptx_builder = pptx_builder.PptxBuilder(**self.PCONFIG)
m = "Created {}"
m = m.format(builder)
self.LOG.info(m)
builder.save()
builder.log_slides(layouts=True, debug=constants.DEBUG_PPTX)
builder.add_slides(layouts=self.PCONFIG.layouts, results=results)
builder.log_slides(layouts=False, debug=constants.DEBUG_PPTX)
builder.save()
[docs] def collect_analyze_results(self):
"""Collect analayze_data results from plugins.
* Stores :attr:`tanium_hat.main.Main.PCONFIG` under the key "config".
* Stores :attr:`tanium_hat.plugin.Plugin.ANALYZE_RESULTS` under the key "plugin_name" for each enabled plugin.
Returns
-------
ret : dict
* nested dict with the collected results
"""
ret = {}
ret["config"] = self.PCONFIG
for plugin_name, plugin_obj in self.sorted_plugins():
ret[plugin_name] = plugin_obj.ANALYZE_RESULTS
ret[plugin_name]["priority"] = plugin_obj.get_order()
return ret
[docs] def collect_internet_results(self):
"""Collect get_internet_data results from plugins.
* Stores :attr:`tanium_hat.plugin.Plugin.GID_RESULTS` into ret.
* Conflicting key names from different plugins will overwrite each other!
Returns
-------
ret : dict
* dict with the collected results
"""
ret = {}
for plugin_name, plugin_obj in self.sorted_plugins():
gid_results = getattr(plugin_obj, "GID_RESULTS", {})
ret.update(gid_results)
return ret
[docs] def collect_tanium_results(self):
"""Collect get_tanium_data results from plugins.
* Stores :attr:`tanium_hat.plugin.Plugin.GTD_RESULTS` into ret.
Returns
-------
ret : :obj:`list`
* list of dict with the collected results
"""
ret = []
for plugin_name, plugin_obj in self.sorted_plugins():
gtd_results = getattr(plugin_obj, "GTD_RESULTS", {})
for filename, gtd_result in gtd_results.items():
q_ret = gtd_result.get("q_ret", {})
q_obj = q_ret.get("question_object", None)
q_text = getattr(q_obj, "query_text", "Unavailable")
q_id = getattr(q_obj, "id", -1)
q_start = gtd_result.get("q_start", -1)
q_end = gtd_result.get("q_end", -1)
q_elapsed = self._totalsec(gtd_result.get("q_elapsed"))
filepath = gtd_result.get("filepath", "UNDEFINED")
had_errors = bool(getattr(plugin_obj, "ERRORS", []))
results = {}
results["Plugin Name"] = plugin_name
results["Plugin Had Error"] = had_errors
results["Filename"] = filepath
results["Question ID"] = q_id
results["Question Text"] = q_text
results["Question Start Time"] = q_start
results["Question End Time"] = q_end
results["Question Total Seconds"] = q_elapsed
ret.append(results)
if not ret:
results = {}
results["Plugin Name"] = "No plugins asked questions!"
results["Plugin Had Error"] = "None"
results["Filename"] = "None"
results["Question ID"] = "0"
results["Question Text"] = "None"
results["Question Start Time"] = 0
results["Question End Time"] = 0
results["Question Total Seconds"] = 0
ret.append(results)
return ret
[docs] def load_config_file(self, path):
"""Use IniReader() to turn ini file at ``path`` into a dictionary.
Parameters
----------
path : :obj:`str`
* path to ini file to parse
Returns
-------
ret : :obj:`dict`
* ini file contents churned into dict
"""
ir = tanium_kit.ini_reader.IniReader()
try:
path, config = ir.read(ini_path=path)
m = "Loaded INI file: {}"
m = m.format(path)
self.LOG.debug(m)
except Exception as e:
m = "Failed to load INI file: {}, Error: {}"
m = m.format(path, e)
self.err_handler(m)
self.LOG.error(m)
m = "Script Execution Will Ask User for Required Values"
self.LOG.warning(m)
path, config = ("", {})
ret = tanium_kit.store.Store(**{k: tanium_kit.store.Store(**v) for k, v in config.items()})
return ret
[docs] def parse_config_contents(self):
"""Read the :attr:`tanium_hat.main.Main.CONFIG` dictionary for keys and prompt user for missing values.
* Sets :attr:`tanium_hat.main.Main.PCONFIG` with the output of :func:`tanium_hat.main.base_pconfig`
* Uses :func:`tanium_hat.main.Main.parse_config_base` to parse the config for base options
* Uses :func:`tanium_hat.main.Main.parse_config_plugins` to parse the config for plugin options
* Uses :func:`tanium_hat.main.Main.parse_config_analyze` to parse the config for analyze data options
* Uses :func:`tanium_hat.main.Main.parse_config_tanium` to parse the config for get tanium data options
"""
self.LOG.info("Gathering Assessment Information...")
self._sep2()
self.PCONFIG = base_pconfig()
self.parse_config_base()
self.parse_config_plugins()
self.parse_config_analyze()
self.parse_config_tanium()
self._sep2()
self.LOG.info("Finished Gathering Assessment Information...")
[docs] def parse_config_base(self):
"""Parse base options that are always asked.
* Uses :data:`tanium_hat.options.BASE` to determine base options to search for/ask
"""
for k, v in options.BASE.items():
self.PCONFIG[k] = self.get_config_entry(**v)
# store the requests package in self.PCONFIG
self.PCONFIG.requests_pkg = requests
# create a requests wrapper instance and store it in self.PCONFIG
self.PCONFIG.wequests = self.WEQUESTS = tanium_kit.wequests.Wequests(**self.PCONFIG)
# determine if internet is available and store it in self.PCONFIG
self.PCONFIG.has_internet = self.check_internet()
if not self.PCONFIG.has_internet and self.PCONFIG.get_internet_data:
self.LOG.warning("INTERNET CONNECTIVITY CHECK FAILED!")
self.LOG.warning("However it is required for getting internet data.")
self.LOG.warning("For Assistance with this Warning, contact your TAM.")
self.keep_going("Continue without an Internet Connection")
# create the data directory if it does not exist
self.makedir(self.PCONFIG.data_path)
[docs] def parse_config_tanium(self):
"""Parse options that are only asked if the user supplied yes to get_tanium_data.
* Returns if ``get_tanium_data`` is False in :attr:`tanium_hat.main.Main.PCONFIG`
* Uses :data:`tanium_hat.options.TANIUM` to determine base options to search for/ask
"""
if not self.PCONFIG.get_tanium_data:
return
for k, v in options.TANIUM.items():
self.PCONFIG[k] = self.get_config_entry(**v)
[docs] def parse_config_analyze(self):
"""Parse options that are only asked if the user supplied yes to analyze_data.
* Returns if ``analyze_data`` is False in :attr:`tanium_hat.main.Main.PCONFIG`
* Uses :data:`tanium_hat.options.ANALYZE` to determine base options to search for/ask
* Uses :meth:`tanium_hat.main.Main.load_layout_def` to load the python layout definition file as a module
* Uses :meth:`tanium_hat.main.Main.load_layout_pptx` to load the PPTX file referenced in the python layout definition file
"""
if not self.PCONFIG.analyze_data:
return
for k, v in options.ANALYZE.items():
self.PCONFIG[k] = self.get_config_entry(**v)
filename = self.PCONFIG.pptx_output_file
self.PCONFIG.pptx_output_path = os.path.join(self.PCONFIG.data_path, filename)
self.load_layout_def()
self.load_layout_pptx()
[docs] def parse_config_plugins(self):
"""Parse options that are asked for plugins.
* Uses :meth:`tanium_hat.main.Main.get_local_plugins` to load locally available plugins
* Uses :meth:`tanium_hat.main.Main.get_enabled_plugins` to check which plugins get loaded
* Uses :meth:`tanium_hat.main.Main.load_enabled_plugins` to load enabled plugins
"""
self.PCONFIG.plugin_modules = tanium_kit.store.Store()
self.get_local_plugins()
self.get_enabled_plugins()
self.load_enabled_plugins()
[docs] def makedir(self, path):
"""Make a directory and all leading directories as needed.
Parameters
----------
path : :obj:`str`
* Directory to make
"""
if not os.path.exists(path):
m = "Making directory: {}"
self.LOG.info(m.format(path))
os.makedirs(path)
[docs] def save_file(self, out, filename, path, prompt=False, binary=False, **kwargs):
"""Save a file in binary or text mode.
Parameters
----------
out : :obj:`str`
* The contents to write to the file
filename : :obj:`str`
* The basename of the file to save
path : :obj:`str`
* The path to save the file into
prompt : :obj:`bool`, optional
* Default : False
* True: Prompt the user using :func:`tanium_kit.ask.ask` where they would like to save the file to, defaulting to ``path``
* False: Use ``path`` as is
binary : :obj:`bool`, optional
* Default : False
* True : use :func:`tanium_kit.tools.write_binary` to write a binary file
* False : use :func:`tanium_kit.tools.write_file` to write a text file
Returns
-------
file_path : :obj:`str`
* The full path to the file that was saved
"""
if prompt:
m = "Please enter the directory to save {} to"
m = m.format(filename)
path = tanium_kit.ask.ask(prompt=m, default=path)
self.makedir(path)
file_path = os.path.join(path, filename)
if binary:
m = tanium_kit.tools.write_binary(file_path, out)
else:
m = tanium_kit.tools.write_file(file_path, out)
self.LOG.info(m)
return file_path
[docs] def get_local_plugins(self):
"""Find the plugins that are available locally.
* Uses :data:`tanium_hat.constants.PLUGINS_FOUND` to find matching plugins
* Uses :meth:`tanium_hat.main.Main.load_module` to load each plugin that is found
* Uses :meth:`tanium_hat.main.Main.load_module` to reload newer plugins
* Updates ``plugin_modules`` in :attr:`tanium_hat.main.Main.PCONFIG` with plugin name -> plugin module mapping
"""
if not constants.PLUGINS_FOUND:
m = "No plugin files that match the wildcard '{}' found!!"
m = m.format(constants.PLUGINS_MATCH)
self.err_handler(m)
self.jump_ship()
for path in constants.PLUGINS_FOUND:
mod = self.load_module(path, "Plugin", constants.PLUGIN_ATTRS)
self.PCONFIG.plugin_modules[mod.NAME] = mod
[docs] def load_module(self, path, mod_type, mod_attrs=[]):
"""Load a python file into a module.
* Uses :func:`imp.load_source` to read the file at runtime and load it in as a python module.
* Sets MODULE_PATH and MODULE_FILE as attributes on the returned module.
Parameters
----------
path : :obj:`str`
* Path to the python file to load as a module
mod_type : :obj:`str`
* Type of module
mod_attrs : :obj:`list` of :obj:`str`, optional
* Default : :data:`tanium_hat.constants.ALWAYS_ATTRS`
* List of attributes that must be defined in the loaded module
* Attributes supplied here will be appended to :data:`tanium_hat.constants.ALWAYS_ATTRS`
Returns
-------
ret : :obj:`object`
* Loaded python module from ``path``
"""
mod_attrs += constants.ALWAYS_ATTRS
filename = os.path.basename(path)
basename = os.path.splitext(filename)[0]
try:
ret = imp.load_source(basename, path)
m = "Successfully loaded {} file '{}'"
m = m.format(mod_type, path)
self.LOG.debug(m)
except Exception as e:
m = "Failed to load {} file '{}', error: {}"
m = m.format(mod_type, path, e)
self.LOG.error(m)
self.err_handler(m)
self.jump_ship()
for x in mod_attrs:
if not hasattr(ret, x):
m = "{} file '{}' does not define attribute '{}'"
m = m.format(mod_type, path, x)
self.LOG.error(m)
self.err_handler(m)
self.jump_ship()
min_version = getattr(ret, "MINIMUM_THAT_VERSION")
if version.__version__ < min_version:
m = "THAT v{} failed minimum required v{} for using {} '{}' v{}"
m = m.format(version.__version__, min_version, mod_type, path, ret.VERSION)
self.LOG.error(m)
self.err_handler(m)
self.jump_ship()
else:
m = "THAT v{} passed minimum required v{} for using {} '{}' v{}"
m = m.format(version.__version__, min_version, mod_type, path, ret.VERSION)
self.LOG.debug(m)
ret.MODULE_PATH = path
ret.MODULE_FILE = filename
return ret
[docs] def get_plugin_tmpl(self, pname, pmod):
"""Build a dict to use in templating plugin options.
Parameters
----------
name : :obj:`str`
* name of plugin
pmod : :obj:`tanium_hat.plugin.Plugin`
* module of plugin
Returns
-------
ret : dict
* dict ready for use in templating
"""
ret = {}
ret["plugin_name"] = pname
ret["plugin_priority"] = pmod.PRIORITY
return ret
[docs] def get_plugin_option(self, option_name, pname, pmod):
"""Get a plugin option and templatize it for a given plugin.
* Gets the plugin option from :data:`tanium_hat.options.PLUGINS`
* Uses :meth:`tanium_hat.main.Main.get_plugin_tmpl` to build a dict for use in templating
Parameters
----------
option_name : :obj:`str`
* Name of option to get from :data:`tanium_hat.options.PLUGINS`
name : :obj:`str`
* name of plugin
pmod : :obj:`tanium_hat.plugin.Plugin`
* module of plugin
Returns
-------
ret : dict
* templatized dict for this plugin option
"""
plugin_options = options.PLUGINS[option_name]
plugin_tmpl = self.get_plugin_tmpl(pname, pmod)
ret = {}
ret.update(plugin_options)
for k, v in ret.items():
if isinstance(v, tanium_kit.text_type):
v = v.format(**plugin_tmpl)
ret[k] = v
return ret
[docs] def get_enabled_plugins(self):
"""Determine which plugins to enable.
* Iterates over ``plugin_modules`` in :attr:`tanium_hat.main.Main.PCONFIG` to check config/ask user if the plugin is enabled
"""
for pname, pmod in self.PCONFIG.plugin_modules.items():
if self.PCONFIG.enable_all_plugins:
opt = self.get_plugin_option("enabled", pname, pmod)
key = opt["entry"]
m = "enable_all_plugins is True, will load plugin module '{}'"
m = m.format(pname)
self.LOG.info(m)
self.PCONFIG[key] = True
for k in options.PLUGINS:
opt = self.get_plugin_option(k, pname, pmod)
key = opt["entry"]
value = self.PCONFIG[key]
if key in self.PCONFIG:
m = "config entry '{}' already set to '{}', not re-asking..."
m = m.format(key, value)
self.LOG.debug(m)
continue
self.PCONFIG[key] = self.get_config_entry(**opt)
[docs] def load_enabled_plugins(self):
"""Load enabled plugins.
* Iterates over ``plugin_modules`` in :attr:`tanium_hat.main.Main.PCONFIG` for enabled plugins
* Initializes a :class:`tanium_hat.plugin.Plugin` object for each enabled plugin
* Updates ``plugins`` in :attr:`tanium_hat.main.Main.PCONFIG` with plugin name -> plugin object mapping
"""
self.PCONFIG.plugins = tanium_kit.store.Store()
for pname, pmod in self.PCONFIG.plugin_modules.items():
opt = self.get_plugin_option("enabled", pname, pmod)
key = opt["entry"]
if not self.PCONFIG[key]:
m = "Plugin module '{}' is not enabled, not loading"
m = m.format(pname)
self.LOG.info(m)
continue
m = "Plugin module '{}' is enabled, loading"
m = m.format(pname)
self.LOG.info(m)
plugin_args = {}
plugin_args.update(self.PCONFIG)
plugin_args["plugin_name"] = pname
plugin_args["plugin_mod"] = pmod
self.PCONFIG.plugins[pname] = plugin.Plugin(**plugin_args)
[docs] def load_layout_def(self):
"""Load the python layout definition file.
* Gets ``layout_File`` from :attr:`tanium_hat.main.Main.PCONFIG`
* Uses :meth:`tanium_hat.main.Main.load_module` to load the module
* Uses :meth:`tanium_hat.main.Main.load_module` to reload any new layout file
* Uses :func:`tanium_hat.pptx_builder.val_layouts` to validate the layouts in the layout definition file
"""
path = self.PCONFIG.layout_file
filename = os.path.basename(path)
if not os.path.isabs(path):
path = os.path.join(self.PCONFIG.layouts_path, filename)
mod = self.load_module(path, "Layout Definition", constants.LAYOUT_ATTRS)
self.PCONFIG.layout_mod = mod
self.PCONFIG.layouts = self.PCONFIG.layout_mod.LAYOUTS
pptx_builder.val_layouts(self.PCONFIG.layouts)
m = "Successfully validated Layout Definition file '{}'"
m = m.format(path)
self.LOG.info(m)
[docs] def load_layout_pptx(self):
"""Load the PPTX file referenced by the layout definition file.
* Gets ``layout_mod.PPTX_INPUT_FILE`` from :attr:`tanium_hat.main.Main.PCONFIG`
"""
filename = self.PCONFIG.layout_mod.PPTX_INPUT_FILE
src = "'PPTX_INPUT_FILE' attribute in '{}'".format(self.PCONFIG.layout_mod.MODULE_PATH)
filename = self.config_tmpl(filename, src)
path = os.path.join(self.PCONFIG.layouts_path, filename)
self.PCONFIG.pptx_input_file = filename
self.PCONFIG.pptx_input_path = path
if not os.path.exists(path):
m = "Unable to find PPTX input file '{}' referenced in layout file '{}'"
m = m.format(path, self.PCONFIG.layout_mod.MODULE_PATH)
self.LOG.error(m)
self.err_handler(m)
self.jump_ship()
[docs] def check_internet(self):
"""Check to see if an internet connection is available.
Uses :data:`tanium_hat.constants.INTERNET_TEST_URL` as the url to use for checking internet connectivity.
Returns
-------
ret : :obj:`bool`
* True: Internet available
* False: Internet not available
"""
try:
check = self.WEQUESTS.request(url=constants.INTERNET_TEST_URL)
ret = True
except Exception as check:
ret = False
m = "Internet connection found: {} ({})"
self.LOG.debug(m.format(ret, check))
return ret
[docs] def get_config_entry(self, section, entry, **kwargs):
"""Get the value of an entry from self.CONFIG.
If value not supplied or is not the right type, prompt user for value.
Parameters
----------
section : :obj:`str`
* section in self.CONFIG to look for entry
entry : :obj:`str`
* key in section to get value from self.CONFIG[section][entry]
prompt : :obj:`str`
* Prompt to use when asking for value
is_bool : :obj:`bool`, optional
* default: False
* True: value of entry must be a boolean
* False: value of entry does not have to be a boolean
is_int : :obj:`bool`, optional
* default: False
* True: value of entry must be an integer
* False: value of entry does not have to be an integer
empty_ok : :obj:`bool`, optional
* default: False
* True: value of entry can be empty
* False: value of entry can not be empty
force_abs : :obj:`bool`, optional
* default: False
* True: if value of entry is not an absolute path, prepend it with the path of THAT
* False: leave value of entry alone
is_crypt : :obj:`bool`, optional
* default: False
* True: run value of entry through tanium_kit.tools.deobfuscate()
* False: leave value of entry alone
Returns
-------
value : :obj:`str`/bool/int
* value of entry from config file or from user supplied input
"""
checks = [
self.config_check_empty,
self.config_check_bool,
self.config_check_int,
self.config_check_crypt,
self.config_check_tmpl,
self.config_check_abs,
]
value = self.config_check_value(section, entry, **kwargs)
for check in checks:
value = check(value, section, entry, **kwargs)
show_val = "**HIDDEN**" if tanium_kit.ask.secure_value(**kwargs) else value
m = "Config entry '{}' in section '{}' supplied value '{}', not prompting"
self.LOG.info(m.format(entry, section, show_val))
return value
[docs] def config_check_value(self, section, entry, **kwargs):
"""Check if value is defined in config file.
* Checks :data:`os.environ` for THAT_ENTRY and uses that as value if defined
* Checks if :attr:`tanium_hat.main.Main.CONFIG` is defined/is a dict, asks user for value if not
* Checks :attr:`tanium_hat.main.Main.CONFIG` for entry and uses that as value if found
Parameters
----------
section : :obj:`str`
* section in configuration file value came from
entry : :obj:`str`
* entry in section in configuration file value came from
Returns
-------
ret : :obj:`str`
* value
"""
config = getattr(self, "CONFIG", {})
sect = config.get(section, {})
ret = sect.get(entry, "")
env_entry = "THAT_{}".format(entry.upper())
if env_entry in os.environ:
m = "Environment Variable '{}' override provided, ignoring config entry for {}"
m = m.format(env_entry, entry)
self.LOG.warning(m)
ret = os.environ[env_entry]
elif not self.CONFIG or not isinstance(self.CONFIG, dict):
m = "Config not parsed properly, prompting for entry '{}'"
m = m.format(entry)
self.LOG.warning(m)
ret = tanium_kit.ask.ask(**kwargs)
elif section not in self.CONFIG:
m = "Config does not have a section named '{}', prompting for entry '{}'"
m = m.format(section, entry)
self.LOG.warning(m)
ret = tanium_kit.ask.ask(**kwargs)
elif entry not in sect:
m = "Config file entry '{}' in section '{}' not supplied, prompting"
m = m.format(entry, section)
self.LOG.warning(m)
ret = tanium_kit.ask.ask(**kwargs)
return ret
[docs] def config_check_crypt(self, value, section, entry, **kwargs):
"""Check if value is cryptable.
Parameters
----------
value : :obj:`str`
* value from config file/user
section : :obj:`str`
* section in configuration file value came from
entry : :obj:`str`
* entry in section in configuration file value came from
is_crypt : :obj:`bool`, optional
* default : False
* True : value is could be a crypted value, use :func:`tanium_kit.tools.deobfuscate` to convert it
* False : leave value alone
Returns
-------
ret : :obj:`str`
* value
"""
is_crypt = kwargs.get("is_crypt", False)
ret = value
if is_crypt:
ret = tanium_kit.tools.deobfuscate(key=constants.CRYPT_KEY, text=value)
return ret
[docs] def config_check_empty(self, value, section, entry, **kwargs):
"""Check if value is empty.
Parameters
----------
value : :obj:`str`
* value from config file/user
section : :obj:`str`
* section in configuration file value came from
entry : :obj:`str`
* entry in section in configuration file value came from
empty_ok : :obj:`bool`, optional
* default : False
* True : value is allowed to be empty
* False : value is not allowed to be empty, re-ask user if it is empty
Returns
-------
ret : :obj:`str`
* value
"""
empty_ok = kwargs.get("empty_ok", False)
ret = value
if value in [None, ""] and not empty_ok:
m = "Config entry '{}' in section '{}' requires non-empty value, prompting"
m = m.format(entry, section)
self.LOG.warning(m)
ret = tanium_kit.ask.ask(**kwargs)
return ret
[docs] def config_check_bool(self, value, section, entry, **kwargs):
"""Convert value into boolean.
Parameters
----------
value : :obj:`str`
* value from config file/user
section : :obj:`str`
* section in configuration file value came from
entry : :obj:`str`
* entry in section in configuration file value came from
is_bool : :obj:`bool`, optional
* default : False
* True : value should be converted to bool using :func:`tanium_kit.ask.coerce_bool`, ask user if it fails to be converted
* False : leave value alone
Returns
-------
ret : :obj:`str`
* value
"""
is_bool = kwargs.get("is_bool", False)
ret = value
if is_bool:
bool_val = tanium_kit.ask.coerce_bool(value)
if bool_val is None:
m = "Config entry '{}' in section '{}' supplied an invalid boolean value '{}', prompting"
m = m.format(entry, section, value)
self.LOG.warning(m)
ret = tanium_kit.ask.ask(**kwargs)
else:
ret = bool_val
return ret
[docs] def config_check_int(self, value, section, entry, **kwargs):
"""Convert value into integer.
Parameters
----------
value : :obj:`str`
* value from config file/user
section : :obj:`str`
* section in configuration file value came from
entry : :obj:`str`
* entry in section in configuration file value came from
is_int : :obj:`bool`, optional
* default : False
* True : value should be converted to int using :func:`tanium_kit.tools.int_check`, ask user if it fails to be converted
* False : leave value alone
Returns
-------
ret : :obj:`str`
* value
"""
is_int = kwargs.get("is_int", False)
ret = value
if is_int:
int_val = tanium_kit.tools.int_check(value)
if int_val is None:
m = "Config entry '{}' in section '{}' supplied non number value '{}', prompting"
m = m.format(entry, section, value)
self.LOG.warning(m)
ret = tanium_kit.ask.ask(**kwargs)
else:
ret = int_val
return ret
[docs] def valid_tmpl_keys(self):
"""Produce a list of valid template keys.
Returns
-------
ret : :obj:`str`
* str containing CR delimited list of all valid template keys
"""
tmpl = "{{{}}} = '{}'".format
skips = ["password", "CRYPT_KEY"]
only_types = (tanium_kit.string_types, tanium_kit.integer_types, bool)
keys = []
for k, v in sorted(self.PCONFIG.items()):
if k in skips or k.startswith("_") or not isinstance(v, (only_types)):
continue
try:
v = v.replace("\n", " ")
except:
pass
keys.append(tmpl(k, v))
keys_txt = "\n\t".join(keys)
m = "Valid template keys:\n\t{}"
ret = m.format(keys_txt)
return ret
[docs] def config_check_tmpl(self, value, section, entry, **kwargs):
"""Templatize value.
Parameters
----------
value : :obj:`str`
* value from config file/user
section : :obj:`str`
* section in configuration file value came from
entry : :obj:`str`
* entry in section in configuration file value came from
is_template : :obj:`bool`, optional
* default : False
* True : value should be templatized using :meth:`tanium_hat.main.Main.config_tmpl`
* False : leave value alone
Returns
-------
ret : :obj:`str`
* value
"""
is_tmpl = kwargs.get("is_template", False)
ret = value
if is_tmpl:
src = "config entry '{}' in section '{}'"
src = src.format(entry, section)
ret = self.config_tmpl(value, src)
return ret
[docs] def config_check_abs(self, value, section, entry, **kwargs):
"""Pre-pend value if it is not absolute.
Parameters
----------
value : :obj:`str`
* value from config file/user
section : :obj:`str`
* section in configuration file value came from
entry : :obj:`str`
* entry in section in configuration file value came from
force_abs : :obj:`bool`, optional
* default : False
* str : value should be prefixed with str if value is not absolute
* False : leave value alone
Returns
-------
ret : :obj:`str`
* value
"""
force_abs = kwargs.get("force_abs", False)
ret = value
if force_abs and not os.path.isabs(value):
src = "'force absolute' for config file entry '{}' in section '{}'"
src = src.format(section, entry)
force_abs = self.config_tmpl(force_abs, src, quiet=True)
ret = os.path.join(force_abs, value)
m = "Config file entry '{}' in section '{}' supplied non-absolute path, set to: {}"
m = m.format(section, entry, ret)
self.LOG.info(m)
return ret
[docs] def get_handler(self):
"""Get a PyTan Handler.
Returns
-------
ret : :obj:`pytan.handler.Handler`
* PyTan Handler created using credentials from self.PCONFIG
"""
args = {}
args.update(self.PCONFIG)
args.update(constants.HANDLER_EXTRA_ARGS)
try:
ret = pytan.Handler(**args)
except Exception as e:
m = "Failed to connect to Tanium instance, error: {}"
m = m.format(e)
self.err_handler(m)
self.LOG.error(m)
self.jump_ship()
tanium_kit.log_tools.match_loggers_add_handler("pytan.", self.LH_FILE)
m = "Created connection to Tanium Server: {}"
m = m.format(ret)
self.LOG.info(m)
return ret
[docs] def config_tmpl(self, value, src, quiet=False):
"""Templatize a value.
Parameters
----------
value : :obj:`str`
* string to templatize using :attr:`tanium_hat.main.Main.PCONFIG`
src : :obj:`str`
* where this string came from
quiet : :obj:`bool`, optional
* default : False
* True : Log at debug level
* False : Log at info level
Returns
-------
ret : :obj:`str`
* Templatized string
"""
try:
ret = value.format(**self.PCONFIG)
except Exception as e:
m = "String '{}' from {} supplied invalid template, error: {}"
m = m.format(value, src, e)
self.LOG.error(m)
self.err_handler(m)
m = self.valid_tmpl_keys()
self.LOG.error(m)
self.jump_ship()
m = "Templatized {} from '{}' to '{}'"
m = m.format(src, value, ret)
if quiet:
self.LOG.debug(m)
else:
self.LOG.info(m)
return ret
[docs] def sorted_plugins(self, d=1000):
"""Sort the plugins based on priority.
* Uses :meth:`tanium_hat.plugin.Plugin.get_order` to get the plugin priority
Parameters
----------
d : :obj:`int`, optional
* default : 1000
* priority to use for plugins if not defined
Returns
-------
ret : :obj:`list`
* sorted items() from ``plugins`` in :attr:`tanium_hat.main.Main.PCONFIG`
"""
plugins = self.PCONFIG.plugins.items()
ret = sorted(plugins, key=lambda x: x[1].get_order(d))
return ret
[docs] def run_plugins(self, mode):
"""Run all plugins using mode.
* Uses :meth:`tanium_hat.main.Main.sorted_plugins` to get a list of plugins sorted by priority order
* Uses :meth:`tanium_hat.main.Main.run_plugin` to run the mode for for plugin
Parameters
----------
mode : :obj:`str`
* plugin mode to run, one of "get_tanium_data", "get_internet_data", "analyze_data"
"""
for plugin_name, plugin_obj in self.sorted_plugins():
self.run_plugin(mode, plugin_name, plugin_obj)
[docs] def run_plugin(self, mode, plugin_name, plugin_obj):
"""Run a plugin using mode.
Parameters
----------
mode : :obj:`str`
* plugin mode to run, one of "get_tanium_data", "get_internet_data", "analyze_data"
plugin_name : :obj:`str`
* name of plugin
plugin_obj : :obj:`tanium_hat.plugin.Plugin`
* instantiated object of plugin class
"""
mode_map = {
"get_tanium_data": "getting data from Tanium",
"get_internet_data": "getting data from Internet",
"analyze_data": "analyzing data from CSV's in data directory",
}
if mode in mode_map:
mode_txt = mode_map[mode]
else:
m = "Unsupported plugin mode, must be one of: {}"
m = m.format(', '.join(mode_map))
self.LOG.error(m)
self.err_handler(m)
self.jump_ship()
self._sep2()
m = "Plugin {} -- {}"
self.LOG.info(m.format(plugin_name, mode_txt))
self._sep2()
try:
method = getattr(plugin_obj, mode)
method(**self.PCONFIG)
except Exception as e:
m = "Unexpected error occurred while {} for plugin '{}', error: {}"
m = m.format(mode_txt, plugin_name, e)
plugin_obj.err_handler(m, e=e)
self.track_items(plugin_name, plugin_obj, "ERRORS")
self.track_items(plugin_name, plugin_obj, "EXCEPTIONS")
[docs] def track_items(self, name, obj, attr):
"""Pull items from object and add them into this object by name.
Parameters
----------
name : :obj:`str`
* name to store items from ``attr`` in ``obj`` under this classes tracker
obj : :obj:`object`
* object to get ``attr`` from
attr : :obj:`str`
* attribute to get from ``obj`` and to store in this class
"""
obj_items = getattr(obj, attr, [])
this_items = getattr(self, attr, {})
if obj_items:
if name not in this_items:
this_items[name] = []
for x in obj_items:
if x not in this_items[name]:
this_items[name].append(x)
[docs] def keep_going(self, m):
"""Ask the user if they wish to keep going using prompt ``m``.
Exit if they say no.
Parameters
----------
m : :obj:`str`
* prompt to use when asking the user
"""
check = tanium_kit.ask.ask(prompt=m, is_bool=True, default="no")
if not check:
m = "Exiting..."
self.LOG.info(m)
sys.exit()
[docs] def write_csv(self, results, path, **kwargs):
"""Write a csv file.
* Uses :class:`tanium_kit.excel_writer.ExcelWriter` to pre-parse a list of dicts for writing to CSV
Parameters
----------
results : :obj:`list` of dict
* list of dictionaries to write to the CSV file
path : :obj:`str`
* path to write csv file to
Returns
-------
ret : :obj:`list` of dict
* list of dictionaries post-processing by ExcelWriter
"""
ew = tanium_kit.excel_writer.ExcelWriter()
try:
ret = ew.run(rows=results, **kwargs)
except Exception as e:
ret = "Failed to create CSV using rows: {}, error: {}"
ret = ret.format(results, e)
self.LOG.error(ret)
self.err_handler(ret)
self.write_file(path=path, out=ret)
return ret
[docs] def write_file(self, path, out):
"""Write a file.
* If path is not absolute, path is prepended with ``data_path`` from :attr:`tanium_hat.main.Main.PCONFIG`
Parameters
----------
path : :obj:`str`
* path of file to write
out : :obj:`str`
* contents to write to file
"""
if not os.path.isabs(path):
path = os.path.join(self.PCONFIG.data_path, path)
m = tanium_kit.tools.write_file(path=path, out=out)
self.LOG.info(m)
[docs] def get_user_obj(self):
"""Get the user object for the current user.
* Uses :func:`tanium_kit.pytanx.get_user_obj` to get the user object
Returns
-------
user_obj : :obj:`taniumpy.object_types.user.User`
* User object for the currently logged in user ID in handler
"""
user_obj = tanium_kit.pytanx.get_user_obj(self.PCONFIG.handler, taniumpy)
return user_obj
[docs] def check_pytan_admin(self, role_name="Administrator"):
"""Check if the current pytan user has the Administrator role in Tanium.
* Uses :meth:`tanium_hat.main.Main.get_user_obj` to get the user object
"""
user_obj = self.get_user_obj()
tanium_kit.pytanx.check_required_role(user_obj=user_obj, role_name=role_name)
[docs] def get_info_json(self):
"""Get info.json from the Tanium platform server using PyTan."""
try:
self.check_pytan_admin()
except Exception as e:
ret = "Unable to fetch JSON from 'info.json': {}"
ret = ret.format(e)
self.LOG.error(ret)
self.err_handler(ret)
# wish I could raise here, but we don't want to stop all processing
return None
try:
ret = self.PCONFIG.handler.session.get_server_info()["diags_flat"]
except Exception as e:
ret = "Unable to fetch JSON from 'info.json': {}"
ret = ret.format(e)
self.LOG.error(ret)
self.err_handler(ret)
# wish I could raise here, but we don't want to stop all processing
return None
ret = tanium_kit.pretty.to_json(ret)
m = "Received JSON from 'info.json', result:\n{}"
self.LOG.debug(m.format(ret))
return ret
[docs] def get_module_json(self, name, url):
"""Get json for a module from the Tanium platform server using PyTan."""
try:
ret = self.PCONFIG.handler.session.http_get(url=url)
except Exception as e:
ret = "Unable to connect to module name '{}', url: '{}', error: {}"
ret = ret.format(name, url, e)
self.LOG.error(ret)
self.err_handler(ret)
# wish I could raise here, but we don't want to stop all processing
return None
m = "Received JSON from module name '{}', url: '{}', result:\n{}"
m = m.format(name, url, ret)
self.LOG.debug(m)
return ret
[docs] def write_json_files(self):
"""Write JSON files to the data_path.
* Uses :meth:`tanium_hat.main.Main.get_info_json` to fetch info.json
* Uses :meth:`tanium_hat.main.Main.write_file` to write info.json to data_path
* For each module name and url in :data:`tanium_hat.constants.MODULE_JSON_ITEMS`, get the json from the url using :meth:`tanium_hat.main.Main.get_module_json`, and write the json using :meth:`tanium_hat.main.Main.write_file` to data_path
"""
info_json = self.get_info_json()
filename = "tanium_info.json"
if info_json is not None:
self.write_file(path=filename, out=info_json)
for name, url in constants.MODULE_JSON_ITEMS.items():
module_json = self.get_module_json(name, url)
filename = "{}.json".format(name)
if module_json is not None:
self.write_file(path=filename, out=module_json)
[docs] def err_handler(self, err_str, src=None):
"""Error handler.
* Uses :func:`tanium_kit.tools.orig_tb` to get the current exception string, if any
* Appends exception string to "main" in :attr:`tanium_hat.main.Main.EXCEPTIONS`
* Appends error string to "main" in :attr:`tanium_hat.main.Main.ERRORS`
* Logs exception to debug level
* Logs error message to debug level
* Logs PYTHONPATH to debug level
Parameters
----------
err_str : :obj:`str`
* error string to log/track/spew
"""
k = "main"
if k not in self.EXCEPTIONS:
self.EXCEPTIONS[k] = []
if k not in self.ERRORS:
self.ERRORS[k] = []
try:
exc_str = tanium_kit.tools.orig_tb()
except Exception:
exc_str = ""
self.EXCEPTIONS[k].append(exc_str)
self.ERRORS[k].append(err_str)
self.LOG.debug(exc_str)
self.LOG.debug(err_str)
self.LOG.debug(sys.path)
[docs] def _sep1(self):
"""Log a level 1 separation line."""
self.LOG.info("{}".format("*" * 40))
[docs] def _sep2(self):
"""Log a level 2 separation line."""
self.LOG.info("{}".format("-" * 40))
[docs] def _sep3(self):
"""Log a level 3 separation line."""
self.LOG.info("{}".format("=" * 40))
[docs] def _sep4(self):
"""Log a level 4 separation line."""
self.LOG.info("{}".format("#" * 40))
def _totalsec(self, dt):
try:
ret = dt.total_seconds()
except:
ret = float(0)
return ret
[docs]def base_pconfig():
"""Create a baseline PCONFIG (parsed config) dict.
* Gets pre-populated with the following:
* all attributes from :mod:`tanium_hat.constants`
* all key/values from :data:`tanium_hat.version.TOOL_DICT`
* key: ``now`` time formatted string from :data:`tanium_hat.constants.NOW_FORMAT`
* key: ``prepared_on`` time formatted string from :data:`tanium_hat.constants.PREPARED_ON_FORMAT`
* key: ``that_version`` :attr:`tanium_hat.version.__version__`
* key: ``platform`` output from :func:`platform.platform`
* key: ``python_version`` output from :data:`sys.version`
* key: ``pathsep`` output from :data:`os.sep`
Returns
-------
ret : dict
* baseline PCONFIG dict
"""
ret = tanium_kit.store.Store()
ret.update(constants.__dict__)
ret.update(version.TOOL_DICT)
ret.now = time.strftime(constants.NOW_FORMAT)
ret.prepared_on = time.strftime(constants.PREPARED_ON_FORMAT)
ret.that_version = version.__version__
ret.platform = platform.platform()
ret.python_version = sys.version.replace("\n", " ")
ret.pathsep = os.sep
return ret