Source code for tanium_hat.main

"""Tanium Hygiene Assessment Tool (THAT) Main workflow module."""
from __future__ import absolute_import, division, print_function, unicode_literals

import atexit
import datetime
import imp
import os
import platform
import signal
import sys
import time

from . import LH_CON, LOG
from . import constants, options, plugin, pptx_builder, version

try:
    import requests  # TOOL_PATH/libs_external/any/requests/
    import pytan  # TOOL_PATH/libs_tanium/pytan/
    import taniumpy  # TOOL_PATH/libs_tanium/taniumpy/
except:
    raise

try:
    import tanium_kit  # TOOL_PATH/libs_tanium/tanium_kit/
except:
    raise

tanium_kit.log_tools.shutup_requests(loud=constants.DEBUG)


[docs]class Main(object): """Main workflow class for THAT. Must call :meth:`tanium_hat.main.Main.start` in order to start the workflow. Examples -------- >>> # initialize the Main class >>> main = that.main.Main() >>> # use the default configuration file >>> main.start() >>> # or, use a specific configuration file >>> main.start(config_file="my_config.ini") Attributes ---------- CONFIG_FILE : :obj:`str` * Path to configuration file for this instance CONFIG : dict * Unparsed configuration file in dict form, set by :meth:`tanium_hat.main.Main.load_config_file` PCONFIG : dict * Parsed configuration file in dict form, set by :meth:`tanium_hat.main.Main.parse_config_contents` LOG : :obj:`logging.Logger` * Logger for this instance LH_CON : :obj:`logging.StreamHandler` * Console log handler for this instance LH_FILE : :obj:`logging.handlers.RotatingFileHandler` * File log handler for this instance, set by :meth:`tanium_hat.main.Main.add_file_logging` ERRORS : dict * Used to track errors and where they occurred, managed by :meth:`tanium_hat.main.Main.err_handler` EXCEPTIONS : dict * Used to track exceptions and where they occurred, managed by :meth:`tanium_hat.main.Main.err_handler` WEQUESTS : :obj:`tanium_kit.wequests.Wequests` * Wequests object (wrapper for requests) """
[docs] def __init__(self): """Constructor.""" self.LOG = LOG self.LH_CON = LH_CON self.LH_FILE = None self.CONFIG_FILE = "" self.PCONFIG = tanium_kit.store.Store() self.CONFIG = tanium_kit.store.Store() self.ERRORS = {} self.EXCEPTIONS = {} self.WEQUESTS = None
[docs] def start(self, **kwargs): """Main workflow entry point for THAT. Arguments --------- config_file : :obj:`str`, optional * default: :data:`tanium_hat.constants.CONFIG_FILE_PATH` * path to configuration file to use for this instance """ self.CONFIG_FILE = kwargs.get("config_file", constants.CONFIG_FILE_PATH) self.pre_flight() self.flight_check() self.takeoff() self.land()
[docs] def signal_handler(self, signal, frame): """Signal handler for trapping Control-C (SIGINT).""" print("\n") m = "Interrupt received, exiting!" self.LOG.error(m) self.jump_ship()
[docs] def pre_flight(self): """Read configuration file and start file logging. * Updates :attr:`tanium_hat.main.Main.CONFIG` with the dict parsed from :attr:`tanium_hat.main.Main.CONFIG_FILE` * Uses :meth:`tanium_hat.main.Main.add_file_logging` to setup file logging * Uses :meth:`tanium_hat.main.Main.parse_config_contents` to parse the configuration """ signal.signal(signal.SIGINT, self.signal_handler) self.CONFIG = self.load_config_file(path=self.CONFIG_FILE) self.add_file_logging() self._sep1() m = "Tanium Hygiene Assessment Tool (v{}) -- started running" m = m.format(version.__version__) self.LOG.info(m) self._sep1() self.parse_config_contents()
[docs] def add_file_logging(self): """Add a file log handler to the root logger that gets DEBUG and above log messages. * Updates :attr:`tanium_hat.main.Main.LH_FILE` with the file log handler """ default_path = os.path.join(constants.TOOL_PATH, "data") path = self.CONFIG.get("tanium_hat", {}).get("data_path", default_path) if not os.path.isabs(path): path = os.path.join(constants.TOOL_PATH, path) self.LH_FILE = tanium_kit.log_tools.make_handler_file( logger=LOG, log_file_format=constants.LOG_FILE_FORMAT, log_file_level="DEBUG", log_file_handler_name="that_file", log_file_name=constants.LOG_FILE, log_file_dir=path, log_file_count=constants.LOG_FILE_COUNT, log_file_mb=constants.LOG_FILE_MB, ) m = "Now logging to file: {}" m = m.format(self.LH_FILE.stream.name) self.LOG.info(m)
[docs] def flight_check(self): """Verify that at least one mode and one plugin was selected.""" modes = ["get_tanium_data", "get_internet_data", "analyze_data"] any_mode = any([v for k, v in self.PCONFIG.items() if k in modes]) if not any_mode: m = "Get Tanium Data, Get Internet Data, and Analyze data are all disabled" self.LOG.error(m) self.jump_ship() any_plugins = self.PCONFIG.plugins if not any_plugins: self.LOG.error("No plugins enabled") self.jump_ship()
[docs] def land(self): """Print out any errors that occurred and exit.""" self._sep1() m = "Tanium Hygiene Assessment Tool -- finished running" self.LOG.info(m) self._sep1() if self.ERRORS: m = "Errors that occurred throughout this run:" self.LOG.error(m) for k, v in self.ERRORS.items(): for e in v: m = "Error source: {} -- Error: {}" m = m.format(k, e) self.LOG.error(m) self.jump_ship()
[docs] def jump_ship(self, lvl=99): """Method called to exit due to error. Parameters ---------- lvl : :obj:`int`, optional * Default: 99 * Exit code to end process with """ m = "Errors occurred, exiting..." self.LOG.error(m) sys.exit(lvl)
[docs] def register_zipper(self): """Create a zip file of the data_path when python script exits. * Uses :func:`tanium_kit.zipper.mkzip` to create a zip file of the data_path * Uses :func:`atexit.register` to register the zipper function to run when this script reaches the end of execution (due to error or not) """ dest_path = datetime.datetime.utcnow().strftime(constants.ZIP_DEST) zip_file = constants.ZIP_FILE src = "'ZIP_FILE' attribute in '{}'".format(constants.__file__) zip_file = self.config_tmpl(zip_file, src) zip_file = tanium_kit.tools.get_valid_filename(zip_file) zip_path = os.path.join(self.PCONFIG.data_path, zip_file) zipper_args = { "zip_path": zip_path, "src_path": self.PCONFIG.data_path, "dest_path": dest_path, "skips": constants.ZIP_SKIPS, "remove_old": constants.ZIP_REMOVE_OLD, } atexit.register(tanium_kit.zipper.mkzip, **zipper_args)
[docs] def takeoff(self): """Run the enabled modes against the enabled plugins.""" try: self.register_zipper() if self.PCONFIG.get_tanium_data: self.mode_get_tanium_data() if self.PCONFIG.get_internet_data: if self.PCONFIG.has_internet: self.mode_get_internet_data() else: m = "No internet connection available, not getting internet data!" self.LOG.warning(m) if self.PCONFIG.analyze_data: self.mode_analyze_data() except SystemExit: m = "Script exit received!!" self.LOG.warning(m) raise except Exception as e: m = "Error occurred while running: {} !!".format(str(e)) self.err_handler(m) self.LOG.exception(m) self.jump_ship()
[docs] def mode_get_tanium_data(self): """Run get_tanium_data for plugins and write results. * Uses :meth:`tanium_hat.main.Main.get_handler` to get a PyTan handler connection to Tanium. * Uses :meth:`tanium_hat.main.Main.run_plugins` to call :meth:`tanium_hat.plugin.Plugin.get_tanium_data` on every plugin * Uses :meth:`tanium_hat.main.Main.collect_tanium_results` to get the results of get_tanium_data for all plugins * Uses :meth:`tanium_hat.main.Main.write_csv` to write results to that_stats.csv in data_path * Uses :meth:`tanium_hat.main.Main.write_json_files` to write json pages from Tanium. """ self.PCONFIG.handler = self.get_handler() self.run_plugins("get_tanium_data") results = self.collect_tanium_results() filename = "that_stats.csv" self.write_csv(results=results, path=filename) self.write_json_files()
[docs] def mode_get_internet_data(self): """Run get_internet_data for plugins and write results. * Uses :meth:`tanium_hat.main.Main.run_plugins` to call :meth:`tanium_hat.plugin.Plugin.get_internet_data` on every plugin * Uses :meth:`tanium_hat.main.Main.collect_internet_results` to get the results of get_internet_data for all plugins * Uses :meth:`tanium_hat.main.Main.write_csv` to write results to :data:`tanium_hat.constants.INTERNET_FILE_NAME` in data_path """ self.run_plugins("get_internet_data") results = self.collect_internet_results() self.write_csv(results=[results], path=constants.INTERNET_FILE_NAME)
[docs] def mode_analyze_data(self): """Run analyze_data for plugins, collect results, create PPTX file. * Uses :meth:`tanium_hat.main.Main.run_plugins` to call :meth:`tanium_hat.plugin.Plugin.analyze_data` on every plugin * Uses :meth:`tanium_hat.main.Main.collect_analyze_results` to get the results of analyze_data for all plugins * Uses :class:`tanium_hat.pptx_builder.PptxBuilder` to create the PPTX file. """ self.run_plugins("analyze_data") results = self.collect_analyze_results() builder = self.PCONFIG.pptx_builder = pptx_builder.PptxBuilder(**self.PCONFIG) m = "Created {}" m = m.format(builder) self.LOG.info(m) builder.save() builder.log_slides(layouts=True, debug=constants.DEBUG_PPTX) builder.add_slides(layouts=self.PCONFIG.layouts, results=results) builder.log_slides(layouts=False, debug=constants.DEBUG_PPTX) builder.save()
[docs] def collect_analyze_results(self): """Collect analayze_data results from plugins. * Stores :attr:`tanium_hat.main.Main.PCONFIG` under the key "config". * Stores :attr:`tanium_hat.plugin.Plugin.ANALYZE_RESULTS` under the key "plugin_name" for each enabled plugin. Returns ------- ret : dict * nested dict with the collected results """ ret = {} ret["config"] = self.PCONFIG for plugin_name, plugin_obj in self.sorted_plugins(): ret[plugin_name] = plugin_obj.ANALYZE_RESULTS ret[plugin_name]["priority"] = plugin_obj.get_order() return ret
[docs] def collect_internet_results(self): """Collect get_internet_data results from plugins. * Stores :attr:`tanium_hat.plugin.Plugin.GID_RESULTS` into ret. * Conflicting key names from different plugins will overwrite each other! Returns ------- ret : dict * dict with the collected results """ ret = {} for plugin_name, plugin_obj in self.sorted_plugins(): gid_results = getattr(plugin_obj, "GID_RESULTS", {}) ret.update(gid_results) return ret
[docs] def collect_tanium_results(self): """Collect get_tanium_data results from plugins. * Stores :attr:`tanium_hat.plugin.Plugin.GTD_RESULTS` into ret. Returns ------- ret : :obj:`list` * list of dict with the collected results """ ret = [] for plugin_name, plugin_obj in self.sorted_plugins(): gtd_results = getattr(plugin_obj, "GTD_RESULTS", {}) for filename, gtd_result in gtd_results.items(): q_ret = gtd_result.get("q_ret", {}) q_obj = q_ret.get("question_object", None) q_text = getattr(q_obj, "query_text", "Unavailable") q_id = getattr(q_obj, "id", -1) q_start = gtd_result.get("q_start", -1) q_end = gtd_result.get("q_end", -1) q_elapsed = self._totalsec(gtd_result.get("q_elapsed")) filepath = gtd_result.get("filepath", "UNDEFINED") had_errors = bool(getattr(plugin_obj, "ERRORS", [])) results = {} results["Plugin Name"] = plugin_name results["Plugin Had Error"] = had_errors results["Filename"] = filepath results["Question ID"] = q_id results["Question Text"] = q_text results["Question Start Time"] = q_start results["Question End Time"] = q_end results["Question Total Seconds"] = q_elapsed ret.append(results) if not ret: results = {} results["Plugin Name"] = "No plugins asked questions!" results["Plugin Had Error"] = "None" results["Filename"] = "None" results["Question ID"] = "0" results["Question Text"] = "None" results["Question Start Time"] = 0 results["Question End Time"] = 0 results["Question Total Seconds"] = 0 ret.append(results) return ret
[docs] def load_config_file(self, path): """Use IniReader() to turn ini file at ``path`` into a dictionary. Parameters ---------- path : :obj:`str` * path to ini file to parse Returns ------- ret : :obj:`dict` * ini file contents churned into dict """ ir = tanium_kit.ini_reader.IniReader() try: path, config = ir.read(ini_path=path) m = "Loaded INI file: {}" m = m.format(path) self.LOG.debug(m) except Exception as e: m = "Failed to load INI file: {}, Error: {}" m = m.format(path, e) self.err_handler(m) self.LOG.error(m) m = "Script Execution Will Ask User for Required Values" self.LOG.warning(m) path, config = ("", {}) ret = tanium_kit.store.Store(**{k: tanium_kit.store.Store(**v) for k, v in config.items()}) return ret
[docs] def parse_config_contents(self): """Read the :attr:`tanium_hat.main.Main.CONFIG` dictionary for keys and prompt user for missing values. * Sets :attr:`tanium_hat.main.Main.PCONFIG` with the output of :func:`tanium_hat.main.base_pconfig` * Uses :func:`tanium_hat.main.Main.parse_config_base` to parse the config for base options * Uses :func:`tanium_hat.main.Main.parse_config_plugins` to parse the config for plugin options * Uses :func:`tanium_hat.main.Main.parse_config_analyze` to parse the config for analyze data options * Uses :func:`tanium_hat.main.Main.parse_config_tanium` to parse the config for get tanium data options """ self.LOG.info("Gathering Assessment Information...") self._sep2() self.PCONFIG = base_pconfig() self.parse_config_base() self.parse_config_plugins() self.parse_config_analyze() self.parse_config_tanium() self._sep2() self.LOG.info("Finished Gathering Assessment Information...")
[docs] def parse_config_base(self): """Parse base options that are always asked. * Uses :data:`tanium_hat.options.BASE` to determine base options to search for/ask """ for k, v in options.BASE.items(): self.PCONFIG[k] = self.get_config_entry(**v) # store the requests package in self.PCONFIG self.PCONFIG.requests_pkg = requests # create a requests wrapper instance and store it in self.PCONFIG self.PCONFIG.wequests = self.WEQUESTS = tanium_kit.wequests.Wequests(**self.PCONFIG) # determine if internet is available and store it in self.PCONFIG self.PCONFIG.has_internet = self.check_internet() if not self.PCONFIG.has_internet and self.PCONFIG.get_internet_data: self.LOG.warning("INTERNET CONNECTIVITY CHECK FAILED!") self.LOG.warning("However it is required for getting internet data.") self.LOG.warning("For Assistance with this Warning, contact your TAM.") self.keep_going("Continue without an Internet Connection") # create the data directory if it does not exist self.makedir(self.PCONFIG.data_path)
[docs] def parse_config_tanium(self): """Parse options that are only asked if the user supplied yes to get_tanium_data. * Returns if ``get_tanium_data`` is False in :attr:`tanium_hat.main.Main.PCONFIG` * Uses :data:`tanium_hat.options.TANIUM` to determine base options to search for/ask """ if not self.PCONFIG.get_tanium_data: return for k, v in options.TANIUM.items(): self.PCONFIG[k] = self.get_config_entry(**v)
[docs] def parse_config_analyze(self): """Parse options that are only asked if the user supplied yes to analyze_data. * Returns if ``analyze_data`` is False in :attr:`tanium_hat.main.Main.PCONFIG` * Uses :data:`tanium_hat.options.ANALYZE` to determine base options to search for/ask * Uses :meth:`tanium_hat.main.Main.load_layout_def` to load the python layout definition file as a module * Uses :meth:`tanium_hat.main.Main.load_layout_pptx` to load the PPTX file referenced in the python layout definition file """ if not self.PCONFIG.analyze_data: return for k, v in options.ANALYZE.items(): self.PCONFIG[k] = self.get_config_entry(**v) filename = self.PCONFIG.pptx_output_file self.PCONFIG.pptx_output_path = os.path.join(self.PCONFIG.data_path, filename) self.load_layout_def() self.load_layout_pptx()
[docs] def parse_config_plugins(self): """Parse options that are asked for plugins. * Uses :meth:`tanium_hat.main.Main.get_local_plugins` to load locally available plugins * Uses :meth:`tanium_hat.main.Main.get_enabled_plugins` to check which plugins get loaded * Uses :meth:`tanium_hat.main.Main.load_enabled_plugins` to load enabled plugins """ self.PCONFIG.plugin_modules = tanium_kit.store.Store() self.get_local_plugins() self.get_enabled_plugins() self.load_enabled_plugins()
[docs] def makedir(self, path): """Make a directory and all leading directories as needed. Parameters ---------- path : :obj:`str` * Directory to make """ if not os.path.exists(path): m = "Making directory: {}" self.LOG.info(m.format(path)) os.makedirs(path)
[docs] def save_file(self, out, filename, path, prompt=False, binary=False, **kwargs): """Save a file in binary or text mode. Parameters ---------- out : :obj:`str` * The contents to write to the file filename : :obj:`str` * The basename of the file to save path : :obj:`str` * The path to save the file into prompt : :obj:`bool`, optional * Default : False * True: Prompt the user using :func:`tanium_kit.ask.ask` where they would like to save the file to, defaulting to ``path`` * False: Use ``path`` as is binary : :obj:`bool`, optional * Default : False * True : use :func:`tanium_kit.tools.write_binary` to write a binary file * False : use :func:`tanium_kit.tools.write_file` to write a text file Returns ------- file_path : :obj:`str` * The full path to the file that was saved """ if prompt: m = "Please enter the directory to save {} to" m = m.format(filename) path = tanium_kit.ask.ask(prompt=m, default=path) self.makedir(path) file_path = os.path.join(path, filename) if binary: m = tanium_kit.tools.write_binary(file_path, out) else: m = tanium_kit.tools.write_file(file_path, out) self.LOG.info(m) return file_path
[docs] def get_local_plugins(self): """Find the plugins that are available locally. * Uses :data:`tanium_hat.constants.PLUGINS_FOUND` to find matching plugins * Uses :meth:`tanium_hat.main.Main.load_module` to load each plugin that is found * Uses :meth:`tanium_hat.main.Main.load_module` to reload newer plugins * Updates ``plugin_modules`` in :attr:`tanium_hat.main.Main.PCONFIG` with plugin name -> plugin module mapping """ if not constants.PLUGINS_FOUND: m = "No plugin files that match the wildcard '{}' found!!" m = m.format(constants.PLUGINS_MATCH) self.err_handler(m) self.jump_ship() for path in constants.PLUGINS_FOUND: mod = self.load_module(path, "Plugin", constants.PLUGIN_ATTRS) self.PCONFIG.plugin_modules[mod.NAME] = mod
[docs] def load_module(self, path, mod_type, mod_attrs=[]): """Load a python file into a module. * Uses :func:`imp.load_source` to read the file at runtime and load it in as a python module. * Sets MODULE_PATH and MODULE_FILE as attributes on the returned module. Parameters ---------- path : :obj:`str` * Path to the python file to load as a module mod_type : :obj:`str` * Type of module mod_attrs : :obj:`list` of :obj:`str`, optional * Default : :data:`tanium_hat.constants.ALWAYS_ATTRS` * List of attributes that must be defined in the loaded module * Attributes supplied here will be appended to :data:`tanium_hat.constants.ALWAYS_ATTRS` Returns ------- ret : :obj:`object` * Loaded python module from ``path`` """ mod_attrs += constants.ALWAYS_ATTRS filename = os.path.basename(path) basename = os.path.splitext(filename)[0] try: ret = imp.load_source(basename, path) m = "Successfully loaded {} file '{}'" m = m.format(mod_type, path) self.LOG.debug(m) except Exception as e: m = "Failed to load {} file '{}', error: {}" m = m.format(mod_type, path, e) self.LOG.error(m) self.err_handler(m) self.jump_ship() for x in mod_attrs: if not hasattr(ret, x): m = "{} file '{}' does not define attribute '{}'" m = m.format(mod_type, path, x) self.LOG.error(m) self.err_handler(m) self.jump_ship() min_version = getattr(ret, "MINIMUM_THAT_VERSION") if version.__version__ < min_version: m = "THAT v{} failed minimum required v{} for using {} '{}' v{}" m = m.format(version.__version__, min_version, mod_type, path, ret.VERSION) self.LOG.error(m) self.err_handler(m) self.jump_ship() else: m = "THAT v{} passed minimum required v{} for using {} '{}' v{}" m = m.format(version.__version__, min_version, mod_type, path, ret.VERSION) self.LOG.debug(m) ret.MODULE_PATH = path ret.MODULE_FILE = filename return ret
[docs] def get_plugin_tmpl(self, pname, pmod): """Build a dict to use in templating plugin options. Parameters ---------- name : :obj:`str` * name of plugin pmod : :obj:`tanium_hat.plugin.Plugin` * module of plugin Returns ------- ret : dict * dict ready for use in templating """ ret = {} ret["plugin_name"] = pname ret["plugin_priority"] = pmod.PRIORITY return ret
[docs] def get_plugin_option(self, option_name, pname, pmod): """Get a plugin option and templatize it for a given plugin. * Gets the plugin option from :data:`tanium_hat.options.PLUGINS` * Uses :meth:`tanium_hat.main.Main.get_plugin_tmpl` to build a dict for use in templating Parameters ---------- option_name : :obj:`str` * Name of option to get from :data:`tanium_hat.options.PLUGINS` name : :obj:`str` * name of plugin pmod : :obj:`tanium_hat.plugin.Plugin` * module of plugin Returns ------- ret : dict * templatized dict for this plugin option """ plugin_options = options.PLUGINS[option_name] plugin_tmpl = self.get_plugin_tmpl(pname, pmod) ret = {} ret.update(plugin_options) for k, v in ret.items(): if isinstance(v, tanium_kit.text_type): v = v.format(**plugin_tmpl) ret[k] = v return ret
[docs] def get_enabled_plugins(self): """Determine which plugins to enable. * Iterates over ``plugin_modules`` in :attr:`tanium_hat.main.Main.PCONFIG` to check config/ask user if the plugin is enabled """ for pname, pmod in self.PCONFIG.plugin_modules.items(): if self.PCONFIG.enable_all_plugins: opt = self.get_plugin_option("enabled", pname, pmod) key = opt["entry"] m = "enable_all_plugins is True, will load plugin module '{}'" m = m.format(pname) self.LOG.info(m) self.PCONFIG[key] = True for k in options.PLUGINS: opt = self.get_plugin_option(k, pname, pmod) key = opt["entry"] value = self.PCONFIG[key] if key in self.PCONFIG: m = "config entry '{}' already set to '{}', not re-asking..." m = m.format(key, value) self.LOG.debug(m) continue self.PCONFIG[key] = self.get_config_entry(**opt)
[docs] def load_enabled_plugins(self): """Load enabled plugins. * Iterates over ``plugin_modules`` in :attr:`tanium_hat.main.Main.PCONFIG` for enabled plugins * Initializes a :class:`tanium_hat.plugin.Plugin` object for each enabled plugin * Updates ``plugins`` in :attr:`tanium_hat.main.Main.PCONFIG` with plugin name -> plugin object mapping """ self.PCONFIG.plugins = tanium_kit.store.Store() for pname, pmod in self.PCONFIG.plugin_modules.items(): opt = self.get_plugin_option("enabled", pname, pmod) key = opt["entry"] if not self.PCONFIG[key]: m = "Plugin module '{}' is not enabled, not loading" m = m.format(pname) self.LOG.info(m) continue m = "Plugin module '{}' is enabled, loading" m = m.format(pname) self.LOG.info(m) plugin_args = {} plugin_args.update(self.PCONFIG) plugin_args["plugin_name"] = pname plugin_args["plugin_mod"] = pmod self.PCONFIG.plugins[pname] = plugin.Plugin(**plugin_args)
[docs] def load_layout_def(self): """Load the python layout definition file. * Gets ``layout_File`` from :attr:`tanium_hat.main.Main.PCONFIG` * Uses :meth:`tanium_hat.main.Main.load_module` to load the module * Uses :meth:`tanium_hat.main.Main.load_module` to reload any new layout file * Uses :func:`tanium_hat.pptx_builder.val_layouts` to validate the layouts in the layout definition file """ path = self.PCONFIG.layout_file filename = os.path.basename(path) if not os.path.isabs(path): path = os.path.join(self.PCONFIG.layouts_path, filename) mod = self.load_module(path, "Layout Definition", constants.LAYOUT_ATTRS) self.PCONFIG.layout_mod = mod self.PCONFIG.layouts = self.PCONFIG.layout_mod.LAYOUTS pptx_builder.val_layouts(self.PCONFIG.layouts) m = "Successfully validated Layout Definition file '{}'" m = m.format(path) self.LOG.info(m)
[docs] def load_layout_pptx(self): """Load the PPTX file referenced by the layout definition file. * Gets ``layout_mod.PPTX_INPUT_FILE`` from :attr:`tanium_hat.main.Main.PCONFIG` """ filename = self.PCONFIG.layout_mod.PPTX_INPUT_FILE src = "'PPTX_INPUT_FILE' attribute in '{}'".format(self.PCONFIG.layout_mod.MODULE_PATH) filename = self.config_tmpl(filename, src) path = os.path.join(self.PCONFIG.layouts_path, filename) self.PCONFIG.pptx_input_file = filename self.PCONFIG.pptx_input_path = path if not os.path.exists(path): m = "Unable to find PPTX input file '{}' referenced in layout file '{}'" m = m.format(path, self.PCONFIG.layout_mod.MODULE_PATH) self.LOG.error(m) self.err_handler(m) self.jump_ship()
[docs] def check_internet(self): """Check to see if an internet connection is available. Uses :data:`tanium_hat.constants.INTERNET_TEST_URL` as the url to use for checking internet connectivity. Returns ------- ret : :obj:`bool` * True: Internet available * False: Internet not available """ try: check = self.WEQUESTS.request(url=constants.INTERNET_TEST_URL) ret = True except Exception as check: ret = False m = "Internet connection found: {} ({})" self.LOG.debug(m.format(ret, check)) return ret
[docs] def get_config_entry(self, section, entry, **kwargs): """Get the value of an entry from self.CONFIG. If value not supplied or is not the right type, prompt user for value. Parameters ---------- section : :obj:`str` * section in self.CONFIG to look for entry entry : :obj:`str` * key in section to get value from self.CONFIG[section][entry] prompt : :obj:`str` * Prompt to use when asking for value is_bool : :obj:`bool`, optional * default: False * True: value of entry must be a boolean * False: value of entry does not have to be a boolean is_int : :obj:`bool`, optional * default: False * True: value of entry must be an integer * False: value of entry does not have to be an integer empty_ok : :obj:`bool`, optional * default: False * True: value of entry can be empty * False: value of entry can not be empty force_abs : :obj:`bool`, optional * default: False * True: if value of entry is not an absolute path, prepend it with the path of THAT * False: leave value of entry alone is_crypt : :obj:`bool`, optional * default: False * True: run value of entry through tanium_kit.tools.deobfuscate() * False: leave value of entry alone Returns ------- value : :obj:`str`/bool/int * value of entry from config file or from user supplied input """ checks = [ self.config_check_empty, self.config_check_bool, self.config_check_int, self.config_check_crypt, self.config_check_tmpl, self.config_check_abs, ] value = self.config_check_value(section, entry, **kwargs) for check in checks: value = check(value, section, entry, **kwargs) show_val = "**HIDDEN**" if tanium_kit.ask.secure_value(**kwargs) else value m = "Config entry '{}' in section '{}' supplied value '{}', not prompting" self.LOG.info(m.format(entry, section, show_val)) return value
[docs] def config_check_value(self, section, entry, **kwargs): """Check if value is defined in config file. * Checks :data:`os.environ` for THAT_ENTRY and uses that as value if defined * Checks if :attr:`tanium_hat.main.Main.CONFIG` is defined/is a dict, asks user for value if not * Checks :attr:`tanium_hat.main.Main.CONFIG` for entry and uses that as value if found Parameters ---------- section : :obj:`str` * section in configuration file value came from entry : :obj:`str` * entry in section in configuration file value came from Returns ------- ret : :obj:`str` * value """ config = getattr(self, "CONFIG", {}) sect = config.get(section, {}) ret = sect.get(entry, "") env_entry = "THAT_{}".format(entry.upper()) if env_entry in os.environ: m = "Environment Variable '{}' override provided, ignoring config entry for {}" m = m.format(env_entry, entry) self.LOG.warning(m) ret = os.environ[env_entry] elif not self.CONFIG or not isinstance(self.CONFIG, dict): m = "Config not parsed properly, prompting for entry '{}'" m = m.format(entry) self.LOG.warning(m) ret = tanium_kit.ask.ask(**kwargs) elif section not in self.CONFIG: m = "Config does not have a section named '{}', prompting for entry '{}'" m = m.format(section, entry) self.LOG.warning(m) ret = tanium_kit.ask.ask(**kwargs) elif entry not in sect: m = "Config file entry '{}' in section '{}' not supplied, prompting" m = m.format(entry, section) self.LOG.warning(m) ret = tanium_kit.ask.ask(**kwargs) return ret
[docs] def config_check_crypt(self, value, section, entry, **kwargs): """Check if value is cryptable. Parameters ---------- value : :obj:`str` * value from config file/user section : :obj:`str` * section in configuration file value came from entry : :obj:`str` * entry in section in configuration file value came from is_crypt : :obj:`bool`, optional * default : False * True : value is could be a crypted value, use :func:`tanium_kit.tools.deobfuscate` to convert it * False : leave value alone Returns ------- ret : :obj:`str` * value """ is_crypt = kwargs.get("is_crypt", False) ret = value if is_crypt: ret = tanium_kit.tools.deobfuscate(key=constants.CRYPT_KEY, text=value) return ret
[docs] def config_check_empty(self, value, section, entry, **kwargs): """Check if value is empty. Parameters ---------- value : :obj:`str` * value from config file/user section : :obj:`str` * section in configuration file value came from entry : :obj:`str` * entry in section in configuration file value came from empty_ok : :obj:`bool`, optional * default : False * True : value is allowed to be empty * False : value is not allowed to be empty, re-ask user if it is empty Returns ------- ret : :obj:`str` * value """ empty_ok = kwargs.get("empty_ok", False) ret = value if value in [None, ""] and not empty_ok: m = "Config entry '{}' in section '{}' requires non-empty value, prompting" m = m.format(entry, section) self.LOG.warning(m) ret = tanium_kit.ask.ask(**kwargs) return ret
[docs] def config_check_bool(self, value, section, entry, **kwargs): """Convert value into boolean. Parameters ---------- value : :obj:`str` * value from config file/user section : :obj:`str` * section in configuration file value came from entry : :obj:`str` * entry in section in configuration file value came from is_bool : :obj:`bool`, optional * default : False * True : value should be converted to bool using :func:`tanium_kit.ask.coerce_bool`, ask user if it fails to be converted * False : leave value alone Returns ------- ret : :obj:`str` * value """ is_bool = kwargs.get("is_bool", False) ret = value if is_bool: bool_val = tanium_kit.ask.coerce_bool(value) if bool_val is None: m = "Config entry '{}' in section '{}' supplied an invalid boolean value '{}', prompting" m = m.format(entry, section, value) self.LOG.warning(m) ret = tanium_kit.ask.ask(**kwargs) else: ret = bool_val return ret
[docs] def config_check_int(self, value, section, entry, **kwargs): """Convert value into integer. Parameters ---------- value : :obj:`str` * value from config file/user section : :obj:`str` * section in configuration file value came from entry : :obj:`str` * entry in section in configuration file value came from is_int : :obj:`bool`, optional * default : False * True : value should be converted to int using :func:`tanium_kit.tools.int_check`, ask user if it fails to be converted * False : leave value alone Returns ------- ret : :obj:`str` * value """ is_int = kwargs.get("is_int", False) ret = value if is_int: int_val = tanium_kit.tools.int_check(value) if int_val is None: m = "Config entry '{}' in section '{}' supplied non number value '{}', prompting" m = m.format(entry, section, value) self.LOG.warning(m) ret = tanium_kit.ask.ask(**kwargs) else: ret = int_val return ret
[docs] def valid_tmpl_keys(self): """Produce a list of valid template keys. Returns ------- ret : :obj:`str` * str containing CR delimited list of all valid template keys """ tmpl = "{{{}}} = '{}'".format skips = ["password", "CRYPT_KEY"] only_types = (tanium_kit.string_types, tanium_kit.integer_types, bool) keys = [] for k, v in sorted(self.PCONFIG.items()): if k in skips or k.startswith("_") or not isinstance(v, (only_types)): continue try: v = v.replace("\n", " ") except: pass keys.append(tmpl(k, v)) keys_txt = "\n\t".join(keys) m = "Valid template keys:\n\t{}" ret = m.format(keys_txt) return ret
[docs] def config_check_tmpl(self, value, section, entry, **kwargs): """Templatize value. Parameters ---------- value : :obj:`str` * value from config file/user section : :obj:`str` * section in configuration file value came from entry : :obj:`str` * entry in section in configuration file value came from is_template : :obj:`bool`, optional * default : False * True : value should be templatized using :meth:`tanium_hat.main.Main.config_tmpl` * False : leave value alone Returns ------- ret : :obj:`str` * value """ is_tmpl = kwargs.get("is_template", False) ret = value if is_tmpl: src = "config entry '{}' in section '{}'" src = src.format(entry, section) ret = self.config_tmpl(value, src) return ret
[docs] def config_check_abs(self, value, section, entry, **kwargs): """Pre-pend value if it is not absolute. Parameters ---------- value : :obj:`str` * value from config file/user section : :obj:`str` * section in configuration file value came from entry : :obj:`str` * entry in section in configuration file value came from force_abs : :obj:`bool`, optional * default : False * str : value should be prefixed with str if value is not absolute * False : leave value alone Returns ------- ret : :obj:`str` * value """ force_abs = kwargs.get("force_abs", False) ret = value if force_abs and not os.path.isabs(value): src = "'force absolute' for config file entry '{}' in section '{}'" src = src.format(section, entry) force_abs = self.config_tmpl(force_abs, src, quiet=True) ret = os.path.join(force_abs, value) m = "Config file entry '{}' in section '{}' supplied non-absolute path, set to: {}" m = m.format(section, entry, ret) self.LOG.info(m) return ret
[docs] def get_handler(self): """Get a PyTan Handler. Returns ------- ret : :obj:`pytan.handler.Handler` * PyTan Handler created using credentials from self.PCONFIG """ args = {} args.update(self.PCONFIG) args.update(constants.HANDLER_EXTRA_ARGS) try: ret = pytan.Handler(**args) except Exception as e: m = "Failed to connect to Tanium instance, error: {}" m = m.format(e) self.err_handler(m) self.LOG.error(m) self.jump_ship() tanium_kit.log_tools.match_loggers_add_handler("pytan.", self.LH_FILE) m = "Created connection to Tanium Server: {}" m = m.format(ret) self.LOG.info(m) return ret
[docs] def config_tmpl(self, value, src, quiet=False): """Templatize a value. Parameters ---------- value : :obj:`str` * string to templatize using :attr:`tanium_hat.main.Main.PCONFIG` src : :obj:`str` * where this string came from quiet : :obj:`bool`, optional * default : False * True : Log at debug level * False : Log at info level Returns ------- ret : :obj:`str` * Templatized string """ try: ret = value.format(**self.PCONFIG) except Exception as e: m = "String '{}' from {} supplied invalid template, error: {}" m = m.format(value, src, e) self.LOG.error(m) self.err_handler(m) m = self.valid_tmpl_keys() self.LOG.error(m) self.jump_ship() m = "Templatized {} from '{}' to '{}'" m = m.format(src, value, ret) if quiet: self.LOG.debug(m) else: self.LOG.info(m) return ret
[docs] def sorted_plugins(self, d=1000): """Sort the plugins based on priority. * Uses :meth:`tanium_hat.plugin.Plugin.get_order` to get the plugin priority Parameters ---------- d : :obj:`int`, optional * default : 1000 * priority to use for plugins if not defined Returns ------- ret : :obj:`list` * sorted items() from ``plugins`` in :attr:`tanium_hat.main.Main.PCONFIG` """ plugins = self.PCONFIG.plugins.items() ret = sorted(plugins, key=lambda x: x[1].get_order(d)) return ret
[docs] def run_plugins(self, mode): """Run all plugins using mode. * Uses :meth:`tanium_hat.main.Main.sorted_plugins` to get a list of plugins sorted by priority order * Uses :meth:`tanium_hat.main.Main.run_plugin` to run the mode for for plugin Parameters ---------- mode : :obj:`str` * plugin mode to run, one of "get_tanium_data", "get_internet_data", "analyze_data" """ for plugin_name, plugin_obj in self.sorted_plugins(): self.run_plugin(mode, plugin_name, plugin_obj)
[docs] def run_plugin(self, mode, plugin_name, plugin_obj): """Run a plugin using mode. Parameters ---------- mode : :obj:`str` * plugin mode to run, one of "get_tanium_data", "get_internet_data", "analyze_data" plugin_name : :obj:`str` * name of plugin plugin_obj : :obj:`tanium_hat.plugin.Plugin` * instantiated object of plugin class """ mode_map = { "get_tanium_data": "getting data from Tanium", "get_internet_data": "getting data from Internet", "analyze_data": "analyzing data from CSV's in data directory", } if mode in mode_map: mode_txt = mode_map[mode] else: m = "Unsupported plugin mode, must be one of: {}" m = m.format(', '.join(mode_map)) self.LOG.error(m) self.err_handler(m) self.jump_ship() self._sep2() m = "Plugin {} -- {}" self.LOG.info(m.format(plugin_name, mode_txt)) self._sep2() try: method = getattr(plugin_obj, mode) method(**self.PCONFIG) except Exception as e: m = "Unexpected error occurred while {} for plugin '{}', error: {}" m = m.format(mode_txt, plugin_name, e) plugin_obj.err_handler(m, e=e) self.track_items(plugin_name, plugin_obj, "ERRORS") self.track_items(plugin_name, plugin_obj, "EXCEPTIONS")
[docs] def track_items(self, name, obj, attr): """Pull items from object and add them into this object by name. Parameters ---------- name : :obj:`str` * name to store items from ``attr`` in ``obj`` under this classes tracker obj : :obj:`object` * object to get ``attr`` from attr : :obj:`str` * attribute to get from ``obj`` and to store in this class """ obj_items = getattr(obj, attr, []) this_items = getattr(self, attr, {}) if obj_items: if name not in this_items: this_items[name] = [] for x in obj_items: if x not in this_items[name]: this_items[name].append(x)
[docs] def keep_going(self, m): """Ask the user if they wish to keep going using prompt ``m``. Exit if they say no. Parameters ---------- m : :obj:`str` * prompt to use when asking the user """ check = tanium_kit.ask.ask(prompt=m, is_bool=True, default="no") if not check: m = "Exiting..." self.LOG.info(m) sys.exit()
[docs] def write_csv(self, results, path, **kwargs): """Write a csv file. * Uses :class:`tanium_kit.excel_writer.ExcelWriter` to pre-parse a list of dicts for writing to CSV Parameters ---------- results : :obj:`list` of dict * list of dictionaries to write to the CSV file path : :obj:`str` * path to write csv file to Returns ------- ret : :obj:`list` of dict * list of dictionaries post-processing by ExcelWriter """ ew = tanium_kit.excel_writer.ExcelWriter() try: ret = ew.run(rows=results, **kwargs) except Exception as e: ret = "Failed to create CSV using rows: {}, error: {}" ret = ret.format(results, e) self.LOG.error(ret) self.err_handler(ret) self.write_file(path=path, out=ret) return ret
[docs] def write_file(self, path, out): """Write a file. * If path is not absolute, path is prepended with ``data_path`` from :attr:`tanium_hat.main.Main.PCONFIG` Parameters ---------- path : :obj:`str` * path of file to write out : :obj:`str` * contents to write to file """ if not os.path.isabs(path): path = os.path.join(self.PCONFIG.data_path, path) m = tanium_kit.tools.write_file(path=path, out=out) self.LOG.info(m)
[docs] def get_user_obj(self): """Get the user object for the current user. * Uses :func:`tanium_kit.pytanx.get_user_obj` to get the user object Returns ------- user_obj : :obj:`taniumpy.object_types.user.User` * User object for the currently logged in user ID in handler """ user_obj = tanium_kit.pytanx.get_user_obj(self.PCONFIG.handler, taniumpy) return user_obj
[docs] def check_pytan_admin(self, role_name="Administrator"): """Check if the current pytan user has the Administrator role in Tanium. * Uses :meth:`tanium_hat.main.Main.get_user_obj` to get the user object """ user_obj = self.get_user_obj() tanium_kit.pytanx.check_required_role(user_obj=user_obj, role_name=role_name)
[docs] def get_info_json(self): """Get info.json from the Tanium platform server using PyTan.""" try: self.check_pytan_admin() except Exception as e: ret = "Unable to fetch JSON from 'info.json': {}" ret = ret.format(e) self.LOG.error(ret) self.err_handler(ret) # wish I could raise here, but we don't want to stop all processing return None try: ret = self.PCONFIG.handler.session.get_server_info()["diags_flat"] except Exception as e: ret = "Unable to fetch JSON from 'info.json': {}" ret = ret.format(e) self.LOG.error(ret) self.err_handler(ret) # wish I could raise here, but we don't want to stop all processing return None ret = tanium_kit.pretty.to_json(ret) m = "Received JSON from 'info.json', result:\n{}" self.LOG.debug(m.format(ret)) return ret
[docs] def get_module_json(self, name, url): """Get json for a module from the Tanium platform server using PyTan.""" try: ret = self.PCONFIG.handler.session.http_get(url=url) except Exception as e: ret = "Unable to connect to module name '{}', url: '{}', error: {}" ret = ret.format(name, url, e) self.LOG.error(ret) self.err_handler(ret) # wish I could raise here, but we don't want to stop all processing return None m = "Received JSON from module name '{}', url: '{}', result:\n{}" m = m.format(name, url, ret) self.LOG.debug(m) return ret
[docs] def write_json_files(self): """Write JSON files to the data_path. * Uses :meth:`tanium_hat.main.Main.get_info_json` to fetch info.json * Uses :meth:`tanium_hat.main.Main.write_file` to write info.json to data_path * For each module name and url in :data:`tanium_hat.constants.MODULE_JSON_ITEMS`, get the json from the url using :meth:`tanium_hat.main.Main.get_module_json`, and write the json using :meth:`tanium_hat.main.Main.write_file` to data_path """ info_json = self.get_info_json() filename = "tanium_info.json" if info_json is not None: self.write_file(path=filename, out=info_json) for name, url in constants.MODULE_JSON_ITEMS.items(): module_json = self.get_module_json(name, url) filename = "{}.json".format(name) if module_json is not None: self.write_file(path=filename, out=module_json)
[docs] def err_handler(self, err_str, src=None): """Error handler. * Uses :func:`tanium_kit.tools.orig_tb` to get the current exception string, if any * Appends exception string to "main" in :attr:`tanium_hat.main.Main.EXCEPTIONS` * Appends error string to "main" in :attr:`tanium_hat.main.Main.ERRORS` * Logs exception to debug level * Logs error message to debug level * Logs PYTHONPATH to debug level Parameters ---------- err_str : :obj:`str` * error string to log/track/spew """ k = "main" if k not in self.EXCEPTIONS: self.EXCEPTIONS[k] = [] if k not in self.ERRORS: self.ERRORS[k] = [] try: exc_str = tanium_kit.tools.orig_tb() except Exception: exc_str = "" self.EXCEPTIONS[k].append(exc_str) self.ERRORS[k].append(err_str) self.LOG.debug(exc_str) self.LOG.debug(err_str) self.LOG.debug(sys.path)
[docs] def _sep1(self): """Log a level 1 separation line.""" self.LOG.info("{}".format("*" * 40))
[docs] def _sep2(self): """Log a level 2 separation line.""" self.LOG.info("{}".format("-" * 40))
[docs] def _sep3(self): """Log a level 3 separation line.""" self.LOG.info("{}".format("=" * 40))
[docs] def _sep4(self): """Log a level 4 separation line.""" self.LOG.info("{}".format("#" * 40))
def _totalsec(self, dt): try: ret = dt.total_seconds() except: ret = float(0) return ret
[docs]def base_pconfig(): """Create a baseline PCONFIG (parsed config) dict. * Gets pre-populated with the following: * all attributes from :mod:`tanium_hat.constants` * all key/values from :data:`tanium_hat.version.TOOL_DICT` * key: ``now`` time formatted string from :data:`tanium_hat.constants.NOW_FORMAT` * key: ``prepared_on`` time formatted string from :data:`tanium_hat.constants.PREPARED_ON_FORMAT` * key: ``that_version`` :attr:`tanium_hat.version.__version__` * key: ``platform`` output from :func:`platform.platform` * key: ``python_version`` output from :data:`sys.version` * key: ``pathsep`` output from :data:`os.sep` Returns ------- ret : dict * baseline PCONFIG dict """ ret = tanium_kit.store.Store() ret.update(constants.__dict__) ret.update(version.TOOL_DICT) ret.now = time.strftime(constants.NOW_FORMAT) ret.prepared_on = time.strftime(constants.PREPARED_ON_FORMAT) ret.that_version = version.__version__ ret.platform = platform.platform() ret.python_version = sys.version.replace("\n", " ") ret.pathsep = os.sep return ret