#!/usr/bin/env python
# -*- mode: Python; tab-width: 4; indent-tabs-mode: nil; -*-
# ex: set tabstop=4
# Please do not change the two lines above. See PEP 8, PEP 263.
"""Collection of classes and methods for polling of actions/questions in :mod:`pytan`"""
import sys
# disable python from creating .pyc files everywhere
sys.dont_write_bytecode = True
import os
import logging
import time
import pprint
from datetime import datetime
from datetime import timedelta
my_file = os.path.abspath(__file__)
my_dir = os.path.dirname(my_file)
parent_dir = os.path.dirname(my_dir)
path_adds = [parent_dir]
[sys.path.insert(0, aa) for aa in path_adds if aa not in sys.path]
import taniumpy
import pytan
class QuestionPoller(object):
    """A class to poll the progress of a Question.

    The primary function of this class is to poll for result info for a question, and fire off
    events:

        * ProgressChanged
        * AnswersChanged
        * AnswersComplete

    All parameters other than `handler` and `obj` are read from keyword arguments.

    Parameters
    ----------
    handler : :class:`pytan.handler.Handler`
        * PyTan handler to use for GetResultInfo calls
    obj : :class:`taniumpy.object_types.question.Question`
        * object to poll for progress
    polling_secs : int, optional
        * default: 5
        * Number of seconds to wait in between GetResultInfo loops
    complete_pct : int/float, optional
        * default: 99
        * Percentage of mr_tested out of estimated_total to consider the question "done"
    override_timeout_secs : int, optional
        * default: 0
        * If supplied and not 0, timeout in seconds instead of when object expires
    override_estimated_total : int, optional
        * default: 0
        * instead of getting number of systems that should see this question from
          result_info.estimated_total, use this number
    force_passed_done_count : int, optional
        * default: 0
        * when this number of systems have passed the right hand side of the question,
          consider the question complete
    debug_method_locals : bool, optional
        * default: False
        * when True, methods log their local variables to the "method_debug" logger
    """
    # The bare strings following each attribute below are attribute docstrings; they are kept
    # verbatim.
    OBJECT_TYPE = taniumpy.object_types.question.Question
    """valid type of object that can be passed in as obj to __init__"""
    STR_ATTRS = [
        'object_info',
        'polling_secs',
        'override_timeout_secs',
        'complete_pct',
        'expiration',
    ]
    """Class attributes to include in __str__ output"""
    COMPLETE_PCT_DEFAULT = 99
    """default value for self.complete_pct"""
    POLLING_SECS_DEFAULT = 5
    """default value for self.polling_secs"""
    OVERRIDE_TIMEOUT_SECS_DEFAULT = 0
    """default value for self.override_timeout_secs"""
    EXPIRATION_ATTR = 'expiration'
    """attribute of self.obj that contains the expiration for this object"""
    EXPIRY_FALLBACK_SECS = 600
    """If the EXPIRATION_ATTR of `obj` can't be automatically determined, then this is used as a fallback for timeout - polling will failed after this many seconds if completion not reached"""
    obj = None
    """The object for this poller"""
    handler = None
    """The Handler object for this poller"""
    result_info = None
    """This will be updated with the ResultInfo object during run() calls"""
    # class-level default; stop() sets an instance-level True and nothing visible here resets it
    _stop = False
    """Controls whether a run() loop should stop or not"""
def __init__(self, handler, obj, **kwargs):
    """Validate `handler` and `obj`, store the polling options, then run :func:`_post_init`.

    Raises :exc:`pytan.exceptions.PollingError` if `handler` is not a
    :class:`pytan.handler.Handler` or `obj` is not an instance of self.OBJECT_TYPE.
    """
    # the debug logger and flag must exist before the first _debug_locals() call
    self.methodlog = logging.getLogger("method_debug")
    self.DEBUG_METHOD_LOCALS = kwargs.get('debug_method_locals', False)
    self._debug_locals(sys._getframe().f_code.co_name, locals())
    self.setup_logging()

    handler_type = pytan.handler.Handler
    if not isinstance(handler, handler_type):
        err = "{} is not a valid handler instance! Must be a: {!r}".format(
            type(handler), handler_type,
        )
        raise pytan.exceptions.PollingError(err)

    if not isinstance(obj, self.OBJECT_TYPE):
        err = "{} is not a valid object type! Must be a: {}".format(
            type(obj), self.OBJECT_TYPE,
        )
        raise pytan.exceptions.PollingError(err)

    self.handler = handler
    self.obj = obj

    option = kwargs.get
    self.polling_secs = option('polling_secs', self.POLLING_SECS_DEFAULT)
    self.complete_pct = option('complete_pct', self.COMPLETE_PCT_DEFAULT)
    self.override_timeout_secs = option(
        'override_timeout_secs', self.OVERRIDE_TIMEOUT_SECS_DEFAULT,
    )
    self.force_passed_done_count = option('force_passed_done_count', 0)

    # provisional id_str: _derive_attribute() uses it in its log messages while resolving
    # the real id
    self.id_str = "ID {}: ".format(getattr(self.obj, 'id', '-1'))
    self.obj_id = self._derive_attribute(attr='id', fallback=None)
    self.id_str = "ID {}: ".format(self.obj_id)

    self.poller_result = None
    self._post_init(**kwargs)
def setup_logging(self):
    """Create the main, progress and resolver loggers, named after the concrete class."""
    self._debug_locals(sys._getframe().f_code.co_name, locals())
    base_name = "pytan.pollers.{}".format(self.__class__.__name__)
    self.qualname = base_name
    self.mylog = logging.getLogger(base_name)
    self.progresslog = logging.getLogger(base_name + ".progress")
    self.resolverlog = logging.getLogger(base_name + ".resolver")
