Source code for test_pytan_valid_server_tests

#!/usr/bin/env python -ttB
"""
This contains valid functional tests for pytan.

These functional tests require a connection to a Tanium server in order to run.
The connection info is pulled from the SERVER_INFO dictionary in test/API_INFO.py.

These tests all use :mod:`ddt`, a package that provides for data driven tests via JSON files.
"""
from __future__ import print_function

import sys

# disable python from creating .pyc files everywhere
sys.dont_write_bytecode = True

import os
import glob
import unittest
import json  # noqa
import csv
import StringIO
import tempfile

# Locate this script and the repository root (its parent directory) so that the
# imports which follow this block can be resolved without installing anything.
my_file = os.path.abspath(sys.argv[0])
my_dir = os.path.dirname(my_file)
root_dir = os.path.abspath(os.path.join(my_dir, os.pardir))
lib_dir = os.path.join(root_dir, 'lib')
path_adds = [my_dir, lib_dir]

# Prepend each directory to sys.path once; a plain loop is used because this is
# done purely for its side effect.
for aa in path_adds:
    if aa not in sys.path:
        sys.path.insert(0, aa)

import pytan
import taniumpy
import ddt

# get our server connection info
from API_INFO import SERVER_INFO

# where the output files from the tests will be stored
TEST_OUT = os.path.join(tempfile.gettempdir(), 'TEST_OUT')


def chew_csv(c):
    """Parse the CSV text `c` and return its rows.

    Parameters
    ----------
    c : str
        CSV formatted text.

    Returns
    -------
    list
        One list of column strings per CSV row, in the order they appear in `c`.
    """
    csv_handle = StringIO.StringIO(c)
    return list(csv.reader(csv_handle))
def spew(m, l=3):  # noqa: E741 -- `l` is part of the call signature
    """Print `m` to stdout if the configured test level is high enough.

    Parameters
    ----------
    m : object
        Message to print.
    l : int, optional
        Minimum value of ``SERVER_INFO["testlevel"]`` at which `m` is printed.
    """
    if SERVER_INFO["testlevel"] >= l:
        print(m, file=sys.stdout)
