# Source code for tanium_kit.xml_cleaner

"""A regex based XML cleaner that will replace unsupported characters."""
from __future__ import absolute_import, division, print_function, unicode_literals

import logging
import re
import sys

# The interpreter's major version decides which compatibility aliases apply.
_VERSION = sys.version_info
IS_PY2, IS_PY3 = (_VERSION[0] == 2), (_VERSION[0] == 3)

# Aliases so the rest of the module can name text/bytes/integer types and the
# "code point -> character" builtin identically on Python 2 and Python 3.
if IS_PY3:
    text_type = str
    binary_type = bytes
    string_types = (str,)
    integer_types = (int,)
    unichr = chr
elif IS_PY2:
    text_type = unicode  # noqa
    binary_type = str
    string_types = (basestring,)  # noqa
    integer_types = (int, long)  # noqa
    unichr = unichr  # noqa

# Logger named after the last component of this module's dotted path.
LOG = logging.getLogger(__name__.rpartition(".")[2])

# Each entry is either a single code point or an inclusive [first, last] pair.
XML_1_0_VALID_HEX = [
    [0x09],  # TAB
    [0x0A],  # LINEFEED
    [0x0D],  # CARRIAGE RETURN
    [0x20, 0xD7FF],  # VALID CHARACTER RANGE 1
    [0xE000, 0xFFFD],  # VALID CHARACTER RANGE 2
]
"""Valid Unicode characters for XML documents:
    (any Unicode character, excluding the surrogate blocks, FFFE, and FFFF)
    #x9,
    #xA,
    #xD,
    [#x20-#xD7FF],
    [#xE000-#xFFFD],
    [#x10000-#x10FFFF]

Source: http://www.w3.org/TR/REC-xml/#NT-Char
"""

# Inclusive [first, last] pairs; U+0085 is deliberately left out of the ranges.
XML_1_0_RESTRICT_HEX = [
    [0x7F, 0x84],  # DEL plus the C1 controls below U+0085
    [0x86, 0x9F],  # the C1 controls above U+0085
    [0xFDD0, 0xFDEF],  # control characters/permanently assigned to non-characters
]
"""Restricted/discouraged Unicode characters for XML documents:
    [#x7F-#x84],
    [#x86-#x9F],
    [#xFDD0-#xFDEF],
    [#x1FFFE-#x1FFFF],
    [#x2FFFE-#x2FFFF],
    [#x3FFFE-#x3FFFF],
    [#x4FFFE-#x4FFFF],
    [#x5FFFE-#x5FFFF],
    [#x6FFFE-#x6FFFF],
    [#x7FFFE-#x7FFFF],
    [#x8FFFE-#x8FFFF],
    [#x9FFFE-#x9FFFF],
    [#xAFFFE-#xAFFFF],
    [#xBFFFE-#xBFFFF],
    [#xCFFFE-#xCFFFF],
    [#xDFFFE-#xDFFFF],
    [#xEFFFE-#xEFFFF],
    [#xFFFFE-#xFFFFF],
    [#x10FFFE-#x10FFFF]

Source: http://www.w3.org/TR/REC-xml/#NT-Char
"""

# Code points from U+10000 upward are only representable on builds whose
# sys.maxunicode reaches past the Basic Multilingual Plane; add them if so.
if 0x10000 < sys.maxunicode:
    XML_1_0_VALID_HEX.append((0x10000, min(sys.maxunicode, 0x10FFFF)))

# The last two code points of planes 1 through 16 (U+nFFFE / U+nFFFF) are
# restricted as well; register each pair this build is able to represent.
for plane in range(1, 17):
    restrict_range = [(plane << 16) | 0xFFFE, (plane << 16) | 0xFFFF]
    if sys.maxunicode >= restrict_range[1]:
        XML_1_0_RESTRICT_HEX.append(restrict_range)

# Every table entry becomes either a lone character or a "first-last" span,
# i.e. exactly the syntax accepted inside a regex character class.
XML_1_0_VALID_UNI_LIST = [
    "-".join(map(unichr, bounds)) for bounds in XML_1_0_VALID_HEX
]
"""A list of valid unicode characters"""

XML_1_0_VALID_UNI_TEXT = "".join(XML_1_0_VALID_UNI_LIST)
"""The text string containing valid unicode characters"""

# Negated class: matches any single character that is NOT valid.
INVALID_UNICODE_RAW_RE = r"[^{}]".format(XML_1_0_VALID_UNI_TEXT)
"""The raw regex string to use when replacing invalid characters"""

INVALID_UNICODE_RE = re.compile(INVALID_UNICODE_RAW_RE, flags=re.UNICODE)
"""The regex object to use when replacing invalid characters"""

