Source code for tanium_kit.log_filters
"""Die."""
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import re
[docs]class RegexLogFilter(logging.Filter):
"""Die."""
search_regex = None
replace_regex = ""
filter_mode = "skip"
description = "No description provided!"
test_str = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
valid_test = ""
_EXC = None
_MODES = ["skip", "include", "replace"]
_TMPL_MODES = ", ".join(_MODES)
_TMPL_NEWEXC = "{}\n!!!! WRAPPED INITIAL {}"
_TMPL_PREVEXC = "{}\n!!!! WRAPPED PREVIOUS {}"
_TMPL_EXC = "EXCEPTION {} in {}"
_TATTRS = ["filter_mode", "search_regex", "replace_regex", "description"]
_TSTR = ", ".join(["{0}: '{{{0}}}'".format(k) for k in _TATTRS])
_TMPL_THIS = "Log Filter {}".format(_TSTR)
[docs] def __init__(self, search_regex, **kwargs):
"""Die."""
err1 = "filter_mode '{}' not one of: {} (in {})"
err2 = "Search '{}' is not a valid regex, error: {} (in {})"
err3 = "Regex filter test using string '{}' failed with error: {} (in {})"
err4 = "Regex filter test using string '{}' failed validation, result '{}' != '{}' (in {})"
self.search_regex = search_regex
self.filter_mode = kwargs.get("filter_mode", self.filter_mode).lower()
self.replace_regex = kwargs.get("replace_regex", self.replace_regex)
self.description = kwargs.get("description", self.description)
self.test_str = kwargs.get("test_str", self.test_str)
self.valid_test = kwargs.get("valid_test", self.valid_test)
self._EXC = None
self._TMPL_THIS = self._TMPL_THIS.format(**vars(self))
if self.filter_mode not in self._MODES:
raise Exception(err1.format(self.filter_mode, self._TMPL_MODES, self._TMPL_THIS))
try:
self.search_regex = self.convert_re(self.search_regex)
except Exception as e:
raise Exception(err2.format(self.search_regex, e, self._TMPL_THIS))
try:
passed, msg = self.regex_filter(self.test_str)
except Exception as e:
raise Exception(err3.format(self.test_str, e, self._TMPL_THIS))
if self.valid_test and msg != self.valid_test:
raise Exception(err4.format(self.test_str, msg, self.valid_test, self._TMPL_THIS))
[docs] def convert_re(self, x):
"""Die."""
return re.compile(x) if not isinstance(x, re._pattern_type) else x
[docs] def filter(self, record):
"""Die."""
ret, record.msg = self.regex_filter(record.msg)
return ret
[docs] def regex_filter(self, msg):
"""Die."""
ret = True
if self.filter_mode == "replace":
# replacement regex does not get tested unless it matches search regex
# we do not want the logging system throwing spurious unexpected exceptions
# so we wrap them here. if this filter has previously had an exception,
# we do not even try to re-run the re.sub, we just append the previous exception
# to the message and return that.
if self._EXC:
msg = self._TMPL_PREVEXC.format(msg, self._EXC)
else:
try:
msg = self.search_regex.sub(self.replace_regex, msg)
except Exception as e:
self._EXC = self._TMPL_EXC.format(e, self._TMPL_THIS)
msg = self._TMPL_NEWEXC.format(msg, self._EXC)
else:
found = True if self.search_regex.search(msg) else False
ret = found if self.filter_mode == "include" else not found
return ret, msg