def __str__(self):
self._debug_locals(sys._getframe().f_code.co_name, locals())
class_name = self.__class__.__name__
attrs = ", ".join(['{0}: "{1}"'.format(x, getattr(self, x, None)) for x in self.STR_ATTRS])
ret = "{} {}".format(class_name, attrs)
return ret
[docs] def _post_init(self, **kwargs):
"""Post init class setup"""
self._debug_locals(sys._getframe().f_code.co_name, locals())
self.override_estimated_total = kwargs.get('override_estimated_total', 0)
self._derive_expiration(**kwargs)
self._derive_object_info(**kwargs)
def _refetch_obj(self, **kwargs):
    """Re-fetch self.obj through the handler and store the result on self.obj

    Used when the obj supplied to the poller does not carry all of its metadata.
    Raises :exc:`pytan.exceptions.PollingError` if the handler returns an empty object.
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())
    find_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=['obj'])
    found = self.handler._find(obj=self.obj, **find_kwargs)
    if pytan.utils.empty_obj(found):
        raise pytan.exceptions.PollingError("Unable to find object: {}".format(self.obj))
    self.obj = found
def _derive_attribute(self, attr, fallback='', **kwargs):
    """Derive an attribute's value from self.obj

    Will re-fetch self.obj if the attribute is not set.

    Parameters
    ----------
    attr : string
        * name of the attribute to read from self.obj
    fallback : object, optional
        * default: ''
        * value to fall back to if the attribute is still unset after re-fetching the obj
        * if fallback is None, an exception will be raised instead
    kwargs : dict
        * run through pytan.utils.clean_kwargs (keys 'obj' and 'pytan_help') and passed to
          :func:`_refetch_obj`

    Returns
    -------
    val : object
        * the value of `attr` from self.obj, or `fallback`

    Raises
    ------
    pytan.exceptions.PollingError
        * if the attribute is unset after the re-fetch and `fallback` is None
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())

    clean_keys = ['obj', 'pytan_help']
    clean_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=clean_keys)

    val = getattr(self.obj, attr, None)

    # if attr isn't available on the object, maybe it's only a partial object
    # let's use the handler to re-fetch it
    if val is None:
        m = "{}attribute {!r} is not set, issuing GetObject to get the full object".format
        m = m(self.id_str, attr)
        self.resolverlog.debug(m)
        self._refetch_obj(pytan_help=m, **clean_kwargs)
        # default to None (not '') so an attribute that is missing altogether is handled
        # exactly like one that is present but unset: fallback or PollingError below
        val = getattr(self.obj, attr, None)

    if val is None:
        if fallback is None:
            m = "{}{!r} is None, even after re-fetching object".format
            raise pytan.exceptions.PollingError(m(self.id_str, attr))

        m = (
            "{}attribute {!r} is not set after re-fetching object - using fallback of {}"
        ).format
        self.resolverlog.debug(m(self.id_str, attr, fallback))
        val = fallback

    m = "{}attribute '{}' resolved to '{}'".format
    self.mylog.debug(m(self.id_str, attr, val))
    return val
def _derive_object_info(self, **kwargs):
    """Build self.object_info from the 'id' and 'query_text' attributes of self.obj

    Each attribute is resolved with :func:`_derive_attribute`, using a placeholder fallback
    if it can not be determined.
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())
    derive_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=['attr', 'fallback'])

    # query_text is resolved before id, as either lookup may trigger a re-fetch of self.obj
    query = self._derive_attribute(
        attr='query_text', fallback='Unable to fetch question text', **derive_kwargs
    )
    qid = self._derive_attribute(attr='id', fallback=-1, **derive_kwargs)

    info = "Question ID: {}, Query: {}".format(qid, query)
    self.resolverlog.debug("{}'object_info' resolved to '{}'".format(self.id_str, info))
    self.object_info = info
def _derive_expiration(self, **kwargs):
    """Resolve self.expiration from the EXPIRATION_ATTR attribute of self.obj

    If the attribute can not be resolved, the value produced by
    pytan.utils.seconds_from_now for self.EXPIRY_FALLBACK_SECS is used instead.
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())
    derive_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=['attr', 'fallback'])
    # the fallback is computed up front, whether or not it ends up being needed
    default_expiry = pytan.utils.seconds_from_now(secs=self.EXPIRY_FALLBACK_SECS)
    self.expiration = self._derive_attribute(
        attr=self.EXPIRATION_ATTR, fallback=default_expiry, **derive_kwargs
    )
def run_callback(self, callbacks, callback, pct, **kwargs):
    """Look up `callback` in the `callbacks` dict and, if present, invoke it

    The callback is called with poller=self, pct=pct and the remaining kwargs. Any Exception
    raised by the callback is logged as a warning and not propagated.
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())

    registered = callbacks.get(callback, '')
    if not registered:
        return

    extra_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=['poller', 'pct'])

    try:
        self.mylog.debug("Running callback: {}".format(callback))
        callbacks[callback](poller=self, pct=pct, **extra_kwargs)
    except Exception as exc:
        warning = "Exception occurred in '{}' Callback: {}".format(callback, exc)
        self.mylog.warning(warning)
def set_complect_pct(self, val):  # noqa
    """Replace self.complete_pct with a new threshold

    Parameters
    ----------
    val : int/float
        * new percentage at which self.obj is considered complete
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())
    setattr(self, 'complete_pct', val)
def get_result_info(self, **kwargs):
    """Wrapper around :func:`pytan.handler.Handler.get_result_info` that retries on 0 clients

    Parameters
    ----------
    gri_retry_count : int, optional
        * default: 10
        * Number of attempts made at GetResultInfo while estimated_total comes back as 0

    Returns
    -------
    result_info : :class:`taniumpy.object_types.result_info.ResultInfo`

    Raises
    ------
    pytan.exceptions.PollingError
        * if estimated_total is still 0 on the last attempt
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())

    max_tries = kwargs.get('gri_retry_count', 10)
    call_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=['obj', 'gri_retry_count'])

    attempt = 1
    while True:
        info = self.handler.get_result_info(obj=self.obj, **call_kwargs)
        if info.estimated_total != 0:
            return info

        # the text describes the attempt that just failed
        attempt_text = "attempt {} out of {}".format(attempt, max_tries)

        if attempt >= max_tries:
            give_up = "Estimated Total of Clients is 0 -- no clients available?, {}".format
            raise pytan.exceptions.PollingError(give_up(attempt_text))

        attempt += 1
        retry_note = "Re-issuing a GetResultInfo since the estimated_total came back 0, {}"
        retry_note = retry_note.format(attempt_text)
        call_kwargs['pytan_help'] = retry_note
        self.mylog.debug(retry_note)
        time.sleep(1)
def get_result_data(self, **kwargs):
    """Wrapper around :func:`pytan.handler.Handler.get_result_data` for self.obj

    kwargs are run through pytan.utils.clean_kwargs with 'obj' as the key to clean before
    being handed to the handler.

    Returns
    -------
    result_data : :class:`taniumpy.object_types.result_set.ResultSet`
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())
    call_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=['obj'])
    return self.handler.get_result_data(obj=self.obj, **call_kwargs)
def run(self, callbacks=None, **kwargs):
    """Poll for question data and issue callbacks.

    Parameters
    ----------
    callbacks : dict, optional
        * default: None (treated as an empty dict)
        * Callbacks should be a dict with any of these members:
            * 'ProgressChanged'
            * 'AnswersChanged'
            * 'AnswersComplete'
        * Each callback should be a function that accepts:
            * 'poller': a poller instance
            * 'pct': a percent complete
            * 'kwargs': a dict of other args
    gri_retry_count : int, optional
        * default: 10
        * Number of times to re-try GetResultInfo when estimated_total comes back as 0

    Returns
    -------
    poller_result : bool
        * True if :func:`passed_eq_est_total_loop` reported completion, False otherwise
        * also stored on self.poller_result

    Notes
    -----
        * Any callback can choose to get data from the session by calling
          poller.get_result_data() or new info by calling poller.get_result_info()
        * Any callback can choose to stop the poller by calling poller.stop()
        * Polling stops when the answers are complete, when a callback calls the stop()
          method, or when the override timeout / expiration of the object is reached
        * Any callback can call poller.set_complect_pct() to change what "done" means on
          the fly
    """
    # use a fresh dict per call rather than a mutable default shared between calls
    if callbacks is None:
        callbacks = {}

    self._debug_locals(sys._getframe().f_code.co_name, locals())

    self.start = datetime.utcnow()
    self.expiration_timeout = pytan.utils.timestr_to_datetime(timestr=self.expiration)

    if self.override_timeout_secs:
        td_obj = timedelta(seconds=self.override_timeout_secs)
        self.override_timeout = self.start + td_obj
    else:
        self.override_timeout = None

    clean_keys = ['callbacks']
    clean_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=clean_keys)

    self.passed_eq_total = self.passed_eq_est_total_loop(callbacks=callbacks, **clean_kwargs)
    # all() also normalizes a None loop result to False
    self.poller_result = all([self.passed_eq_total])
    return self.poller_result
