Source code for tanium_kit.zipper

from __future__ import absolute_import, division, print_function, unicode_literals

import datetime
import logging
import os
import re
import zipfile

# Module logger, named after the last dotted component of the module name
# only (the package prefix is dropped by the split).
LOG = logging.getLogger(__name__.split(".")[-1])

# Regex patterns that mkzip() always adds to the caller-supplied skip list.
# Each entry is run through str.format() by mkzip(), so "{zip_file}" is filled
# in from the base name of the ZIP being written.
DEFAULT_SKIPS = [
    r"{zip_file}",
    r"\.DS_Store",
    r"\.com\.apple.*",
]

# Pre-bound str.format callables used to build the log messages below.
# ZIP_ADD(kind, source, archive_name)
ZIP_ADD = "Zipping {} '{}' as '{}'".format
# ZIP_SKIP(kind, path, pattern)
ZIP_SKIP = "Skipping {} '{}', matches pattern '{}'".format
# ZIP_MAKE(verb, zip_path, src_path)
ZIP_MAKE = "{} ZIP '{}' of source path '{}'".format
# ZIP_PATTERNS(comma_separated_patterns)
ZIP_PATTERNS = "Skip patterns: {}".format


def skip_check(p, t, skips):
    """Report whether path ``p`` matches any of the regex patterns in ``skips``.

    Args:
        p: path string to test.
        t: label describing what ``p`` is (e.g. "file" or "directory"); only
            used in the log message.
        skips: iterable of regex patterns, tried in order with a search
            anywhere in ``p`` (not anchored).

    Returns:
        True if a pattern matched (the first match is logged at INFO level),
        False otherwise.
    """
    # Lazily find the first pattern that hits; later patterns are not tried.
    matched = next(
        (pattern for pattern in skips if re.search(pattern, p)),
        None,
    )
    if matched is None:
        return False

    LOG.info(ZIP_SKIP(t, p, matched))
    return True


def zip_walk(zf, src_path, **kwargs):
    """Walk ``src_path`` and write every directory and file into ``zf``.

    Args:
        zf: open, writable ``zipfile.ZipFile`` that receives the entries.
        src_path: directory tree to walk with ``os.walk``.
        **kwargs:
            dest_path (str): directory inside the archive under which the
                contents of ``src_path`` are placed. When empty (default),
                each entry keeps its path relative to the parent directory
                of ``src_path``.
            skips (list): regex patterns; any directory or file whose source
                path matches one (see ``skip_check``) is not written.
            compress_type (int): compression constant handed to
                ``zf.write``; defaults to 0.

    Returns:
        The same ``zf`` object that was passed in.
    """
    dest_path = kwargs.get("dest_path", "")
    skips = kwargs.get("skips", [])
    compress_type = kwargs.get("compress_type", 0)

    # Every path yielded by os.walk() starts with src_path (and therefore with
    # its parent directory), so the leading part is removed by length.
    # str.replace() must not be used here: it removes *every* occurrence of
    # the prefix, which mangles paths such as "/foo/bar" (prefix "/") or
    # "a/b/data" (prefix "a").
    strip_prefix = src_path if dest_path else os.path.dirname(src_path)
    strip_len = len(strip_prefix)

    for dirname, _subdirs, files in os.walk(src_path):
        rel_dir = dirname[strip_len:].lstrip(os.path.sep)
        dest_dir = os.path.join(dest_path, rel_dir) if dest_path else rel_dir

        # A skipped directory is not pruned from the walk; its descendants
        # are still visited and checked against the patterns individually.
        if skip_check(dirname, "directory", skips):
            continue

        LOG.info(ZIP_ADD("directory", dirname, dest_dir))
        zf.write(filename=dirname, arcname=dest_dir, compress_type=compress_type)

        for filename in files:
            src_file = os.path.join(dirname, filename)
            dest_file = os.path.join(dest_dir, filename)

            if skip_check(src_file, "file", skips):
                continue

            LOG.debug(ZIP_ADD("file", src_file, dest_file))
            zf.write(filename=src_file, arcname=dest_file, compress_type=compress_type)
    return zf