XML_1_0_RESTRICT_UNI_LIST = [
    "-".join(map(unichr, bounds)) for bounds in XML_1_0_RESTRICT_HEX
]
"""A list of restricted unicode characters"""

XML_1_0_RESTRICT_UNI_TEXT = "".join(XML_1_0_RESTRICT_UNI_LIST)
"""The text string containing restricted unicode characters"""

# Positive class: matches any single character that IS restricted.
RESTRICT_UNICODE_RAW_RE = r"[{}]".format(XML_1_0_RESTRICT_UNI_TEXT)
"""The raw regex string to use when replacing restricted characters"""

RESTRICT_UNICODE_RE = re.compile(RESTRICT_UNICODE_RAW_RE, flags=re.UNICODE)
"""The regex object to use when replacing restricted characters"""

# U+FFFD is the Unicode REPLACEMENT CHARACTER.
DEFAULT_REPLACEMENT = "\uFFFD"
"""The default character to use when replacing characters"""

DEFAULT_ENCODING = "utf-8"
"""The default encoding to use if none supplied"""

QUOTES_MAP = dict(
    [
        ("\u2018", "'"),  # left single quotation mark
        ("\u2019", "'"),  # right single quotation mark
        ("\u201c", '"'),  # left double quotation mark
        ("\u201d", '"'),  # right double quotation mark
    ]
)
"""Mapping of smart quotes to replace with their ascii counterpart."""


def bytes_to_str(text, **kwargs):
    """Decode ``text`` into a text string when it is a bytes object.

    Args:
        text: Value to decode. Anything that is not an instance of
            ``binary_type`` is returned untouched.
        **kwargs:
            encoding: Codec name handed to ``decode``. Defaults to
                ``DEFAULT_ENCODING``.
            encoding_errors: Error handler handed to ``decode``. Defaults to
                ``"replace"``.

    Returns:
        The decoded text, or ``text`` itself when it was not bytes.
    """
    if not isinstance(text, binary_type):
        return text

    encoding = kwargs.get("encoding", DEFAULT_ENCODING)
    errors = kwargs.get("encoding_errors", "replace")

    # Type and length are captured on both sides of the conversion so the
    # debug record shows what the decode actually changed.
    pre = " (pre type '{}' len '{}')".format(type(text).__name__, len(text))
    text = text.decode(encoding, errors)
    post = " (post type '{}' len '{}')".format(type(text).__name__, len(text))

    ok = "Decoded from bytes using '{}' errors '{}'{}{}"
    LOG.debug(ok.format(encoding, errors, pre, post))
    return text
def str_to_bytes(text, **kwargs):
    """Encode ``text`` into a bytes object when it is a text string.

    Args:
        text: Value to encode. Anything that is not an instance of
            ``text_type`` is returned untouched.
        **kwargs:
            encoding: Codec name handed to ``encode``. Defaults to
                ``DEFAULT_ENCODING``.
            encoding_errors: Error handler handed to ``encode``. Defaults to
                ``"xmlcharrefreplace"``.

    Returns:
        The encoded bytes, or ``text`` itself when it was not a text string.
    """
    # Check text_type rather than string_types: on Python 2 string_types also
    # matches byte strings, and calling .encode() on those triggers an implicit
    # ASCII decode that raises for any non-ASCII byte. On Python 3 both names
    # are ``str``, so nothing changes there.
    if not isinstance(text, text_type):
        return text

    encoding = kwargs.get("encoding", DEFAULT_ENCODING)
    errors = kwargs.get("encoding_errors", "xmlcharrefreplace")

    pre = " (pre type '{}' len '{}')".format(type(text).__name__, len(text))
    text = text.encode(encoding, errors)
    post = " (post type '{}' len '{}')".format(type(text).__name__, len(text))

    ok = "Encoded to bytes using '{}' errors '{}'{}{}"
    LOG.debug(ok.format(encoding, errors, pre, post))
    return text
def clean_chars(text, name, regex, **kwargs):
    """Replace every character of ``text`` matched by ``regex``.

    Args:
        text: Text string to clean.
        name: Name of the keyword argument that switches this step on or off
            (for example ``"clean_invalid"``); also used to label the debug
            record.
        regex: Compiled pattern whose matches are replaced.
        **kwargs:
            <name>: Truthy (the default) to perform the replacement, falsy to
                return ``text`` unchanged.
            clean_char: Replacement handed to ``regex.sub``. Defaults to
                ``DEFAULT_REPLACEMENT``.

    Returns:
        ``text`` with every match replaced, or ``text`` unchanged when the
        step is disabled or nothing matched.
    """
    case = kwargs.get(name, True)
    if not case:
        return text

    rc = kwargs.get("clean_char", DEFAULT_REPLACEMENT)

    # The matches are collected up front because the debug record reports
    # both how many characters were found and which ones they were.
    matches = regex.findall(text)
    pre = " (pre type '{}' len '{}')".format(type(text).__name__, len(text))
    if matches:
        text = regex.sub(rc, text)
    post = " (post type '{}' len '{}')".format(type(text).__name__, len(text))

    ok = "[{} = {}] regex: {!r}, replace character {!r}, found {} characters: {!r}{}{}"
    LOG.debug(ok.format(name, case, regex.pattern, rc, len(matches), matches, pre, post))
    return text