def passed_eq_est_total_loop(self, callbacks={}, **kwargs):
    """Method to poll Result Info for self.obj until the percentage of 'mr_tested' out of the estimated total is greater than or equal to self.complete_pct

    The estimated total is self.override_estimated_total if that is set, otherwise
    result_info.estimated_total.

    Parameters
    ----------
    callbacks : dict
        * may contain 'ProgressChanged', 'AnswersChanged' and 'AnswersComplete' callables,
          each invoked through :func:`run_callback`
    kwargs : dict
        * run through pytan.utils.clean_kwargs (keys 'pytan_help', 'callback', 'pct') and
          passed on to :func:`get_result_info` and to the callbacks

    Returns
    -------
    bool or None
        * True when complete_pct or force_passed_done_count has been reached
        * False when the override timeout or the expiration has been reached, or when
          self._stop was set during the current pass
        * None when self._stop is already set at the moment the loop condition is checked
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())

    # current percentage tracker
    self.pct = None
    # loop counter
    self.loop_count = 1
    # establish a previous result_info that's empty
    self.previous_result_info = taniumpy.object_types.result_info.ResultInfo()

    while not self._stop:
        # perform a GetResultInfo SOAP call
        clean_keys = ['pytan_help', 'callback', 'pct']
        clean_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=clean_keys)

        h = "Issue a GetResultInfo for a Question to check the current progress of answers"
        self.result_info = self.get_result_info(pytan_help=h, **clean_kwargs)

        # derive the current percentage of completion by calculating percentage of
        # mr_tested out of estimated_total
        # mr_tested = number of systems that have seen the question
        # estimated_total = rough estimate of total number of systems
        # passed = number of systems that have passed any filters for the question
        tested = self.result_info.mr_tested
        est_total = self.override_estimated_total or self.result_info.estimated_total
        passed = self.result_info.passed

        new_pct = pytan.utils.get_percentage(part=tested, whole=est_total)
        new_pct_str = "{0:.0f}%".format(new_pct)
        complete_pct_str = "{0:.0f}%".format(self.complete_pct)

        # print a progress debug string
        self.progress_str = (
            "Progress: Tested: {0.tested}, Passed: {0.passed}, "
            "MR Tested: {0.mr_tested}, MR Passed: {0.mr_passed}, "
            "Est Total: {0.estimated_total}, Row Count: {0.row_count}, Override Est Total: {1}"
        ).format(self.result_info, self.override_estimated_total)
        self.progresslog.debug("{}{}".format(self.id_str, self.progress_str))

        # print a timing debug string
        # (the time left is measured against the override timeout when one is set)
        if self.override_timeout:
            time_till_expiry = self.override_timeout - datetime.utcnow()
        else:
            time_till_expiry = self.expiration_timeout - datetime.utcnow()

        self.timing_str = (
            "Timing: Started: {}, Expiration: {}, Override Timeout: {}, "
            "Elapsed Time: {}, Left till expiry: {}, Loop Count: {}"
        ).format(
            self.start,
            self.expiration_timeout,
            self.override_timeout,
            datetime.utcnow() - self.start,
            time_till_expiry,
            self.loop_count,
        )
        self.progresslog.debug("{}{}".format(self.id_str, self.timing_str))

        # check to see if progress has changed, if so run the callback
        # (self.pct starts as None, so the first pass always counts as a change)
        progress_changed = any([
            self.previous_result_info.tested != self.result_info.tested,
            self.previous_result_info.passed != self.result_info.passed,
            self.previous_result_info.mr_tested != self.result_info.mr_tested,
            self.previous_result_info.mr_passed != self.result_info.mr_passed,
            self.previous_result_info.estimated_total != self.result_info.estimated_total,
            self.pct != new_pct,
        ])

        if progress_changed:
            m = "{}Progress Changed {} ({} of {})".format
            self.progresslog.info(m(self.id_str, new_pct_str, tested, est_total))
            cb = 'ProgressChanged'
            self.run_callback(callbacks=callbacks, callback=cb, pct=new_pct, **clean_kwargs)

        # check to see if answers have changed, if so run the callback
        answers_changed = any([
            self.previous_result_info.tested != self.result_info.tested,
            self.previous_result_info.passed != self.result_info.passed,
        ])

        if answers_changed:
            cb = 'AnswersChanged'
            self.run_callback(callbacks=callbacks, callback=cb, pct=new_pct, **clean_kwargs)

        # check to see if new_pct has reached complete_pct threshold, if so return True
        if new_pct >= self.complete_pct:
            m = "{}Reached Threshold of {} ({} of {})".format
            self.mylog.info(m(self.id_str, complete_pct_str, tested, est_total))
            cb = 'AnswersComplete'
            self.run_callback(callbacks=callbacks, callback=cb, pct=new_pct, **clean_kwargs)
            return True

        # check to see if the forced passed count is set and has been reached, if so
        # return True
        if self.force_passed_done_count and passed >= self.force_passed_done_count:
            m = "{}Reached forced passed done count of {} ({} of {})".format
            self.mylog.info(m(self.id_str, self.force_passed_done_count, tested, est_total))
            cb = 'AnswersComplete'
            self.run_callback(callbacks=callbacks, callback=cb, pct=new_pct, **clean_kwargs)
            return True

        # check to see if override timeout is specified, if so and we have passed it, return
        # False
        if self.override_timeout and datetime.utcnow() >= self.override_timeout:
            m = "{}Reached override timeout of {}".format
            self.mylog.warning(m(self.id_str, self.override_timeout))
            return False

        # check to see if we have passed the objects expiration timeout, if so return False
        # NOTE(review): this check also applies when an override timeout is set, although
        # override_timeout_secs is documented as being used "instead of" the expiration --
        # confirm which behavior is intended
        if datetime.utcnow() >= self.expiration_timeout:
            m = "{}Reached expiration timeout of {}".format
            self.mylog.warning(m(self.id_str, self.expiration_timeout))
            return False

        # if stop was called during this pass (e.g. by a callback), return False
        if self._stop:
            m = "{}Stop called at {}".format
            self.mylog.info(m(self.id_str, new_pct_str))
            return False

        # update our class variables to the new values determined by this loop
        self.pct = new_pct
        self.previous_result_info = self.result_info

        time.sleep(self.polling_secs)
        self.loop_count += 1
def stop(self):
    """Set the flag that the polling loops check in order to stop"""
    setattr(self, '_stop', True)
[docs] def _debug_locals(self, fname, flocals):
"""Method to print out locals for a function if self.DEBUG_METHOD_LOCALS is True"""
if getattr(self, 'DEBUG_METHOD_LOCALS', False):
m = "Local variables for {}.{}:\n{}".format
self.methodlog.debug(m(self.__class__.__name__, fname, pprint.pformat(flocals)))
class ActionPoller(QuestionPoller):
    """A class to poll the progress of an Action.

    The primary function of this class is to poll for result info for an action, and fire off
    events:

        * 'SeenProgressChanged'
        * 'SeenAnswersComplete'
        * 'FinishedProgressChanged'
        * 'FinishedAnswersComplete'

    All parameters other than `handler` and `obj` are read from keyword arguments.

    Parameters
    ----------
    handler : :class:`pytan.handler.Handler`
        * PyTan handler to use for GetResultInfo calls
    obj : :class:`taniumpy.object_types.action.Action`
        * object to poll for progress
    polling_secs : int, optional
        * default: 5
        * Number of seconds to wait in between GetResultInfo loops
    complete_pct : int/float, optional
        * default: 100
        * Percentage of passed_count out of successfully run actions to consider the action
          "done"
    override_timeout_secs : int, optional
        * default: 0
        * If supplied and not 0, timeout in seconds instead of when object expires
    override_passed_count : int, optional
        * default: 0
        * instead of getting number of systems that should run this action by asking a
          question, use this number
    """
    # The bare strings following each attribute below are attribute docstrings; they are kept
    # verbatim.
    OBJECT_TYPE = taniumpy.object_types.action.Action
    """valid type of object that can be passed in as obj to __init__"""
    COMPLETE_PCT_DEFAULT = 100
    """default value for self.complete_pct"""
    ACTION_DONE_KEY = 'success'
    """key in action_result_map that maps to an action being done"""
    # compared against the lower-cased status of the action
    RUNNING_STATUSES = ["active", "open"]
    """values for status attribute of action object that mean the action is running"""
    EXPIRATION_ATTR = 'expiration_time'
    """attribute of self.obj that contains the expiration for this object"""
[docs] def _post_init(self, **kwargs):
"""Post init class setup"""
self._debug_locals(sys._getframe().f_code.co_name, locals())
self.override_passed_count = kwargs.get('override_passed_count', 0)
self._derive_package_spec(**kwargs)
self._derive_target_group(**kwargs)
self._derive_verify_enabled(**kwargs)
self._derive_result_map(**kwargs)
self._derive_expiration(**kwargs)
self._derive_status(**kwargs)
self._derive_stopped_flag(**kwargs)
self._derive_object_info(**kwargs)
def _derive_status(self, **kwargs):
    """Resolve self.status from the 'status' attribute of self.obj

    No fallback is supplied, so :func:`_derive_attribute` raises if it can not be resolved.
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())
    derive_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=['attr', 'fallback'])
    self.status = self._derive_attribute(attr='status', fallback=None, **derive_kwargs)
