Source code for tanium_kit.xml_cleaner

"""A regex based XML cleaner that will replace unsupported characters."""
from __future__ import absolute_import, division, print_function, unicode_literals

import logging
import re
import sys

# Interpreter version flags; exactly one of these is true on a supported Python.
_VERSION = sys.version_info
IS_PY2, IS_PY3 = (_VERSION[0] == 2), (_VERSION[0] == 3)

# Type aliases that let the rest of the module ignore the Python 2/3 split:
#   string_types  - tuple of types accepted as "a string" by isinstance()
#   integer_types - tuple of integer types
#   text_type     - the unicode text type
#   binary_type   - the raw bytes type
#   unichr        - callable turning a code point into a one-character text string
if IS_PY3:
    text_type = str
    binary_type = bytes
    string_types = (str,)
    integer_types = (int,)
    unichr = chr
elif IS_PY2:
    text_type = unicode  # noqa
    binary_type = str
    string_types = (basestring,)  # noqa
    integer_types = (int, long)  # noqa
    unichr = unichr  # noqa

# Logger named after the last dotted component of this module's name.
LOG = logging.getLogger(__name__.rpartition(".")[2])

    [0x0009],  # TAB
    [0x000A],  # LINEFEED
    [0x000D],  # CARRIAGE RETURN
    [0x0020, 0xD7FF],  # VALID CHARACTER RANGE 1
    [0xE000, 0xFFFD],  # VALID CHARACTER RANGE 2
"""Valid Unicode characters for XML documents:
    (any Unicode character, excluding the surrogate blocks, FFFE, and FFFF)


    [0x007F, 0x0084],  # one C0 control character and all but one C1 control
    [0x0086, 0x009F],  # one C0 control character and all but one C1 control
    [0xFDD0, 0xFDEF],  # control characters/permanently assigned to non-characters
"""Restricted/discouraged Unicode characters for XML documents:


# If this python build supports unicode ranges above 10000, add to the valid range
if sys.maxunicode > 0x10000:
    XML_1_0_VALID_HEX.append((0x10000, min(sys.maxunicode, 0x10FFFF)))

# Add control characters and non-characters to the restricted range if this python
# build supports the applicable range
for plane in range(1, 17):
    # Highest code point of this plane: 0x1FFFF, 0x2FFFF, ... 0x10FFFF.
    plane_last = (plane << 16) | 0xFFFF
    if sys.maxunicode < plane_last:
        # This build cannot represent the plane, so unichr() would fail on it.
        continue
    # The last two code points of the plane (xFFFE and xFFFF).
    restrict_range = [plane_last - 1, plane_last]
    XML_1_0_RESTRICT_HEX.append(restrict_range)

XML_1_0_VALID_UNI_LIST = ["-".join([unichr(y) for y in x]) for x in XML_1_0_VALID_HEX]
"""A list of valid unicode characters"""

"""The text string containing valid unicode characters"""

"""The raw regex string to use when replacing invalid characters"""

"""The regex object to use when replacing invalid characters"""

    "-".join([unichr(y) for y in x]) for x in XML_1_0_RESTRICT_HEX
"""A list of restricted unicode characters"""

"""The text string containing restricted unicode characters"""

"""The raw regex string to use when replacing restricted characters"""

"""The regex object to use when replacing restricted characters"""

"""The default character to use when replacing characters"""

"""The default encoding to use if none supplied"""

    "\u2018": "'",
    "\u2019": "'",
    "\u201c": '"',
    "\u201d": '"',
"""Mapping of smart quotes to replace with their ascii counterpart."""