def decode_recode(text, **kwargs):
    """Round-trip ``text`` through bytes when any character cleaning is enabled.

    The text is encoded with ``str_to_bytes`` (whose default error handler is
    ``"xmlcharrefreplace"``) and then decoded again with ``bytes_to_str``.

    Args:
        text: Text string to round-trip.
        **kwargs:
            clean_invalid: Defaults to True.
            clean_restricted: Defaults to True. The round trip is skipped only
                when both flags are falsy.
            Remaining keyword arguments are forwarded unchanged to
            ``str_to_bytes`` and ``bytes_to_str``.

    Returns:
        The round-tripped text, or ``text`` unchanged when both flags are off.
    """
    flags = ("clean_invalid", "clean_restricted")
    if not any(kwargs.get(flag, True) for flag in flags):
        return text

    # convert our str object to a bytes object with xmlcharrefreplace
    text = str_to_bytes(text, **kwargs)
    # convert our bytes object back to a str object
    text = bytes_to_str(text, **kwargs)
    return text
def clean_quotes(text, name, **kwargs):
    """Replace the smart quotes listed in ``QUOTES_MAP`` with ASCII quotes.

    Args:
        text: Text string to clean.
        name: Name of the keyword argument that switches this step on or off
            (for example ``"clean_quotes"``).
        **kwargs:
            <name>: Truthy (the default) to perform the replacement, falsy to
                return ``text`` unchanged.

    Returns:
        ``text`` with every key of ``QUOTES_MAP`` replaced by its mapped
        value, or ``text`` unchanged when the step is disabled.
    """
    if not kwargs.get(name, True):
        return text

    for smart_quote, ascii_quote in QUOTES_MAP.items():
        text = text.replace(smart_quote, ascii_quote)
    return text
def clean(text, **kwargs):
    """Run the full cleaning pipeline over ``text``.

    Steps, in order: decode bytes to text, round-trip the text through bytes,
    replace characters matched by ``INVALID_UNICODE_RE``, replace characters
    matched by ``RESTRICT_UNICODE_RE``, and finally replace smart quotes.

    Args:
        text: Bytes or text string to clean.
        **kwargs: Forwarded unchanged to every step. Keys read by the steps:
            encoding: Codec for the decode/encode steps.
            encoding_errors: Error handler; the same key is read by both the
                decode and the encode step.
            clean_invalid: Replace invalid characters (default True).
            clean_restricted: Replace restricted characters (default True).
            clean_char: Replacement used for both character classes.
            clean_quotes: Replace smart quotes (default True).

    Returns:
        The cleaned text string.
    """
    # ensure that any bytes object is a str object
    text = bytes_to_str(text, **kwargs)
    # encode text from str to bytes, and decode it back to str again
    text = decode_recode(text, **kwargs)
    # replace invalid characters using the INVALID XML RE
    text = clean_chars(text, "clean_invalid", INVALID_UNICODE_RE, **kwargs)
    # replace restricted characters using the RESTRICTED XML RE
    text = clean_chars(text, "clean_restricted", RESTRICT_UNICODE_RE, **kwargs)
    # replace smart quotes with normal quotes
    text = clean_quotes(text, "clean_quotes", **kwargs)
    return text
def get_etree():
    """Return the first usable ElementTree-compatible XML module.

    Candidates are tried in this order: ``xml.etree.cElementTree``,
    ``lxml.etree``, ``xml.etree.ElementTree``. A candidate that is already
    present in ``sys.modules`` is returned directly; otherwise an import is
    attempted, and a failed import is logged as a warning before moving on
    to the next candidate.

    Returns:
        The module object of the selected XML engine.

    Raises:
        ImportError: If none of the candidates could be imported.
    """
    # Imported locally so this function does not rely on the module header.
    import importlib

    engines = ["xml.etree.cElementTree", "lxml.etree", "xml.etree.ElementTree"]
    for engine in engines:
        # An engine some other module already loaded is reused as-is.
        if engine in sys.modules:
            return sys.modules[engine]
        try:
            etree = importlib.import_module(engine)
        except ImportError as e:
            m = "XML engine {} failed to import: {}"
            LOG.warning(m.format(engine, e))
        else:
            m = "Using {} for XML engine"
            LOG.debug(m.format(engine))
            return etree

    # ImportError is a subclass of Exception, so callers that catch the
    # previously raised bare Exception keep working.
    err = "Failed to import any XML Engine!"
    raise ImportError(err)