def _derive_stopped_flag(self, **kwargs):
    """Resolve self.stopped_flag from the 'stopped_flag' attribute of self.obj as a bool

    No fallback is supplied, so :func:`_derive_attribute` raises if it can not be resolved.
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())
    derive_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=['attr', 'fallback'])
    # raw value -> int -> bool; the intermediate values are stored on self along the way
    self.stopped_flag = self._derive_attribute(
        attr='stopped_flag', fallback=None, **derive_kwargs
    )
    for convert in (int, bool):
        self.stopped_flag = convert(self.stopped_flag)
def _derive_package_spec(self, **kwargs):
    """Resolve the 'package_spec' attribute of self.obj, then fetch the full package object

    The result of the handler lookup is stored on self.package_spec.
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())
    find_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=['attr', 'fallback', 'obj'])

    self.package_spec = self._derive_attribute(
        attr='package_spec', fallback=None, **find_kwargs
    )

    # get the full package object associated with this action
    find_kwargs['pytan_help'] = (
        "Issue a GetObject on the package for an action to get the full object"
    )
    self.package_spec = self.handler._find(obj=self.package_spec, **find_kwargs)
def _derive_target_group(self, **kwargs):
    """Get the target_group attribute for self.obj, then fetch the full group object

    If the target group has a non-zero id, the full group is fetched through the handler and
    passed to :func:`_fix_group`. self.passed_count_reliable records whether that lookup
    worked; a failed lookup is logged and polling setup continues with the partial group.
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())

    clean_keys = ['attr', 'fallback', 'obj']
    clean_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=clean_keys)

    attr_name = 'target_group'
    fb = None
    self.target_group = self._derive_attribute(attr=attr_name, fallback=fb, **clean_kwargs)

    # if the target group id is not 0, re-fetch the full group object
    # NOTE(review): self.passed_count_reliable is only assigned inside this branch, so it
    # stays unset when the id is 0 -- confirm readers of it handle that
    if int(self.target_group.id) != 0:
        h = (
            "Issue a GetObject on the target_group for an action to get the full Group "
            "object"
        )
        clean_kwargs['pytan_help'] = h
        try:
            self.target_group = self.handler._find(obj=self.target_group, **clean_kwargs)
            self._fix_group(g=self.target_group)
            self.passed_count_reliable = True
        except Exception:
            # best-effort lookup: only ordinary errors are absorbed here, so that
            # KeyboardInterrupt / SystemExit still propagate
            self.passed_count_reliable = False
            m = "{}Passed Count unreliable! Unable to find Actions Target Group: {}".format
            self.mylog.exception(m(self.id_str, self.target_group))
[docs] def _fix_group(self, g, **kwargs):
"""Sets ID to null on a group object and all of it's sub_groups, needed for 6.5"""
self._debug_locals(sys._getframe().f_code.co_name, locals())
g.id = None
if g.sub_groups:
for x in g.sub_groups:
self._fix_group(g=x)
[docs] def _derive_verify_enabled(self, **kwargs):
"""Determine if this action has verification enabled"""
self._debug_locals(sys._getframe().f_code.co_name, locals())
self.verify_enabled = False
package_spec = getattr(self, 'package_spec', None)
ps_verify_group_id = getattr(package_spec, 'verify_group_id', None)
vg = getattr(package_spec, 'verify_group', None)
vg_id = getattr(vg, 'id', None)
if ps_verify_group_id or vg_id:
self.verify_enabled = True
[docs] def _derive_result_map(self, **kwargs):
"""Determine what self.result_map should contain for the various statuses an action can have
A package object has to have a verify_group defined on it in order
for deploy action verification to trigger. That can be only done
at package creation/update
If verify_enable is True, then the various result states for an action change
"""
self._debug_locals(sys._getframe().f_code.co_name, locals())
if self.verify_enabled:
finished = [
'Verified.', 'Succeeded.', 'Expired.', 'Stopped.', 'NotSucceeded.', 'Failed.',
]
success = [
'Verified.',
]
running = [
'Completed.', 'PendingVerification.', 'Copying.', 'Waiting.', 'Downloading.',
'Running.',
]
failed = [
'Expired.', 'Stopped.', 'NotSucceeded.', 'Failed.',
]
else:
finished = [
'Verified.', 'Succeeded.', 'Completed.', 'Expired.', 'Stopped.', 'NotSucceeded.',
'Failed.',
]
success = [
'Verified.', 'Completed.',
]
running = [
'PendingVerification.', 'Copying.', 'Waiting.', 'Downloading.', 'Running.',
]
failed = [
'Expired.', 'Stopped.', 'NotSucceeded.', 'Failed.',
]
self.result_map = {
'finished': {"{}:{}".format(self.obj.id, k): [] for k in finished},
'success': {"{}:{}".format(self.obj.id, k): [] for k in success},
'running': {"{}:{}".format(self.obj.id, k): [] for k in running},
'failed': {"{}:{}".format(self.obj.id, k): [] for k in failed},
'unknown': {},
}
for k, v in self.result_map.iteritems():
v['total'] = 0
m = "{}Result Map resolved to {}".format
self.resolverlog.debug(m(self.id_str, self.result_map))
[docs] def _derive_object_info(self, **kwargs):
"""Derive self.object_info from self.obj"""
self._debug_locals(sys._getframe().f_code.co_name, locals())
m = "{}Package: '{}', Target: '{}', Verify: {}, Stopped: {}, Status: {}".format
object_info = m(
self.id_str, self.package_spec.name, self.target_group.text, self.verify_enabled,
self.stopped_flag, self.status,
)
m = "{}Object Info resolved to {}".format
self.resolverlog.debug(m(self.id_str, object_info))
self.object_info = object_info
def run(self, callbacks=None, **kwargs):
    """Poll for action data and issue callbacks.

    Unless `override_passed_count` was supplied, a Question with no selects and the same
    group as the Action is added and polled first; its result_info.passed becomes
    self.passed_count. The seen loop and then the finished loop are run afterwards.

    Parameters
    ----------
    callbacks : dict, optional
        * default: None (treated as an empty dict)
        * Callbacks should be a dict with any of these members:
            * 'SeenProgressChanged'
            * 'SeenAnswersComplete'
            * 'FinishedProgressChanged'
            * 'FinishedAnswersComplete'
        * Each callback should be a function that accepts:
            * 'poller': a poller instance
            * 'pct': a percent complete
            * 'kwargs': a dict of other args
        * the same dict is also handed to the poller of the pre-question

    Returns
    -------
    poller_result : bool
        * True only if both the seen loop and the finished loop returned a true value
        * also stored on self.poller_result

    Notes
    -----
        * Any callback can choose to get data from the session by calling
          :func:`pytan.pollers.QuestionPoller.get_result_data` or new info by calling
          :func:`pytan.pollers.QuestionPoller.get_result_info`
        * Any callback can choose to stop the poller by calling
          :func:`pytan.pollers.QuestionPoller.stop`
        * Any callback can call :func:`pytan.pollers.QuestionPoller.set_complect_pct` to
          change what "done" means on the fly
    """
    # use a fresh dict per call rather than a mutable default shared between calls
    if callbacks is None:
        callbacks = {}

    self._debug_locals(sys._getframe().f_code.co_name, locals())

    self.start = datetime.utcnow()
    self.expiration_timeout = pytan.utils.timestr_to_datetime(timestr=self.expiration)

    if self.override_timeout_secs:
        td_obj = timedelta(seconds=self.override_timeout_secs)
        self.override_timeout = self.start + td_obj
    else:
        self.override_timeout = None

    clean_keys = ['callbacks', 'obj', 'pytan_help', 'handler']
    clean_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=clean_keys)

    if self.override_passed_count:
        self.passed_count = self.override_passed_count
        m = "{}passed_count resolved override of {}".format
        self.mylog.debug(m(self.id_str, self.override_passed_count))
    else:
        # formatted once and used both as the log line and as the help text of the call
        help_msg = (
            "{}Issuing an AddObject of a Question object with no Selects and the same Group "
            "used by the Action object. The number of systems that should successfully run "
            "the Action will be taken from result_info.passed_count for the Question asked "
            "when all answers for the question have reported in."
        ).format(self.id_str)
        self.mylog.debug(help_msg)

        self.pre_question = taniumpy.Question()
        self.pre_question.group = self.target_group
        self.pre_question = self.handler._add(
            obj=self.pre_question, pytan_help=help_msg, **clean_kwargs
        )

        self.pre_question_poller = pytan.pollers.QuestionPoller(
            handler=self.handler, obj=self.pre_question, **clean_kwargs
        )
        self.pre_question_poller.run(callbacks=callbacks, **clean_kwargs)

        self.passed_count = self.pre_question_poller.result_info.passed
        m = "{}passed_count resolved to {}".format
        self.mylog.debug(m(self.id_str, self.passed_count))

    # the finished loop is run even if the seen loop did not succeed
    self.seen_eq_passed = self.seen_eq_passed_loop(callbacks=callbacks, **clean_kwargs)
    self.finished_eq_passed = self.finished_eq_passed_loop(callbacks=callbacks, **clean_kwargs)
    self.poller_result = all([self.seen_eq_passed, self.finished_eq_passed])
    return self.poller_result