def bytes_to_str(text, **kwargs):
    """Decode a bytes object into text; return any other value unchanged.

    Args:
        text: Value to convert. Only instances of ``binary_type`` are decoded.
        **kwargs: Optional settings.
            encoding: Codec used for decoding. Defaults to ``DEFAULT_ENCODING``.
            encoding_errors: Codec error handler. Defaults to ``"replace"``.

    Returns:
        The decoded text when ``text`` was a bytes object, otherwise ``text``
        exactly as it was passed in.
    """
    encoding = kwargs.get("encoding", DEFAULT_ENCODING)
    errors = kwargs.get("encoding_errors", "replace")
    ok = "Decoded from bytes using '{}' errors '{}'{}{}"

    if isinstance(text, binary_type):
        # Capture type and length on both sides of the conversion for the debug log.
        pre = " (pre type '{}' len '{}')".format(type(text).__name__, len(text))
        text = text.decode(encoding, errors)
        post = " (post type '{}' len '{}')".format(type(text).__name__, len(text))
        LOG.debug(ok.format(encoding, errors, pre, post))
    return text
def str_to_bytes(text, **kwargs):
    """Encode a text string into bytes; return any other value unchanged.

    Args:
        text: Value to convert. Only instances of ``text_type`` are encoded.
        **kwargs: Optional settings.
            encoding: Codec used for encoding. Defaults to ``DEFAULT_ENCODING``.
            encoding_errors: Codec error handler. Defaults to
                ``"xmlcharrefreplace"``.

    Returns:
        The encoded bytes when ``text`` was a text string, otherwise ``text``
        exactly as it was passed in.
    """
    encoding = kwargs.get("encoding", DEFAULT_ENCODING)
    errors = kwargs.get("encoding_errors", "xmlcharrefreplace")
    ok = "Encoded to bytes using '{}' errors '{}'{}{}"

    # Check text_type rather than string_types: on Python 2 string_types also
    # matches byte strings, and calling .encode() on those triggers an implicit
    # ASCII decode that raises UnicodeDecodeError for any non-ASCII byte.
    if isinstance(text, text_type):
        pre = " (pre type '{}' len '{}')".format(type(text).__name__, len(text))
        text = text.encode(encoding, errors)
        post = " (post type '{}' len '{}')".format(type(text).__name__, len(text))
        LOG.debug(ok.format(encoding, errors, pre, post))
    return text
def clean_chars(text, name, regex, **kwargs):
    """Replace every match of ``regex`` in ``text`` with a replacement character.

    Args:
        text: Text string to clean.
        name: Name of the keyword argument that switches this pass on or off
            (for example ``"clean_invalid"``). The pass runs unless that
            keyword is supplied with a falsy value.
        regex: Compiled regular expression matching the characters to replace.
        **kwargs: Optional settings.
            <name>: Enable flag for this pass. Defaults to True.
            clean_char: Replacement for each match. Defaults to
                ``DEFAULT_REPLACEMENT``.

    Returns:
        ``text`` with every match replaced, or ``text`` unchanged when the pass
        is disabled or nothing matched.
    """
    case = kwargs.get(name, True)
    if not case:
        # Pass disabled by the caller; nothing to look up or log.
        return text

    rc = kwargs.get("clean_char", DEFAULT_REPLACEMENT)
    ok = "[{} = {}] regex: {!r}, replace character {!r}, found {} characters: {!r}{}{}"

    # findall is kept separate from sub so the matched characters can be logged.
    matches = regex.findall(text)
    pre = " (pre type '{}' len '{}')".format(type(text).__name__, len(text))
    if matches:
        text = regex.sub(rc, text)
    post = " (post type '{}' len '{}')".format(type(text).__name__, len(text))
    LOG.debug(ok.format(name, case, regex.pattern, rc, len(matches), matches, pre, post))
    return text
def decode_recode(text, **kwargs):
    """Round-trip ``text`` through bytes and back to text.

    The round trip runs when at least one of the ``clean_invalid`` or
    ``clean_restricted`` keyword arguments is truthy (both default to True).
    ``text`` is encoded with ``str_to_bytes`` and decoded again with
    ``bytes_to_str``, so whatever each codec error handler does to
    unencodable or undecodable data is applied here.

    Args:
        text: Text string to process.
        **kwargs: Passed through to ``str_to_bytes`` and ``bytes_to_str``
            (``encoding``, ``encoding_errors``), plus the two switches above.

    Returns:
        The re-decoded text, or ``text`` unchanged when both switches are off.
    """
    keys = ["clean_invalid", "clean_restricted"]
    if not any(kwargs.get(k, True) for k in keys):
        return text

    # convert our str object to a bytes object (xmlcharrefreplace by default)
    text = str_to_bytes(text, **kwargs)
    # convert our bytes object back to a str object
    text = bytes_to_str(text, **kwargs)
    return text