def add_console_log():
    """Attach a DEBUG-level stdout handler to ``LOG`` if none is configured.

    Nothing is added when ``LOG`` or its direct parent logger already has at
    least one handler.
    """
    if LOG.handlers or LOG.parent.handlers:
        return

    hf = logging.Formatter("[%(name)-12s] [%(funcName)-15s] %(levelname)-8s %(message)s")
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(hf)
    LOG.addHandler(ch)
if __name__ == "__main__":
    # Demo / smoke test: clean a few dirty XML payloads, parse the result with
    # whichever XML engine is available, and print every intermediate form.
    LOG.setLevel(logging.DEBUG)
    add_console_log()

    etree = get_etree()

    TESTS = {
        "XML as bytes with invalid and restricted characters": b"<root><v>TEST.\xc3\xa6\xc2\xad\xc2\xb6\x17\x04\xc3\xa8\xc2\x80\xc2\x80 \x04 \x01h\xc3\xa7\xc2\x8d\xc2\x95\xc3\xa5\xc2\x81\xc2\xa5\xc3\xa7\xc2\x89\xc2\xa1\xc3\xa6\xc2\xa5\xc2\xb4\xc3\xa6\xc2\xb1\xc2\xa1\xc3\xa6\xc2\xb9\xc2\x85\xc3\xa7\xc2\x89\xc2\xa3\xc3\xa7\xc2\x81\xc2\xb9\xc3\xa6\xc2\xa5\xc2\xb4\xc3\xa6\xc2\xb9\xc2\xaf\xc3\xa6\xc2\x95\xc2\x8b</v></root>\r\n",  # noqa: E501
        "XML as bytes with special double quotes": b"<root>\r\n <v>Comunica\xc3\xa7\xc3\xa3o com PABX</v>\r\n <v>\xc3\x90\xc3\x85\xc2\xb3\xc3\x87\xc3\x8d\xc2\xa8\xc3\x97\xc3\x80\xc3\x83\xc3\xa6\xc2\xb0\xc2\xb2\xc3\x88\xc2\xab\xc3\x8c\xc3\x97\xc2\xbc\xc3\xbe V2.7.02</v>\r\n <v>\xe2\x80\x9cC:\\WINDOWS\\system32\\ctfmon.exe\xe2\x80\x9d</v>\r\n</root>\r\n",  # noqa: E501
        "XML as bytes with surrogates": b"<root>\r\n <v>\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\x87\xc2\xb1\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\x83\xc2\xbb\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xad\xc2\xb9\xc2\xa4\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd__\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\x87\xc2\xb1\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\x8f\xc2\xb5\xc3\x8d\xc2\xb3\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\xaf\xc2\xbf\xc2\xbd\xc3\x9e\xc2\xb9\xc3\xaf\xc2\xbf\xc2\xbd\xc3\x8b\xc2\xbe</v>\r\n</root>\r\n",  # noqa: E501
    }

    x = []
    for test_name, test_data in TESTS.items():
        s = ""
        s += "**********************************\n"
        s += " * TEST NAME: {}\n".format(test_name)
        # Header goes out first so the debug records emitted by clean() can be
        # attributed to this test case.
        print(s)

        cleaned = clean(test_data)

        # Python 2 parses from encoded bytes; Python 3 parses the text as-is.
        if IS_PY2:
            etree_obj = etree.fromstring(cleaned.encode("utf-8"))
        else:
            etree_obj = etree.fromstring(cleaned)

        etree_bytes = etree.tostring(etree_obj, "utf-8")
        etree_txt = bytes_to_str(etree_bytes)

        s += (
            " * Dirty string type {!r} ==> {!r}\n"
            " * Cleaned string type {!r} ==> {!r}\n"
            " * Etree object type {!r} ==> {!r}\n"
            " * Etree tostring bytes type {!r} ==> {!r}\n"
            " * Etree tostring str type {!r} ==> {!r}\n"
            "**********************************\n"
        ).format(
            type(test_data).__name__,
            test_data,
            type(cleaned).__name__,
            cleaned,
            type(etree_obj),
            etree_obj,
            type(etree_bytes).__name__,
            etree_bytes,
            type(etree_txt).__name__,
            etree_txt,
        )
        print(s)
        x.append(s)

    # Repeat every report at the end, clear of the interleaved debug logging.
    for s in x:
        print(s)