def seen_eq_passed_loop(self, callbacks={}, **kwargs):
    """Method to poll Result Info for self.obj until the percentage of 'seen_count' out of 'self.passed_count' is greater than or equal to self.complete_pct

    * seen_count is calculated from an aggregate GetResultData
    * self.passed_count is calculated by the question asked before this method is called. that question has no selects, but has a group that is the same group as the action for this object

    Parameters
    ----------
    callbacks : dict
        * may contain 'SeenProgressChanged' and 'SeenAnswersComplete' callables, each invoked
          through :func:`run_callback`
    kwargs : dict
        * run through pytan.utils.clean_kwargs (keys 'pytan_help', 'aggregate', 'callback',
          'pct') and passed on to the handler calls and to the callbacks

    Returns
    -------
    bool or None
        * True when complete_pct has been reached, or when self._stop was set during the
          current pass
        * False when self.passed_count is 0, when the override timeout or the expiration has
          been reached, when the action is stopped, or when its status is not one of
          RUNNING_STATUSES
        * None when self._stop is already set at the moment the loop condition is checked
    """
    self._debug_locals(sys._getframe().f_code.co_name, locals())

    # number of systems that have SEEN the action
    self.seen_count = None
    # current percentage tracker
    self.seen_pct = None
    # loop counter
    self.seen_loop_count = 1
    # establish a previous result_info that's empty
    self.previous_result_info = taniumpy.object_types.result_info.ResultInfo()
    # establish a previous result_data that's empty
    self.previous_result_data = taniumpy.object_types.result_set.ResultSet()

    # with no expected systems there is nothing to wait for
    if self.passed_count == 0:
        m = "Passed Count of Clients for filter {} is 0 -- no clients match filter".format
        self.mylog.warning(m(self.target_group.text))
        return False

    while not self._stop:
        clean_keys = ['pytan_help', 'aggregate', 'callback', 'pct']
        clean_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=clean_keys)

        # re-fetch object and re-derive stopped flag and status
        h = (
            "Issue a GetObject for an Action in order to have access to the latest values for "
            "stopped_flag and status"
        )
        self._refetch_obj(pytan_help=h, **clean_kwargs)
        self._derive_stopped_flag(**clean_kwargs)
        self._derive_status(**clean_kwargs)

        # perform a GetResultInfo SOAP call, this ensures fresh data is available for
        # GetResultData
        h = (
            "Issue a GetResultInfo for an Action to ensure fresh data is available for a "
            "GetResultData call"
        )
        self.result_info = self.get_result_info(pytan_help=h, **clean_kwargs)

        # get the aggregate resultdata
        h = (
            "Issue a GetResultData with the aggregate option set to True."
            "This will return row counts of machines that have answered instead of"
            " all the data"
        )
        self.result_data = self.get_result_data(aggregate=True, pytan_help=h, **clean_kwargs)

        # add up the Count column for all rows
        # this count will equate to the number of systems that have started to process
        # this action in any way
        # (assumes every row has a 'Count' entry whose first item converts to int --
        # TODO confirm against the ResultSet row format)
        seen_count = sum([int(x['Count'][0]) for x in self.result_data.rows])

        # we use self.passed_count from the question we asked to get the number of matching
        # systems for determining the current pct of completion
        new_pct = pytan.utils.get_percentage(part=seen_count, whole=self.passed_count)
        new_pct_str = "{0:.0f}%".format(new_pct)
        complete_pct_str = "{0:.0f}%".format(self.complete_pct)

        # print a progress debug string
        self.progress_str = (
            "Progress: Seen Action: {}, Expected Seen: {}, Percent: {}"
        ).format(seen_count, self.passed_count, new_pct_str)
        self.progresslog.debug("{}{}".format(self.id_str, self.progress_str))

        # print a timing debug string
        # (the time left is measured against the override timeout when one is set)
        if self.override_timeout:
            time_till_expiry = self.override_timeout - datetime.utcnow()
        else:
            time_till_expiry = self.expiration_timeout - datetime.utcnow()

        self.timing_str = (
            "Timing: Started: {}, Expiration: {}, Override Timeout: {}, "
            "Elapsed Time: {}, Left till expiry: {}, Loop Count: {}"
        ).format(
            self.start,
            self.expiration_timeout,
            self.override_timeout,
            datetime.utcnow() - self.start,
            time_till_expiry,
            self.seen_loop_count,
        )
        self.progresslog.debug("{}{}".format(self.id_str, self.timing_str))

        # check to see if progress has changed, if so run the callback
        # (both trackers start as None, so the first pass always counts as a change)
        seen_changed = seen_count != self.seen_count
        pct_changed = self.seen_pct != new_pct
        progress_changed = any([seen_changed, pct_changed])

        if progress_changed:
            m = "{}Progress Changed for Seen Count {} ({} of {})".format
            self.progresslog.info(m(self.id_str, new_pct_str, seen_count, self.passed_count))
            cb = 'SeenProgressChanged'
            self.run_callback(callbacks=callbacks, callback=cb, pct=new_pct, **clean_kwargs)

        # check to see if new_pct has reached complete_pct threshold, if so return True
        if new_pct >= self.complete_pct:
            m = "{}Reached Threshold for Seen Count of {} ({} of {})".format
            m = m(self.id_str, complete_pct_str, seen_count, self.passed_count)
            self.mylog.info(m)
            cb = 'SeenAnswersComplete'
            self.run_callback(callbacks=callbacks, callback=cb, pct=new_pct, **clean_kwargs)
            return True

        # check to see if override timeout is specified, if so and we have passed it, return
        # False
        if self.override_timeout and datetime.utcnow() >= self.override_timeout:
            m = "{}Reached override timeout of {}".format
            self.mylog.warning(m(self.id_str, self.override_timeout))
            return False

        # check to see if we have passed the actions expiration timeout, if so return False
        # NOTE(review): this check also applies when an override timeout is set, although
        # override_timeout_secs is documented as being used "instead of" the expiration --
        # confirm which behavior is intended
        if datetime.utcnow() >= self.expiration_timeout:
            m = "{}Reached expiration timeout of {}".format
            self.mylog.warning(m(self.id_str, self.expiration))
            return False

        # check to see if action is stopped, if it is, return False
        if self.stopped_flag:
            m = "{}Actions stopped flag is True".format
            self.mylog.warning(m(self.id_str))
            return False

        # check to see if action is not active, if it is not, False
        if self.status.lower() not in self.RUNNING_STATUSES:
            m = "{}Action status is {}, which is not one of: {}".format
            m = m(self.id_str, self.status, ', '.join(self.RUNNING_STATUSES))
            self.mylog.warning(m)
            return False

        # if stop was called during this pass (e.g. by a callback), return True
        if self._stop:
            m = "{}Stop called at {}".format
            self.mylog.info(m(self.id_str, new_pct_str))
            return True

        # update our class variables to the new values determined by this loop
        self.seen_pct = new_pct
        self.seen_count = seen_count
        self.previous_result_info = self.result_info
        self.previous_result_data = self.result_data

        time.sleep(self.polling_secs)
        self.seen_loop_count += 1