def clean_quotes(text, name, **kwargs):
    """Replace each key of ``QUOTES_MAP`` found in ``text`` with its mapped value.

    Args:
        text: Text string to clean.
        name: Name of the keyword argument that switches this pass on or off
            (for example ``"clean_quotes"``).
        **kwargs: Optional settings.
            <name>: Enable flag for this pass. Defaults to True.

    Returns:
        ``text`` with the replacements applied, or ``text`` unchanged when the
        pass is disabled.
    """
    if not kwargs.get(name, True):
        return text

    for smart, plain in QUOTES_MAP.items():
        text = text.replace(smart, plain)
    return text
def clean(text, **kwargs):
    """Run the full cleaning pipeline over ``text`` and return the result.

    Steps, in order:
        1. ``bytes_to_str``: decode bytes input into text.
        2. ``decode_recode``: round-trip the text through bytes.
        3. ``clean_chars`` with ``INVALID_UNICODE_RE`` (switch: ``clean_invalid``).
        4. ``clean_chars`` with ``RESTRICT_UNICODE_RE`` (switch: ``clean_restricted``).
        5. ``clean_quotes`` (switch: ``clean_quotes``).

    Args:
        text: Bytes or text to clean.
        **kwargs: Passed unchanged to every step. Keys read by the steps are
            ``encoding``, ``encoding_errors``, ``clean_invalid``,
            ``clean_restricted``, ``clean_quotes`` and ``clean_char``. Note that
            ``encoding_errors`` is shared by the encode and decode helpers.

    Returns:
        The cleaned text.
    """
    # ensure that any bytes object is a str object
    text = bytes_to_str(text, **kwargs)
    # encode text from str to bytes, and decode back to str again
    text = decode_recode(text, **kwargs)
    # replace invalid characters using the INVALID XML RE
    text = clean_chars(text, "clean_invalid", INVALID_UNICODE_RE, **kwargs)
    # replace restricted characters using the RESTRICTED XML RE
    text = clean_chars(text, "clean_restricted", RESTRICT_UNICODE_RE, **kwargs)
    # replace smart quotes with normal quotes
    text = clean_quotes(text, "clean_quotes", **kwargs)
    return text
def get_etree():
    """Return the first available ElementTree-compatible XML module.

    Candidates are tried in this order: ``xml.etree.cElementTree``,
    ``lxml.etree``, ``xml.etree.ElementTree``. A candidate that is already in
    ``sys.modules`` is returned without logging. Otherwise an import is
    attempted; a failed import is logged as a warning and the next candidate
    is tried.

    Returns:
        The imported module object.

    Raises:
        ImportError: If none of the candidates can be imported. ``ImportError``
            is an ``Exception`` subclass, so callers catching ``Exception``
            keep working.
    """
    # Function-scope import so the module's top-level import block is untouched.
    import importlib

    engines = ["xml.etree.cElementTree", "lxml.etree", "xml.etree.ElementTree"]

    for engine in engines:
        if engine in sys.modules:
            return sys.modules[engine]

        try:
            etree = importlib.import_module(engine)
        except ImportError as e:
            m = "XML engine {} failed to import: {}"
            LOG.warning(m.format(engine, e))
        else:
            m = "Using {} for XML engine"
            LOG.debug(m.format(engine))
            return etree

    err = "Failed to import any XML Engine!"
    raise ImportError(err)
def add_console_log():
    """Attach a DEBUG-level stdout handler to ``LOG`` if none is configured.

    Nothing is added when either ``LOG`` or its direct parent logger already
    has at least one handler, so repeated calls do not stack handlers.
    """
    handlers = [LOG.handlers, LOG.parent.handlers]
    if any(handlers):
        return

    hf = logging.Formatter("[%(name)-12s] [%(funcName)-15s] %(levelname)-8s %(message)s")
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(hf)
    LOG.addHandler(ch)