def mkzip(zip_path, src_path, **kwargs):
    """Create a ZIP archive at ``zip_path`` from the tree under ``src_path``.

    Args:
        zip_path: path of the ZIP file to write; converted to an absolute
            path before use.
        src_path: directory to archive (passed to ``zip_walk``).
        **kwargs:
            dest_path (str): directory inside the archive to place the
                contents under; default "".
            skips (list): extra regex skip patterns, combined with
                ``DEFAULT_SKIPS``; default [].
            compress (bool): use ZIP_DEFLATED when the ``zlib`` module can be
                imported, otherwise ZIP_STORED; default True.
            remove_old (bool): log a warning when ``zip_path`` already
                exists; default True.

    Returns:
        dict with the keys "zip_path", "zip_file", "src_path" and
        "dest_path".
    """
    dest_path = kwargs.get("dest_path", "")
    # Accept None or a tuple as well as a list.
    skips = list(kwargs.get("skips") or [])
    compress = kwargs.get("compress", True)
    remove_old = kwargs.get("remove_old", True)

    zip_path = os.path.abspath(zip_path)

    compress_type = zipfile.ZIP_STORED
    if compress:
        try:
            import zlib  # noqa
            compress_type = zipfile.ZIP_DEFLATED
        except ImportError:
            # zlib is unavailable in this interpreter: store uncompressed.
            compress_type = zipfile.ZIP_STORED

    ret = {
        "zip_path": zip_path,
        "zip_file": os.path.basename(zip_path),
        "src_path": src_path,
        "dest_path": dest_path,
    }

    # The DEFAULT_SKIPS templates are regexes, so the values substituted into
    # them must be escaped. Otherwise a name such as "my(1).zip" produces a
    # pattern that does not match the file itself (or fails to compile).
    escaped = {key: re.escape("{}".format(value)) for key, value in ret.items()}

    all_skips = skips + [x.format(**escaped) for x in DEFAULT_SKIPS]
    all_skips = [x for x in all_skips if x]
    skip_text = ", ".join(["'{}'".format(x) for x in all_skips])

    if remove_old and os.path.isfile(zip_path):
        # NOTE: nothing is deleted here. Opening the archive with mode "w"
        # below truncates an existing file whether or not remove_old is set.
        m = "REMOVING OLD ZIP '{}'"
        m = m.format(zip_path)
        LOG.warning(m)

    LOG.info(ZIP_MAKE("Creating", zip_path, src_path))
    LOG.info(ZIP_PATTERNS(skip_text))

    with zipfile.ZipFile(zip_path, "w") as zf:
        zip_walk(
            zf=zf,
            src_path=src_path,
            dest_path=dest_path,
            skips=all_skips,
            compress_type=compress_type,
        )

    LOG.info(ZIP_MAKE("Created", zip_path, src_path))
    return ret


def zip_info(zip_path, details=False):
    """Log the entries of the ZIP archive at ``zip_path`` at INFO level.

    Args:
        zip_path: path of the ZIP file to open for reading.
        details: when True, also log each entry's comment, modification
            time, creating system, ZIP version and compressed/uncompressed
            sizes; when False (default) only the archived filename is logged.

    Returns:
        None.
    """
    # The context manager closes the archive even if logging or iteration
    # raises, which the previous explicit close() call did not guarantee.
    with zipfile.ZipFile(zip_path) as zf:
        for info in zf.infolist():
            LOG.info('Archived filename: {}'.format(info.filename))
            if not details:
                continue
            LOG.info('\tComment: {}'.format(info.comment))
            LOG.info('\tModified: {}'.format(datetime.datetime(*info.date_time)))
            LOG.info('\tSystem: {} (0 = Windows, 3 = Unix)'.format(info.create_system))
            LOG.info('\tZIP version: {}'.format(info.create_version))
            LOG.info('\tCompressed: {} bytes'.format(info.compress_size))
            LOG.info('\tUncompressed: {} bytes'.format(info.file_size))