def finished_eq_passed_loop(self, callbacks=None, **kwargs):
    """Method to poll Result Info for self.obj until the percentage of 'finished_count' out of 'self.passed_count' is greater than or equal to self.complete_pct

    * finished_count is calculated from a full GetResultData call that is parsed into self.result_map
    * self.passed_count is calculated by the question asked before this method is called. that question has no selects, but has a group that is the same group as the action for this object

    Parameters
    ----------
    callbacks : dict, optional
        * default: None, which is treated as an empty dict
        * handed to self.run_callback together with the callback names
          'FinishedProgressChanged' and 'FinishedAnswersComplete'
    kwargs : dict, optional
        * forwarded (minus 'pytan_help', 'aggregate', 'callback' and 'pct') to the re-fetch,
          derive, GetResultInfo, GetResultData and callback calls made by this method

    Returns
    -------
    bool or None
        * True if the finished percentage reached self.complete_pct, or if self._stop was set
          while a loop iteration was running
        * False if the override timeout or expiration timeout passed, the stopped flag is
          set, or the status is not one of self.RUNNING_STATUSES
        * None if self._stop is already True when the loop condition is checked
    """
    # a literal {} default would be one dict object shared by every call of this method;
    # normalise before the debug call so the logged locals still show an empty dict
    if callbacks is None:
        callbacks = {}

    self._debug_locals(sys._getframe().f_code.co_name, locals())

    # number of systems that have FINISHED the action
    self.finished_count = None
    # current percentage tracker
    self.finished_pct = None
    # loop counter
    self.loop_count = 1
    # establish a previous result_info that's empty
    self.previous_result_info = taniumpy.object_types.result_info.ResultInfo()
    # establish a previous result_data that's empty
    self.previous_result_data = taniumpy.object_types.result_set.ResultSet()

    while not self._stop:
        # these keys are supplied explicitly below, so strip them from the pass-through kwargs
        clean_keys = ['pytan_help', 'aggregate', 'callback', 'pct']
        clean_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=clean_keys)

        # re-fetch object and re-derive stopped flag and status
        h = (
            "Issue a GetObject for an Action in order to have access to the latest values for "
            "stopped_flag and status"
        )
        self._refetch_obj(pytan_help=h, **clean_kwargs)
        self._derive_stopped_flag(**clean_kwargs)
        self._derive_status(**clean_kwargs)

        # perform a GetResultInfo SOAP call, this ensures fresh data is available for
        # GetResultData
        h = (
            "Issue a GetResultInfo for an Action to ensure fresh data is available for a "
            "GetResultData call"
        )
        self.result_info = self.get_result_info(pytan_help=h, **clean_kwargs)

        # get the NON aggregate resultdata
        h = (
            "Issue a GetResultData for an Action with the aggregate option set to False. "
            "This will return all of the Action Statuses for each computer that have run this "
            "Action"
        )
        self.result_data = self.get_result_data(aggregate=False, pytan_help=h, **clean_kwargs)

        # for each row from the result data:
        #   get the computer name and the action status for this row
        #   add the computer name to the appropriate action status in self.result_map
        # names are only ever appended, so the map accumulates across loop iterations
        for row in self.result_data.rows:
            action_status = row['Action Statuses'][0]
            comp_name = row['Computer Name'][0]

            known = False
            for s, smap in self.result_map.items():
                if action_status in smap:
                    known = True
                    if comp_name not in self.result_map[s][action_status]:
                        self.result_map[s][action_status].append(comp_name)

            # statuses that no section of the map lists are collected under 'unknown'
            if not known:
                if action_status not in self.result_map['unknown']:
                    self.result_map['unknown'][action_status] = []
                if comp_name not in self.result_map['unknown'][action_status]:
                    self.result_map['unknown'][action_status].append(comp_name)

        # recompute each section's 'total' as the number of names over all of its statuses
        for s, smap in self.result_map.items():
            smap['total'] = sum(len(y) for x, y in smap.items() if x != 'total')

        # Use the total from the key defined in self.ACTION_DONE_KEY in self.result_map
        # this total will equate to the number of systems that have finished this action
        finished_count = self.result_map[self.ACTION_DONE_KEY]['total']

        # we use self.passed_count from the question we asked to get the number of matching
        # systems for determining the current pct of completion
        new_pct = pytan.utils.get_percentage(part=finished_count, whole=self.passed_count)
        new_pct_str = "{0:.0f}%".format(new_pct)
        complete_pct_str = "{0:.0f}%".format(self.complete_pct)

        # print a progress debug string
        p = "{}: {}".format
        progress_list = [p(s, smap['total']) for s, smap in self.result_map.items()]
        progress_list.append("Done Key: {}".format(self.ACTION_DONE_KEY))
        progress_list.append("Passed Count: {}".format(self.passed_count))
        self.progress_str = ', '.join(progress_list)
        self.progresslog.debug("{}{}".format(self.id_str, self.progress_str))

        # print a timing debug string
        if self.override_timeout:
            time_till_expiry = self.override_timeout - datetime.utcnow()
        else:
            time_till_expiry = self.expiration_timeout - datetime.utcnow()

        self.timing_str = (
            "Timing: Started: {}, Expiration: {}, Override Timeout: {}, "
            "Elapsed Time: {}, Left till expiry: {}, Loop Count: {}"
        ).format(
            self.start,
            self.expiration_timeout,
            self.override_timeout,
            datetime.utcnow() - self.start,
            time_till_expiry,
            self.loop_count,
        )
        self.progresslog.debug("{}{}".format(self.id_str, self.timing_str))

        # check to see if progress has changed, if so run the callback
        finished_changed = finished_count != self.finished_count
        pct_changed = self.finished_pct != new_pct
        progress_changed = any([finished_changed, pct_changed])

        if progress_changed:
            m = "{}Progress Changed for Finished Count {} ({} of {})".format
            m = m(self.id_str, new_pct_str, finished_count, self.passed_count)
            self.progresslog.info(m)
            cb = 'FinishedProgressChanged'
            self.run_callback(callbacks=callbacks, callback=cb, pct=new_pct, **clean_kwargs)

        # check to see if new_pct has reached complete_pct threshold, if so return True
        if new_pct >= self.complete_pct:
            m = "{}Reached Threshold for Finished Count of {} ({} of {})".format
            m = m(self.id_str, complete_pct_str, finished_count, self.passed_count)
            self.mylog.info(m)
            cb = 'FinishedAnswersComplete'
            self.run_callback(callbacks=callbacks, callback=cb, pct=new_pct, **clean_kwargs)
            return True

        # check to see if override timeout is specified, if so and we have passed it, return
        # False
        if self.override_timeout and datetime.utcnow() >= self.override_timeout:
            m = "{}Reached override timeout of {}".format
            self.mylog.warning(m(self.id_str, self.override_timeout))
            return False

        # check to see if we have passed the actions expiration timeout, if so return False
        # NOTE(review): the comparison uses self.expiration_timeout but the message prints
        # self.expiration -- confirm that this is intended
        if datetime.utcnow() >= self.expiration_timeout:
            m = "{}Reached expiration timeout of {}".format
            self.mylog.warning(m(self.id_str, self.expiration))
            return False

        # check to see if action is stopped, if it is, return False
        if self.stopped_flag:
            m = "{}Actions stopped flag is True".format
            self.mylog.warning(m(self.id_str))
            return False

        # check to see if action is not active, if it is not, False
        if self.status.lower() not in self.RUNNING_STATUSES:
            m = "{}Action status is {}, which is not one of: {}".format
            m = m(self.id_str, self.status, ', '.join(self.RUNNING_STATUSES))
            self.mylog.warning(m)
            return False

        # if stop is called, return True
        # (self._stop was False at the top of this iteration, so it changed while it ran)
        if self._stop:
            m = "{}Stop called at {}".format
            self.mylog.info(m(self.id_str, new_pct_str))
            return True

        # update our class variables to the new values determined by this loop
        self.finished_pct = new_pct
        self.finished_count = finished_count
        self.previous_result_info = self.result_info
        self.previous_result_data = self.result_data

        time.sleep(self.polling_secs)
        self.loop_count += 1