# Self-test: clean a few known-bad XML payloads, parse each result with the
# available XML engine, and print every stage of the round trip.
if __name__ == "__main__":
    LOG.setLevel(logging.DEBUG)
    add_console_log()
    etree = get_etree()

    # Sample payloads, keyed by a description of what each one exercises.
    TESTS = {
        "XML as bytes with invalid and restricted characters": b"<root><v>TEST.\xc3\xa6\xc2\xad\xc2\xb6\x17\x04\xc3\xa8\xc2\x80\xc2\x80 \x04 \x01h\xc3\xa7\xc2\x8d\xc2\x95\xc3\xa5\xc2\x81\xc2\xa5\xc3\xa7\xc2\x89\xc2\xa1\xc3\xa6\xc2\xa5\xc2\xb4\xc3\xa6\xc2\xb1\xc2\xa1\xc3\xa6\xc2\xb9\xc2\x85\xc3\xa7\xc2\x89\xc2\xa3\xc3\xa7\xc2\x81\xc2\xb9\xc3\xa6\xc2\xa5\xc2\xb4\xc3\xa6\xc2\xb9\xc2\xaf\xc3\xa6\xc2\x95\xc2\x8b</v></root>\r\n",
        "XML as bytes with special double quotes": b"<root>\r\n <v>Comunica\xc3\xa7\xc3\xa3o com PABX</v>\r\n <v>\xc3\x90\xc3\x85\xc2\xb3\xc3\x87\xc3\x8d\xc2\xa8\xc3\x97\xc3\x80\xc3\x83\xc3\xa6\xc2\xb0\xc2\xb2\xc3\x88\xc2\xab\xc3\x8c\xc3\x97\xc2\xbc\xc3\xbe V2.7.02</v>\r\n <v>\xe2\x80\x9cC:\\WINDOWS\\system32\\ctfmon.exe\xe2\x80\x9d</v>\r\n</root>\r\n",
        "XML as bytes with surrogates": b"<root>\r\n <v>\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\x87\xc2\xb1\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\x83\xc2\xbb\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xad\xc2\xb9\xc2\xa4\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd__\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\x87\xc2\xb1\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\x8f\xc2\xb5\xc3\x8d\xc2\xb3\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\x9e\xc2\xb9\xc3\xaf\xc2\xbf\xc2\xbd\xc3\x8b\xc2\xbe</v>\r\n</root>\r\n",
    }

    x = []
    for test_name, test_data in TESTS.items():
        s = ""
        s += "**********************************\n"
        s += " * TEST NAME: {}\n".format(test_name)
        print(s)

        cleaned = clean(test_data)

        # On Python 2 the cleaned text is encoded to UTF-8 bytes before parsing;
        # on Python 3 it is handed to the parser as text.
        if IS_PY2:
            etree_obj = etree.fromstring(cleaned.encode("utf-8"))
        else:
            etree_obj = etree.fromstring(cleaned)

        etree_bytes = etree.tostring(etree_obj, "utf-8")
        etree_txt = bytes_to_str(etree_bytes)

        s += (
            " * Dirty string type {!r} ==> {!r}\n"
            " * Cleaned string type {!r} ==> {!r}\n"
            " * Etree object type {!r} ==> {!r}\n"
            " * Etree tostring bytes type {!r} ==> {!r}\n"
            " * Etree tostring str type {!r} ==> {!r}\n"
            "**********************************\n"
        ).format(
            type(test_data).__name__,
            test_data,
            type(cleaned).__name__,
            cleaned,
            type(etree_obj),
            etree_obj,
            type(etree_bytes).__name__,
            etree_bytes,
            type(etree_txt).__name__,
            etree_txt,
        )
        print(s)
        x.append(s)

    # Print all reports again together, after the debug log output above.
    for s in x:
        print(s)