@ddt.ddt
class ValidServerTests(unittest.TestCase):
    """Functional tests that drive a :class:`pytan.Handler` built from ``SERVER_INFO``.

    Every ``test_*`` method receives its `value` argument from a JSON file under ``ddt/``
    through :func:`ddt.file_data`.

    NOTE(review): several tests ``eval()`` strings taken from those JSON files. The
    expressions are evaluated inside the test method, so they can see its local names
    (``export_str``, ``response``, ...). That is only acceptable because the JSON files are
    trusted fixtures that ship with the test suite -- never point these tests at
    externally supplied data.
    """

    @classmethod
    def setUpClass(cls):  # noqa
        """Create the shared handler and the server side objects the tests rely on."""
        cls.handler = pytan.Handler(**SERVER_INFO)

        m = "{}\n{}: PyTan v'{}' against Tanium v'{}' -- Valid Tests Starting\n".format
        spew(m(
            '*' * 100,
            pytan.utils.seconds_from_now(),
            pytan.__version__,
            cls.handler.session.get_server_version(),
        ), 2)

        if not hasattr(cls, 'base_type_objs'):
            # fetch objects for export tests
            kwargs = {
                'name': [
                    "Computer Name",
                    "IP Route Details",
                    "IP Address",
                    'Folder Contents',
                ],
                'objtype': 'sensor',
            }
            spew("TESTSETUP: Getting sensor objects for export tests of BaseType")
            cls.base_type_objs = cls.handler.get(**kwargs)

        if not hasattr(cls, 'wlus'):
            # create whitelisted_urls for getobject tests
            cls.wlus = ['test1', 'test2', 'test3']
            spew("TESTSETUP: Creating whitelisted URLs for get object tests")
            for wlu in cls.wlus:
                try:
                    cls.handler.create_whitelisted_url(url=wlu)
                except Exception as e:
                    # best effort: creation may fail (presumably because the URL already
                    # exists from an earlier run), so report it instead of aborting setup
                    spew(e)

        if not hasattr(cls, 'result_set_objs'):
            # ask questions for export tests
            kwargs = {
                'qtype': 'manual',
                'sensors': [
                    "Computer Name",
                    "IP Route Details",
                    "IP Address",
                    'Folder Contents{folderPath=C:\\Program Files}',
                ],
            }
            spew("TESTSETUP: Asking a question for export tests of ResultSet")
            cls.result_set_objs = cls.handler.ask(**kwargs)

        spew('\n' + str(cls.handler))

    @classmethod
    def tearDownClass(cls):  # noqa
        """Print the closing banner for this test class."""
        m = "{}\n{}: PyTan v'{}' against Tanium v'{}' -- Valid Tests Finished".format
        spew(m(
            '*' * 100,
            pytan.utils.seconds_from_now(),
            pytan.__version__,
            cls.handler.session.get_server_version(),
        ), 2)

    def setup_test(self):
        """Print a blank separator line and return the shared handler."""
        spew("")
        return self.handler

    def _check_report_exports(self, handler, obj, export_map, prefix):
        """Export `obj` in every format listed for `export_map` and check each report file.

        Parameters
        ----------
        handler : pytan.Handler
            Handler used to write the report files.
        obj : object
            Object handed to ``handler.export_to_report_file``.
        export_map : str
            Key into ``pytan.constants.EXPORT_MAPS`` naming the formats to try.
        prefix : str
            File name prefix for the report files written into ``TEST_OUT``.
        """
        for ft in pytan.constants.EXPORT_MAPS[export_map].keys():
            report_file, result = handler.export_to_report_file(
                obj=obj,
                export_format=ft,
                report_dir=TEST_OUT,
                prefix=prefix,
            )
            self.assertTrue(report_file)
            self.assertTrue(result)
            self.assertTrue(os.path.isfile(report_file))
            self.assertGreaterEqual(len(result), 10)

    @ddt.file_data('ddt/ddt_valid_export_resultset.json')
    def test_valid_export_resultset(self, value):
        """Export the shared ResultSet with `value`'s args and evaluate `value`'s tests."""
        handler = self.setup_test()

        self.assertTrue(self.result_set_objs)
        self.assertIsInstance(self.result_set_objs['question_object'], taniumpy.Question)
        self.assertIsInstance(self.result_set_objs['question_results'], taniumpy.ResultSet)
        self.assertGreaterEqual(len(self.result_set_objs['question_results'].rows), 1)
        self.assertGreaterEqual(len(self.result_set_objs['question_results'].columns), 1)

        kwargs = {'obj': self.result_set_objs['question_results']}
        kwargs.update(value['args'])
        tests = value['tests']

        s = (
            "+++ TESTING EXPECTED EXPORT OBJECT ResultSet SUCCESS Handler.{}() with kwargs {}"
        ).format
        spew(s('export_obj', kwargs))

        export_str = handler.export_obj(**kwargs)
        self.assertTrue(export_str)
        self.assertIsInstance(export_str, (str, unicode))

        export_str_list = chew_csv(export_str)
        spew(export_str_list[0])

        for x in tests:
            spew("+++ EVAL TEST: %s" % x)
            # trusted fixture expression; may reference the local names of this method
            self.assertTrue(eval(x))

    @ddt.file_data('ddt/ddt_valid_export_basetype.json')
    def test_valid_export_basetype(self, value):
        """Export the shared sensor objects with `value`'s args and evaluate `value`'s tests."""
        handler = self.setup_test()

        self.assertTrue(self.base_type_objs)
        self.assertIsInstance(self.base_type_objs, taniumpy.BaseType)
        self.assertEqual(len(self.base_type_objs), 4)

        kwargs = {'obj': self.base_type_objs}
        kwargs.update(value['args'])
        tests = value['tests']

        s = (
            "+++ TESTING EXPECTED EXPORT OBJECT BaseType SUCCESS Handler.{}() with kwargs {}"
        ).format
        spew(s('export_obj', kwargs))

        export_str = handler.export_obj(**kwargs)
        self.assertTrue(export_str)
        self.assertIsInstance(export_str, (str, unicode))

        for x in tests:
            spew("+++ EVAL TEST: %s" % x)
            # trusted fixture expression; may reference the local names of this method
            self.assertTrue(eval(x))

    @ddt.file_data('ddt/ddt_valid_deploy_action.json')
    def test_valid_deploy_action(self, value):
        """Call the handler method named in `value` and check the deploy action result."""
        handler = self.setup_test()

        method = value['method']
        args = value['args']

        s = "+++ TESTING EXPECTED DEPLOY ACTION SUCCESS Handler.{}() with kwargs {}".format
        spew(s(method, args))

        ret = getattr(handler, method)(**args)

        self.assertIsInstance(ret['action_object'], taniumpy.Action)
        self.assertIsInstance(ret['saved_action_object'], (taniumpy.SavedAction, type(None)))
        self.assertIsInstance(ret['package_object'], taniumpy.PackageSpec)
        self.assertIsInstance(ret['group_object'], (taniumpy.Group, type(None)))
        self.assertIsInstance(ret['action_info'], taniumpy.object_types.result_info.ResultInfo)
        self.assertIsInstance(ret['poller_object'], pytan.pollers.ActionPoller)

        # results are only checked (and exported) when the caller asked for them
        get_results = args.get('get_results', True)
        if get_results:
            self.assertIsInstance(
                ret['action_results'], taniumpy.object_types.result_set.ResultSet
            )
            self.assertGreaterEqual(len(ret['action_results'].rows), 1)
            self.assertGreaterEqual(len(ret['action_results'].columns), 1)
            self.assertTrue(ret['action_result_map'])
            self.assertIsNotNone(ret['poller_success'])

            self._check_report_exports(
                handler,
                ret['action_results'],
                'ResultSet',
                sys._getframe().f_code.co_name + '_',
            )

    @ddt.file_data('ddt/ddt_valid_create_object.json')
    def test_valid_create_object(self, value):
        """Create the object described by `value`, check its type, then delete it again."""
        handler = self.setup_test()

        # NOTE(review): presumably relative paths in the fixture args are resolved
        # against the test directory -- confirm
        os.chdir(my_dir)

        method = value['method']
        args = value['args']
        # trusted fixture expression naming the expected class
        t_obj = eval(value['taniumpyobj'])
        delete_args = {str(k): str(v) for k, v in value['delete'].iteritems()}

        s = (
            "+++ TESTING EXPECTED CREATE OBJECT SUCCESS Handler.{}() with kwargs {}"
        ).format
        spew(s(method, args))

        # best effort delete before creating; a failure is only reported
        try:
            handler.delete(**delete_args)
        except Exception as e:
            spew(e)

        ret = getattr(handler, method)(**args)
        self.assertIsInstance(ret, t_obj)

        delete_obj = handler.delete(**delete_args)
        for x in delete_obj:
            self.assertIsInstance(x, t_obj)

    @ddt.file_data('ddt/ddt_valid_create_object_from_json.json')
    def test_valid_create_object_from_json(self, value):
        """Export existing objects to JSON and create new objects from that file."""
        handler = self.setup_test()

        orig_objs = handler.get(**value['get'])

        if value['transform_attr']:
            for x in orig_objs:
                # append the fixture's suffix to the chosen attribute of each object
                transform_val = getattr(x, value['transform_attr'])
                transform_val += value['transform_value']
                setattr(x, value['transform_attr'], transform_val)

                del_kwargs = {}
                del_kwargs['objtype'] = value['objtype']
                del_kwargs[value['transform_attr']] = transform_val

                if value['delete_before']:
                    # best effort delete of an object already carrying the new value
                    try:
                        handler.delete(**del_kwargs)
                    except Exception as e:
                        spew(e)

        json_file, results = handler.export_to_report_file(
            obj=orig_objs,
            export_format='json',
            report_dir=TEST_OUT,
            prefix=sys._getframe().f_code.co_name + '_',
        )

        kwargs = {'objtype': value['objtype'], 'json_file': json_file}

        s = (
            "+++ TESTING EXPECTED CREATE OBJECT FROM JSON SUCCESS Handler.{}() with kwargs {}"
        ).format
        spew(s('create_from_json', kwargs))

        new_obj = handler.create_from_json(**kwargs)

        # trusted fixture expressions naming the expected classes
        self.assertIsInstance(new_obj, eval(value['listobj']))
        for x in new_obj:
            self.assertIsInstance(x, eval(value['singleobj']))

    @ddt.file_data('ddt/ddt_valid_questions.json')
    def test_valid_question(self, value):
        """Ask the question described by `value` and check the result.

        When the question type contains ``parsed`` and the platform is not 6.5, the call
        is instead expected to raise ``pytan.exceptions.UnsupportedVersionError``.
        """
        handler = self.setup_test()

        method = value['method']
        args = value['args']

        parsed_q = 'parsed' in args.get('qtype', '')
        is6_5 = handler.session.platform_is_6_5()

        if not is6_5 and parsed_q:
            exc = pytan.exceptions.UnsupportedVersionError
            e = '.*not supported in version.*'

            s = (
                "+++ TESTING EXPECTED QUESTION FAILURE Handler.{}() with kwargs {}"
            ).format
            spew(s(method, args))

            with self.assertRaisesRegexp(exc, e):
                getattr(handler, method)(**args)
        else:
            s = (
                "+++ TESTING EXPECTED QUESTION SUCCESS Handler.{}() with kwargs {}"
            ).format
            spew(s(method, args))

            ret = getattr(handler, method)(**args)
            self.assertIsInstance(ret['question_object'], taniumpy.Question)
            self.assertIsInstance(ret['poller_object'], pytan.pollers.QuestionPoller)

            # exact match here, unlike the substring check above
            parsed_q = args.get('qtype', '') == 'parsed'
            if parsed_q:
                self.assertIsInstance(ret['parse_results'], taniumpy.ParseResultGroupList)

            get_results = args.get('get_results', True)
            if get_results:
                self.assertIsNotNone(ret['poller_success'])
                self.assertIsInstance(ret['question_results'], taniumpy.ResultSet)
                self.assertGreaterEqual(len(ret['question_results'].rows), 1)
                self.assertGreaterEqual(len(ret['question_results'].columns), 1)

                self._check_report_exports(
                    handler,
                    ret['question_results'],
                    'ResultSet',
                    sys._getframe().f_code.co_name + '_',
                )

    @ddt.file_data('ddt/ddt_valid_saved_questions.json')
    def test_valid_saved_question(self, value):
        """Ask the saved question described by `value` and check and export its results."""
        handler = self.setup_test()

        method = value['method']
        args = value['args']

        s = (
            "+++ TESTING EXPECTED SAVED QUESTION SUCCESS Handler.{}() with kwargs {}"
        ).format
        spew(s(method, args))

        ret = getattr(handler, method)(**args)

        self.assertIsInstance(ret['saved_question_object'], taniumpy.SavedQuestion)
        self.assertIsInstance(ret['question_object'], taniumpy.Question)
        self.assertIsInstance(ret['poller_object'], (pytan.pollers.QuestionPoller, type(None)))
        self.assertIsInstance(ret['poller_success'], (type(None), type(True)))
        self.assertIsInstance(ret['question_results'], taniumpy.ResultSet)
        self.assertGreaterEqual(len(ret['question_results'].rows), 1)
        self.assertGreaterEqual(len(ret['question_results'].columns), 1)

        self._check_report_exports(
            handler,
            ret['question_results'],
            'ResultSet',
            sys._getframe().f_code.co_name + '_',
        )

    @ddt.file_data('ddt/ddt_valid_get_object.json')
    def test_valid_get_object(self, value):
        """Fetch objects as described by `value`, evaluate its tests, export the response."""
        handler = self.setup_test()

        method = value['method']
        args = value['args']
        tests = value['tests']

        s = (
            "+++ TESTING EXPECTED GET SUCCESS Handler.{}() with kwargs {}"
        ).format
        spew(s(method, args))

        response = getattr(handler, method)(**args)

        self.assertTrue(response)
        self.assertIsInstance(response, taniumpy.BaseType)

        for x in tests:
            spew("+++ EVAL TEST: %s" % x)
            # trusted fixture expression; may reference the local names of this method
            self.assertTrue(eval(x))

        self._check_report_exports(
            handler,
            response,
            'BaseType',
            sys._getframe().f_code.co_name + '_',
        )

    @ddt.file_data('ddt/ddt_invalid_export_basetype.json')
    def test_invalid_export_basetype(self, value):
        """Check that exporting the sensor objects with `value`'s args raises as expected."""
        handler = self.setup_test()

        kwargs = {'obj': self.base_type_objs}
        kwargs.update(value['args'])

        # trusted fixture expression naming the expected exception class
        exc = eval(value['exception'])
        e = value['error_str']

        s = (
            "+++ TESTING EXPECTED EXPORT OBJECT BaseType FAILURE Handler.{}() with kwargs {}"
        ).format
        spew(s('export_obj', kwargs))

        with self.assertRaisesRegexp(exc, e):
            handler.export_obj(**kwargs)

    @ddt.file_data('ddt/ddt_invalid_create_object_from_json.json')
    def test_invalid_create_object_from_json(self, value):
        """Check that creating objects from an exported JSON file raises as expected."""
        handler = self.setup_test()

        orig_objs = handler.get(**value['get'])

        json_file, results = handler.export_to_report_file(
            obj=orig_objs,
            export_format='json',
            report_dir=TEST_OUT,
            prefix=sys._getframe().f_code.co_name + '_',
        )

        kwargs = {'objtype': value['objtype'], 'json_file': json_file}

        # trusted fixture expression naming the expected exception class
        exc = eval(value['exception'])
        e = value['error_str']

        s = (
            "+++ TESTING EXPECTED CREATE OBJECT FROM JSON FAILURE Handler.{}() with kwargs {}"
        ).format
        spew(s('create_from_json', kwargs))

        with self.assertRaisesRegexp(exc, e):
            handler.create_from_json(**kwargs)

    @ddt.file_data('ddt/ddt_invalid_questions.json')
    def test_invalid_question(self, value):
        """Check that asking the question described by `value` raises as expected."""
        handler = self.setup_test()

        method = value['method']
        args = value['args']

        # trusted fixture expression naming the expected exception class
        exc = eval(value['exception'])
        e = value['error_str']

        # parsed questions on a non 6.5 platform fail earlier, with a different error
        parsed_q = 'parsed' in args.get('qtype', '')
        is6_5 = handler.session.platform_is_6_5()
        if not is6_5 and parsed_q:
            exc = pytan.exceptions.UnsupportedVersionError
            e = '.*not supported in version.*'

        s = (
            "+++ TESTING EXPECTED QUESTION FAILURE Handler.{}() with kwargs {}"
        ).format
        spew(s(method, args))

        with self.assertRaisesRegexp(exc, e):
            getattr(handler, method)(**args)

    @ddt.file_data('ddt/ddt_invalid_get_object.json')
    def test_invalid_get_object(self, value):
        """Check that the get call described by `value` raises as expected."""
        handler = self.setup_test()

        method = value['method']
        args = value['args']

        # trusted fixture expression naming the expected exception class
        exc = eval(value['exception'])
        e = value['error_str']

        s = (
            "+++ TESTING EXPECTED GET FAILURE Handler.{}() with kwargs {}"
        ).format
        spew(s(method, args))

        with self.assertRaisesRegexp(exc, e):
            getattr(handler, method)(**args)

    @ddt.file_data('ddt/ddt_invalid_create_object.json')
    def test_invalid_create_object(self, value):
        """Check that the create call described by `value` raises as expected."""
        handler = self.setup_test()

        method = value['method']
        args = value['args']

        # trusted fixture expression naming the expected exception class
        exc = eval(value['exception'])
        e = value['error_str']

        s = (
            "+++ TESTING EXPECTED CREATE OBJECT FAILURE Handler.{}() with kwargs {}"
        ).format
        spew(s(method, args))

        with self.assertRaisesRegexp(exc, e):
            getattr(handler, method)(**args)

    @ddt.file_data('ddt/ddt_invalid_deploy_action.json')
    def test_invalid_deploy_action(self, value):
        """Check that the deploy action described by `value` raises as expected."""
        handler = self.setup_test()

        method = value['method']
        args = value['args']

        # trusted fixture expression naming the expected exception class
        exc = eval(value['exception'])
        e = value['error_str']

        # default the report directory to the system temp dir unless the fixture sets one
        args["report_dir"] = args.get('report_dir', tempfile.gettempdir())

        s = (
            "+++ TESTING EXPECTED DEPLOY ACTION FAILURE Handler.{}() with kwargs {}"
        ).format
        spew(s(method, args))

        with self.assertRaisesRegexp(exc, e):
            getattr(handler, method)(**args)

    @ddt.file_data('ddt/ddt_invalid_export_resultset.json')
    def test_invalid_export_resultset(self, value):
        """Check that exporting the shared ResultSet with `value`'s args raises as expected."""
        handler = self.setup_test()

        kwargs = {'obj': self.result_set_objs['question_results']}
        kwargs.update(value['args'])

        # trusted fixture expression naming the expected exception class
        exc = eval(value['exception'])
        e = value['error_str']

        s = (
            "+++ TESTING EXPECTED EXPORT OBJECT ResultSet FAILURE Handler.{}() with kwargs {}"
        ).format
        spew(s('export_obj', kwargs))

        with self.assertRaisesRegexp(exc, e):
            handler.export_obj(**kwargs)


if __name__ == "__main__":
    # make sure the output directory exists and holds no files from an earlier run
    if not os.path.isdir(TEST_OUT):
        os.mkdir(TEST_OUT)

    test_files = glob.glob(TEST_OUT + '/*.*')
    if test_files:
        spew("Cleaning up %s old test files" % len(test_files))
        for x in test_files:
            os.unlink(x)

    unittest.main(
        verbosity=SERVER_INFO["testlevel"],
        failfast=SERVER_INFO["FAILFAST"],
        catchbreak=SERVER_INFO["CATCHBREAK"],
        buffer=SERVER_INFO["BUFFER"],
    )