class SSEPoller(QuestionPoller):
    """A class to poll the progress of a Server Side Export.

    The primary function of this class is to poll for status of server side exports.

    Parameters
    ----------
    handler : :class:`pytan.handler.Handler`
        * PyTan handler whose session is used for the authenticated HTTP gets
    export_id : str
        * ID of server side export
    polling_secs : int, optional
        * default: 2
        * Number of seconds to wait in between status check loops
    timeout_secs : int, optional
        * default: 600
        * timeout in seconds for waiting for status completion, 0 does not time out
    """

    STR_ATTRS = [
        'export_id',
        'polling_secs',
        'timeout_secs',
        'sse_status',
    ]
    """Class attributes to include in __str__ output"""

    POLLING_SECS_DEFAULT = 2
    """default value for self.polling_secs"""

    TIMEOUT_SECS_DEFAULT = 600
    """default value for self.timeout_secs"""

    export_id = None
    """The export_id for this poller"""

    def __init__(self, handler, export_id, **kwargs):
        """Validate `handler` and store the polling configuration.

        Raises
        ------
        pytan.exceptions.PollingError
            * if `handler` is not an instance of :class:`pytan.handler.Handler`
        """
        # methodlog and DEBUG_METHOD_LOCALS are set up before the first _debug_locals call
        self.methodlog = logging.getLogger("method_debug")
        self.DEBUG_METHOD_LOCALS = kwargs.get('debug_method_locals', False)

        self._debug_locals(sys._getframe().f_code.co_name, locals())

        self.setup_logging()

        if not isinstance(handler, pytan.handler.Handler):
            m = "{} is not a valid handler instance! Must be a: {!r}".format
            raise pytan.exceptions.PollingError(m(type(handler), pytan.handler.Handler))

        self.handler = handler
        self.export_id = export_id
        self.polling_secs = kwargs.get('polling_secs', self.POLLING_SECS_DEFAULT)
        self.timeout_secs = kwargs.get('timeout_secs', self.TIMEOUT_SECS_DEFAULT)

        # prefix used on every log line emitted by this poller
        self.id_str = "ID '{}': ".format(export_id)
        self.poller_result = None
        self.sse_status = "Not yet run"
        self._post_init(**kwargs)

    def _post_init(self, **kwargs):
        """Post init class setup"""
        # nothing to set up for this class beyond the debug trace
        self._debug_locals(sys._getframe().f_code.co_name, locals())

    def get_sse_status(self, **kwargs):
        """Function to get the status of a server side export

        Constructs a URL via: export/${export_id}.status and performs an authenticated HTTP get

        Parameters
        ----------
        export_id : str, optional
            * default: self.export_id
            * ID of the server side export to get the status of

        Returns
        -------
        ret : the body returned by the HTTP get, with surrounding whitespace stripped
        """
        self._debug_locals(sys._getframe().f_code.co_name, locals())

        # 'url' is supplied explicitly to http_get below
        clean_keys = ['url']
        clean_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=clean_keys)

        export_id = kwargs.get('export_id', self.export_id)
        short_url = 'export/{}.status'.format(export_id)
        # the full url is only used in the progress log message
        full_url = self.handler.session._full_url(url=short_url)

        h = "Perform an HTTP get to retrieve the status of a server side export"
        clean_kwargs['pytan_help'] = clean_kwargs.get('pytan_help', h)

        ret = self.handler.session.http_get(url=short_url, **clean_kwargs).strip()

        # print a progress debug string
        progress_str = "Server Side Export Progress: '{}' from URL: {}".format
        progress_str = progress_str(ret, full_url)
        self.progresslog.debug("{}{}".format(self.id_str, progress_str))
        return ret

    def get_sse_data(self, **kwargs):
        """Function to get the data of a server side export

        Constructs a URL via: export/${export_id}.gz and performs an authenticated HTTP get

        Parameters
        ----------
        export_id : str, optional
            * default: self.export_id
            * ID of the server side export to get the data of

        Returns
        -------
        ret : the body returned by the HTTP get, unmodified
        """
        self._debug_locals(sys._getframe().f_code.co_name, locals())

        # 'url' is supplied explicitly to http_get below
        clean_keys = ['url']
        clean_kwargs = pytan.utils.clean_kwargs(kwargs=kwargs, keys=clean_keys)

        export_id = kwargs.get('export_id', self.export_id)
        short_url = 'export/{}.gz'.format(export_id)
        # the full url is only used in the progress log message
        full_url = self.handler.session._full_url(url=short_url)

        h = "Perform an HTTP get to retrieve the data of a server side export"
        clean_kwargs['pytan_help'] = clean_kwargs.get('pytan_help', h)

        ret = self.handler.session.http_get(url=short_url, **clean_kwargs)

        # print a progress debug string
        progress_str = "Server Side Export Data Length: {} from URL: {}".format
        progress_str = progress_str(len(ret), full_url)
        self.progresslog.debug("{}{}".format(self.id_str, progress_str))
        return ret

    def run(self, **kwargs):
        """Poll for server side export status

        Returns
        -------
        poller_result : bool
            * True if the status loop saw a completed status, False otherwise
        """
        self._debug_locals(sys._getframe().f_code.co_name, locals())

        self.start = datetime.utcnow()

        # a falsy timeout_secs (such as 0) disables the timeout
        if self.timeout_secs:
            td_obj = timedelta(seconds=self.timeout_secs)
            self.timeout = self.start + td_obj
        else:
            self.timeout = None

        self.sse_status_completed = self.sse_status_has_completed_loop(**kwargs)
        self.poller_result = all([self.sse_status_completed])
        return self.poller_result

    def sse_status_has_completed_loop(self, **kwargs):
        """Method to poll the status file for a server side export until it contains 'Completed'

        Returns
        -------
        bool
            * True if the status contains 'completed' (case insensitive)
            * False if the timeout passed or stop was called before completion

        Raises
        ------
        pytan.exceptions.ServerSideExportError
            * if the status contains 'failed' (case insensitive)
        """
        self._debug_locals(sys._getframe().f_code.co_name, locals())

        # loop counter
        self.loop_count = 1
        # establish a previous status that's empty
        self.previous_sse_status = ''

        while not self._stop:
            # get the SSE status
            self.sse_status = self.get_sse_status(**kwargs)

            # print a timing debug string
            if self.timeout:
                time_till_expiry = self.timeout - datetime.utcnow()
            else:
                time_till_expiry = 'Never'

            self.timing_str = (
                "Timing: Started: {}, Timeout: {}, Elapsed Time: {}, Left till expiry: {}, "
                "Loop Count: {}"
            ).format(
                self.start,
                self.timeout,
                datetime.utcnow() - self.start,
                time_till_expiry,
                self.loop_count,
            )
            self.progresslog.debug("{}{}".format(self.id_str, self.timing_str))

            # check to see if progress has changed, if so print progress log info
            progress_changed = any([
                self.previous_sse_status != self.sse_status,
            ])

            if progress_changed:
                m = "{}Progress Changed: '{}'".format
                self.progresslog.info(m(self.id_str, self.sse_status))

            # 'failed' is checked before 'completed', so a status holding both raises
            if 'failed' in self.sse_status.lower():
                m = "{}Server Side Export Failed: '{}'".format
                raise pytan.exceptions.ServerSideExportError(m(self.id_str, self.sse_status))

            if 'completed' in self.sse_status.lower():
                m = "{}Server Side Export Completed: '{}'".format
                self.mylog.info(m(self.id_str, self.sse_status))
                return True

            # check to see if timeout is specified, if so and we have passed it, return
            # False
            if self.timeout and datetime.utcnow() >= self.timeout:
                m = "{}Reached timeout of {}".format
                self.mylog.warning(m(self.id_str, self.timeout))
                return False

            # if stop is called, return False (the export has not completed)
            if self._stop:
                m = "{}Stop called at {}".format
                self.mylog.info(m(self.id_str, self.sse_status))
                return False

            # update our class variables to the new values determined by this loop
            self.previous_sse_status = self.sse_status

            time.sleep(self.polling_secs)
            self.loop_count += 1

        # the loop condition failed, so self._stop was set outside of the in-loop check;
        # return the same value as the in-loop stop branch instead of an implicit None
        return False