2.4.3 policy-handler - try-catch top Exceptions

- added try-except for top level Exception into all threads
  of policy-handler to avoid losing the thread and tracking
  the unexpected crashes
- rediscover the deployment-handler if not found before
  and after each catchup
- refactored audit - separated metrics from audit
- added more stats and runtime info to healthcheck
  = gc counts and garbage info if any detected
  = memory usage - to detect the potential memory leaks
  = request_id to all stats
  = stats of active requests
- avoid reallocating the whole Queue of policy-updates after catchup
  = clear of the internal queue under proper lock

Change-Id: I3fabcaac70419a68bd070ff7d591a75942f37663
Signed-off-by: Alex Shatov <alexs@att.com>
Issue-ID: DCAEGEN2-483
diff --git a/policyhandler/deploy_handler.py b/policyhandler/deploy_handler.py
index 139e660..4ea5ad1 100644
--- a/policyhandler/deploy_handler.py
+++ b/policyhandler/deploy_handler.py
@@ -26,7 +26,7 @@
 from .config import Config
 from .customize import CustomizerUser
 from .discovery import DiscoveryClient
-from .onap.audit import REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode
+from .onap.audit import REQUEST_X_ECOMP_REQUESTID, AuditHttpCode, Metrics
 
 POOL_SIZE = 1
 
@@ -44,27 +44,27 @@
     _server_instance_uuid = None
 
     @staticmethod
-    def _lazy_init(audit):
+    def _lazy_init(audit, rediscover=False):
         """ set static properties """
-        if DeployHandler._lazy_inited:
+        if DeployHandler._lazy_inited and not rediscover:
             return
-        DeployHandler._lazy_inited = True
 
-        DeployHandler._custom_kwargs = CustomizerUser.get_customizer() \
-                                            .get_deploy_handler_kwargs(audit)
-        if not DeployHandler._custom_kwargs \
-        or not isinstance(DeployHandler._custom_kwargs, dict):
+        DeployHandler._custom_kwargs = (CustomizerUser.get_customizer()
+                                        .get_deploy_handler_kwargs(audit))
+        if (not DeployHandler._custom_kwargs
+                or not isinstance(DeployHandler._custom_kwargs, dict)):
             DeployHandler._custom_kwargs = {}
 
-        DeployHandler._requests_session = requests.Session()
-        DeployHandler._requests_session.mount(
-            'https://',
-            requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
-        )
-        DeployHandler._requests_session.mount(
-            'http://',
-            requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
-        )
+        if not DeployHandler._requests_session:
+            DeployHandler._requests_session = requests.Session()
+            DeployHandler._requests_session.mount(
+                'https://',
+                requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
+            )
+            DeployHandler._requests_session.mount(
+                'http://',
+                requests.adapters.HTTPAdapter(pool_connections=POOL_SIZE, pool_maxsize=POOL_SIZE)
+            )
 
         config_dh = Config.config.get("deploy_handler")
         if config_dh and isinstance(config_dh, dict):
@@ -77,7 +77,7 @@
             DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler")
             DeployHandler._url = config_dh.get("url")
             DeployHandler._logger.info("dns based routing to %s: url(%s)",
-                DeployHandler._target_entity, DeployHandler._url)
+                                       DeployHandler._target_entity, DeployHandler._url)
 
         if not DeployHandler._url:
             # discover routing to deployment-handler at consul-services
@@ -85,14 +85,18 @@
                 # config for policy-handler <= 2.3.1
                 # "deploy_handler" : "deployment_handler"
                 DeployHandler._target_entity = str(config_dh or "deployment_handler")
-            DeployHandler._url = DiscoveryClient.get_service_url(audit, DeployHandler._target_entity)
+            DeployHandler._url = DiscoveryClient.get_service_url(audit,
+                                                                 DeployHandler._target_entity)
 
         DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy'
         DeployHandler._logger.info(
             "got %s policy url(%s)", DeployHandler._target_entity, DeployHandler._url_policy)
 
+        DeployHandler._lazy_inited = bool(DeployHandler._url)
+
+
     @staticmethod
-    def policy_update(audit, message):
+    def policy_update(audit, message, rediscover=False):
         """
         post policy_updated message to deploy-handler
 
@@ -101,10 +105,10 @@
         if not message:
             return
 
-        DeployHandler._lazy_init(audit)
-        sub_aud = Audit(aud_parent=audit, targetEntity=DeployHandler._target_entity,
-                        targetServiceName=DeployHandler._url_policy)
-        headers = {REQUEST_X_ECOMP_REQUESTID : sub_aud.request_id}
+        DeployHandler._lazy_init(audit, rediscover)
+        metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
+                          targetServiceName=DeployHandler._url_policy)
+        headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
 
         msg_str = json.dumps(message)
         headers_str = json.dumps(headers)
@@ -114,14 +118,14 @@
         log_data = " msg={0} headers={1}".format(msg_str, headers_str)
         log_line = log_action + log_data
         DeployHandler._logger.info(log_line)
-        sub_aud.metrics_start(log_line)
+        metrics.metrics_start(log_line)
 
         if not DeployHandler._url:
             error_msg = "no url found to {0}".format(log_line)
             DeployHandler._logger.error(error_msg)
-            sub_aud.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
+            metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
             audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
-            sub_aud.metrics(error_msg)
+            metrics.metrics(error_msg)
             return
 
         res = None
@@ -130,25 +134,30 @@
                 DeployHandler._url_policy, json=message, headers=headers,
                 **DeployHandler._custom_kwargs
             )
-        except requests.exceptions.RequestException as ex:
-            error_msg = "failed to {0}: {1}{2}".format(log_action, str(ex), log_data)
+        except Exception as ex:
+            error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+                          if isinstance(ex, requests.exceptions.RequestException)
+                          else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+            error_msg = ("failed to {0} {1}: {2}{3}"
+                         .format(log_action, type(ex).__name__, str(ex), log_data))
             DeployHandler._logger.exception(error_msg)
-            sub_aud.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
-            audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
-            sub_aud.metrics(error_msg)
+            metrics.set_http_status_code(error_code)
+            audit.set_http_status_code(error_code)
+            metrics.metrics(error_msg)
             return
 
-        sub_aud.set_http_status_code(res.status_code)
+        metrics.set_http_status_code(res.status_code)
         audit.set_http_status_code(res.status_code)
 
         log_line = "response {0} from {1}: text={2}{3}" \
             .format(res.status_code, log_action, res.text, log_data)
-        sub_aud.metrics(log_line)
-        DeployHandler._logger.info(log_line)
+        metrics.metrics(log_line)
 
         if res.status_code != requests.codes.ok:
+            DeployHandler._logger.error(log_line)
             return
 
+        DeployHandler._logger.info(log_line)
         result = res.json() or {}
         prev_server_instance_uuid = DeployHandler._server_instance_uuid
         DeployHandler._server_instance_uuid = result.get("server_instance_uuid")
@@ -156,9 +165,9 @@
         deployment_handler_changed = (prev_server_instance_uuid
             and prev_server_instance_uuid != DeployHandler._server_instance_uuid)
         if deployment_handler_changed:
-            log_line = "deployment_handler_changed: {1} != {0}" \
-                .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid)
-            sub_aud.info(log_line)
+            log_line = ("deployment_handler_changed: {1} != {0}"
+                        .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))
+            metrics.info(log_line)
             DeployHandler._logger.info(log_line)
 
         return deployment_handler_changed
diff --git a/policyhandler/onap/audit.py b/policyhandler/onap/audit.py
index 08dcd37..48988fe 100644
--- a/policyhandler/onap/audit.py
+++ b/policyhandler/onap/audit.py
@@ -26,16 +26,19 @@
 """
 
 import copy
+import gc
 import json
 import os
 import re
 import subprocess
 import sys
+import threading
 import time
 import uuid
 from datetime import datetime
 from enum import Enum
-from threading import Lock
+
+import psutil
 
 from .CommonLogger import CommonLogger
 from .health import Health
@@ -51,10 +54,15 @@
 AUDIT_TARGET_ENTITY = 'targetEntity'
 AUDIT_METRICS = 'metrics'
 AUDIT_TOTAL_STATS = 'audit_total_stats'
+METRICS_TOTAL_STATS = 'metrics_total_stats'
 
 HEADER_CLIENTAUTH = "clientauth"
 HEADER_AUTHORIZATION = "authorization"
 
+ERROR_CODE = "errorCode"
+ERROR_DESCRIPTION = "errorDescription"
+
+
 class AuditHttpCode(Enum):
     """audit http codes"""
     HTTP_OK = 200
@@ -67,6 +75,7 @@
     DATA_ERROR = 1030
     SCHEMA_ERROR = 1040
 
+
 class AuditResponseCode(Enum):
     """audit response codes"""
     SUCCESS = 0
@@ -107,7 +116,60 @@
             return "unknown"
         return response_code.name.lower().replace("_", " ")
 
-class Audit(object):
+
+class ProcessInfo(object):
+    """static class to calculate process info"""
+    _KILO_SYMBOLS = ('KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB')
+    _KILO_POWERS = {}
+
+    @staticmethod
+    def init():
+        """init static constants"""
+        for i, kilo_symbol in enumerate(ProcessInfo._KILO_SYMBOLS):
+            ProcessInfo._KILO_POWERS[kilo_symbol] = 1 << (i + 1) * 10
+        ProcessInfo._KILO_SYMBOLS = list(reversed(ProcessInfo._KILO_SYMBOLS))
+
+    @staticmethod
+    def bytes_to_human(byte_count):
+        """converts byte count to human value in kilo-powers"""
+        for kilo_symbol in ProcessInfo._KILO_SYMBOLS:
+            kilo_power = ProcessInfo._KILO_POWERS[kilo_symbol]
+            if byte_count >= kilo_power:
+                value = float(byte_count) / kilo_power
+                return '%.1f%s' % (value, kilo_symbol)
+        return "%sB" % byte_count
+
+    @staticmethod
+    def mem_info():
+        """calculates the memory usage of the current process"""
+        process = psutil.Process()
+        with process.oneshot():
+            mem = process.memory_full_info()
+            return {
+                "uss" : ProcessInfo.bytes_to_human(mem.uss),
+                "rss" : ProcessInfo.bytes_to_human(mem.rss),
+                "swap" : ProcessInfo.bytes_to_human(getattr(mem, "swap", 0)),
+                "pss" : ProcessInfo.bytes_to_human(getattr(mem, "pss", 0))
+            }
+
+
+    @staticmethod
+    def gc_info(full=False):
+        """gets info from garbage collector"""
+        gc_info = {
+            "gc_count" : str(gc.get_count()),
+            "gc_threshold" : str(gc.get_threshold())
+        }
+        try:
+            if gc.garbage:
+                gc_info["gc_garbage"] = ([repr(stuck) for stuck in gc.garbage]
+                                         if full else len(gc.garbage))
+        except Exception:
+            pass
+        return gc_info
+
+
+class _Audit(object):
     """put the audit object on stack per each initiating request in the system
 
     :request_id: is the X-ECOMP-RequestID for tracing
@@ -121,7 +183,7 @@
     _service_name = ""
     _service_version = ""
     _service_instance_uuid = str(uuid.uuid4())
-    _started = datetime.now()
+    _started = datetime.utcnow()
     _key_format = re.compile(r"\W")
     _logger_debug = None
     _logger_error = None
@@ -129,95 +191,69 @@
     _logger_audit = None
     _health = Health()
     _py_ver = sys.version.replace("\n", "")
-    try:
-        _packages = filter(None, subprocess.check_output(["pip", "freeze"]).splitlines())
-    except subprocess.CalledProcessError:
-        _packages = []
+    _packages = []
 
     @staticmethod
     def init(service_name, service_version, config_file_path):
         """init static invariants and loggers"""
-        Audit._service_name = service_name
-        Audit._service_version = service_version
-        Audit._logger_debug = CommonLogger(config_file_path, "debug", \
-            instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
-        Audit._logger_error = CommonLogger(config_file_path, "error", \
-            instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
-        Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \
-            instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
-        Audit._logger_audit = CommonLogger(config_file_path, "audit", \
-            instanceUUID=Audit._service_instance_uuid, serviceName=Audit._service_name)
+        _Audit._service_name = service_name
+        _Audit._service_version = service_version
+        _Audit._logger_debug = CommonLogger(config_file_path, "debug", \
+            instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+        _Audit._logger_error = CommonLogger(config_file_path, "error", \
+            instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+        _Audit._logger_metrics = CommonLogger(config_file_path, "metrics", \
+            instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+        _Audit._logger_audit = CommonLogger(config_file_path, "audit", \
+            instanceUUID=_Audit._service_instance_uuid, serviceName=_Audit._service_name)
+        ProcessInfo.init()
+        try:
+            _Audit._packages = filter(None, subprocess.check_output(["pip", "freeze"]).splitlines())
+        except subprocess.CalledProcessError:
+            pass
 
-    @staticmethod
-    def health():
+
+    def health(self, full=False):
         """returns json for health check"""
-        now = datetime.now()
-        return {
-            "service_name" : Audit._service_name,
-            "service_version" : Audit._service_version,
-            "service_instance_uuid" : Audit._service_instance_uuid,
-            "python" : Audit._py_ver,
-            "started" : str(Audit._started),
-            "now" : str(now),
-            "uptime" : str(now - Audit._started),
-            "stats" : Audit._health.dump(),
-            "packages" : Audit._packages
+        utcnow = datetime.utcnow()
+        health = {
+            "server" : {
+                "service_name" : _Audit._service_name,
+                "service_version" : _Audit._service_version,
+                "service_instance_uuid" : _Audit._service_instance_uuid
+            },
+            "runtime" : {
+                "started" : str(_Audit._started),
+                "utcnow" : str(utcnow),
+                "uptime" : str(utcnow - _Audit._started),
+                "active_threads" : sorted([thr.name for thr in threading.enumerate()]),
+                "gc" : ProcessInfo.gc_info(full),
+                "mem_info" : ProcessInfo.mem_info()
+            },
+            "stats" : _Audit._health.dump(),
+            "soft" : {"python" : _Audit._py_ver, "packages" : _Audit._packages}
         }
+        health_txt = "{} health: {}".format(_Audit._service_name, json.dumps(health))
+        self.info(health_txt)
+        return health
 
-    def __init__(self, job_name=None, request_id=None, req_message=None, aud_parent=None, **kwargs):
+
+    def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs):
         """create audit object per each request in the system
 
         :job_name: is the name of the audit job for health stats
         :request_id: is the X-ECOMP-RequestID for tracing
         :req_message: is the request message string for logging
-        :aud_parent: is the parent Audit - used for sub-query metrics to other systems
         :kwargs: - put any request related params into kwargs
         """
-        self.job_name = Audit._key_format.sub('_', job_name or req_message or Audit._service_name)
+        self.job_name = _Audit._key_format.sub('_', job_name or req_message or _Audit._service_name)
         self.request_id = request_id
         self.req_message = req_message or ""
-        self.aud_parent = aud_parent
         self.kwargs = kwargs or {}
 
-        self.retry_get_config = False
         self.max_http_status_code = 0
-        self._lock = Lock()
+        self._lock = threading.Lock()
 
-        if self.aud_parent:
-            self.job_name = Audit._key_format.sub(
-                '_', job_name or self.aud_parent.job_name or Audit._service_name)
-            if not self.request_id:
-                self.request_id = self.aud_parent.request_id
-            if not self.req_message:
-                self.req_message = self.aud_parent.req_message
-            self.kwargs = self.aud_parent.merge_all_kwargs(**self.kwargs)
-        else:
-            headers = self.kwargs.get("headers", {})
-            if headers:
-                if not self.request_id:
-                    self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
-                if AUDIT_IPADDRESS not in self.kwargs:
-                    self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR)
-                if AUDIT_SERVER not in self.kwargs:
-                    self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST)
-
-        if AUDIT_SERVER not in self.kwargs:
-            self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME)
-
-        created_req = ""
-        if not self.request_id:
-            created_req = " with new"
-            self.request_id = str(uuid.uuid4())
-
-        self.kwargs[AUDIT_REQUESTID] = self.request_id
-
-        self._started = time.time()
-        self._start_event = Audit._logger_audit.getStartRecordEvent()
-        self.metrics_start()
-
-        if not self.aud_parent:
-            self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\
-                .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
 
     def merge_all_kwargs(self, **kwargs):
         """returns the merge of copy of self.kwargs with the param kwargs"""
@@ -228,16 +264,14 @@
 
     def set_http_status_code(self, http_status_code):
         """accumulate the highest(worst) http status code"""
-        self._lock.acquire()
-        if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
-            self.max_http_status_code = max(http_status_code, self.max_http_status_code)
-        self._lock.release()
+        with self._lock:
+            if self.max_http_status_code < AuditHttpCode.SERVER_INTERNAL_ERROR.value:
+                self.max_http_status_code = max(http_status_code, self.max_http_status_code)
 
     def get_max_http_status_code(self):
         """returns the highest(worst) http status code"""
-        self._lock.acquire()
-        max_http_status_code = self.max_http_status_code
-        self._lock.release()
+        with self._lock:
+            max_http_status_code = self.max_http_status_code
         return max_http_status_code
 
     @staticmethod
@@ -247,33 +281,11 @@
             return 'COMPLETE'
         return 'ERROR'
 
-    @staticmethod
-    def hide_secrets(obj):
-        """hides the known secret field values of the dictionary"""
-        if not isinstance(obj, dict):
-            return obj
-
-        for key in obj:
-            if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]:
-                obj[key] = "*"
-            elif isinstance(obj[key], dict):
-                obj[key] = Audit.hide_secrets(obj[key])
-
-        return obj
-
-    @staticmethod
-    def log_json_dumps(obj, **kwargs):
-        """hide the known secret field values of the dictionary and return json.dumps"""
-        if not isinstance(obj, dict):
-            return json.dumps(obj, **kwargs)
-
-        return json.dumps(Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)
-
     def is_serious_error(self, status_code):
         """returns whether the response_code is success and a human text for response code"""
-        return AuditResponseCode.PERMISSION_ERROR.value \
-            == AuditResponseCode.get_response_code(status_code).value \
-            or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value
+        return (AuditResponseCode.PERMISSION_ERROR.value
+                == AuditResponseCode.get_response_code(status_code).value
+                or self.get_max_http_status_code() >= AuditHttpCode.SERVER_INTERNAL_ERROR.value)
 
     def _get_response_status(self):
         """calculates the response status fields from max_http_status_code"""
@@ -290,76 +302,123 @@
 
     def debug(self, log_line, **kwargs):
         """debug - the debug=lowest level of logging"""
-        Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))
+        _Audit._logger_debug.debug(log_line, **self.merge_all_kwargs(**kwargs))
 
     def info(self, log_line, **kwargs):
         """debug - the info level of logging"""
-        Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))
+        _Audit._logger_debug.info(log_line, **self.merge_all_kwargs(**kwargs))
 
     def info_requested(self, result=None, **kwargs):
         """info "requested ..." - the info level of logging"""
         self.info("requested {0} {1}".format(self.req_message, result or ""), \
             **self.merge_all_kwargs(**kwargs))
 
-    def warn(self, log_line, **kwargs):
+    def warn(self, log_line, error_code=None, **kwargs):
         """debug+error - the warn level of logging"""
         all_kwargs = self.merge_all_kwargs(**kwargs)
-        Audit._logger_debug.warn(log_line, **all_kwargs)
-        Audit._logger_error.warn(log_line, **all_kwargs)
 
-    def error(self, log_line, **kwargs):
+        if error_code and isinstance(error_code, AuditResponseCode):
+            all_kwargs[ERROR_CODE] = error_code.value
+            all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
+        _Audit._logger_debug.warn(log_line, **all_kwargs)
+        _Audit._logger_error.warn(log_line, **all_kwargs)
+
+    def error(self, log_line, error_code=None, **kwargs):
         """debug+error - the error level of logging"""
         all_kwargs = self.merge_all_kwargs(**kwargs)
-        Audit._logger_debug.error(log_line, **all_kwargs)
-        Audit._logger_error.error(log_line, **all_kwargs)
 
-    def fatal(self, log_line, **kwargs):
+        if error_code and isinstance(error_code, AuditResponseCode):
+            all_kwargs[ERROR_CODE] = error_code.value
+            all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
+        _Audit._logger_debug.error(log_line, **all_kwargs)
+        _Audit._logger_error.error(log_line, **all_kwargs)
+
+    def fatal(self, log_line, error_code=None, **kwargs):
         """debug+error - the fatal level of logging"""
         all_kwargs = self.merge_all_kwargs(**kwargs)
-        Audit._logger_debug.fatal(log_line, **all_kwargs)
-        Audit._logger_error.fatal(log_line, **all_kwargs)
+
+        if error_code and isinstance(error_code, AuditResponseCode):
+            all_kwargs[ERROR_CODE] = error_code.value
+            all_kwargs[ERROR_DESCRIPTION] = AuditResponseCode.get_human_text(error_code)
+
+        _Audit._logger_debug.fatal(log_line, **all_kwargs)
+        _Audit._logger_error.fatal(log_line, **all_kwargs)
+
+    @staticmethod
+    def hide_secrets(obj):
+        """hides the known secret field values of the dictionary"""
+        if not isinstance(obj, dict):
+            return obj
+
+        for key in obj:
+            if key.lower() in [HEADER_CLIENTAUTH, HEADER_AUTHORIZATION]:
+                obj[key] = "*"
+            elif isinstance(obj[key], dict):
+                obj[key] = _Audit.hide_secrets(obj[key])
+
+        return obj
+
+    @staticmethod
+    def log_json_dumps(obj, **kwargs):
+        """hide the known secret field values of the dictionary and return json.dumps"""
+        if not isinstance(obj, dict):
+            return json.dumps(obj, **kwargs)
+
+        return json.dumps(_Audit.hide_secrets(copy.deepcopy(obj)), **kwargs)
 
     @staticmethod
     def get_elapsed_time(started):
         """returns the elapsed time since started in milliseconds"""
-        return int(round(1000 * (time.time() - started)))
+        return int(round(1000 * (time.time() - (started or 0))))
 
-    def metrics_start(self, log_line=None, **kwargs):
-        """reset metrics timing"""
-        self._metrics_started = time.time()
-        self._metrics_start_event = Audit._logger_metrics.getStartRecordEvent()
-        if log_line:
-            self.info(log_line, **self.merge_all_kwargs(**kwargs))
 
-    def metrics(self, log_line, **kwargs):
-        """debug+metrics - the metrics=sub-audit level of logging"""
-        all_kwargs = self.merge_all_kwargs(**kwargs)
-        success, max_http_status_code, response_code, response_description = \
-            self._get_response_status()
-        metrics_func = None
-        timer = Audit.get_elapsed_time(self._metrics_started)
-        metrics_job = Audit._key_format.sub(
-            '_', all_kwargs.get(AUDIT_TARGET_ENTITY, AUDIT_METRICS + "_" + self.job_name))
-        if success:
-            log_line = "done: {0}".format(log_line)
-            self.info(log_line, **all_kwargs)
-            metrics_func = Audit._logger_metrics.info
-            Audit._health.success(metrics_job, timer)
-        else:
-            log_line = "failed: {0}".format(log_line)
-            self.error(log_line, errorCode=response_code.value, \
-                errorDescription=response_description, **all_kwargs)
-            metrics_func = Audit._logger_metrics.error
-            Audit._health.error(metrics_job, timer)
+class Audit(_Audit):
+    """Audit class to track the high level operations"""
 
-        metrics_func(log_line, begTime=self._metrics_start_event, timer=timer,
-                     statusCode=Audit.get_status_code(success), responseCode=response_code.value,
-                     responseDescription=response_description,
-                     **all_kwargs
-                    )
+    def __init__(self, job_name=None, request_id=None, req_message=None, **kwargs):
+        """create audit object per each request in the system
 
-        self.metrics_start()
-        return (success, max_http_status_code, response_description)
+        :job_name: is the name of the audit job for health stats
+        :request_id: is the X-ECOMP-RequestID for tracing
+        :req_message: is the request message string for logging
+        :aud_parent: is the parent Audit - used for sub-query metrics to other systems
+        :kwargs: - put any request related params into kwargs
+        """
+        super(Audit, self).__init__(job_name=job_name,
+                                    request_id=request_id,
+                                    req_message=req_message,
+                                    **kwargs)
+
+        headers = self.kwargs.get("headers", {})
+        if headers:
+            if not self.request_id:
+                self.request_id = headers.get(REQUEST_X_ECOMP_REQUESTID)
+            if AUDIT_IPADDRESS not in self.kwargs:
+                self.kwargs[AUDIT_IPADDRESS] = headers.get(REQUEST_REMOTE_ADDR)
+            if AUDIT_SERVER not in self.kwargs:
+                self.kwargs[AUDIT_SERVER] = headers.get(REQUEST_HOST)
+
+        created_req = ""
+        if not self.request_id:
+            created_req = " with new"
+            self.request_id = str(uuid.uuid4())
+
+        if AUDIT_SERVER not in self.kwargs:
+            self.kwargs[AUDIT_SERVER] = os.environ.get(HOSTNAME)
+
+        self.kwargs[AUDIT_REQUESTID] = self.request_id
+
+        _Audit._health.start(self.job_name, self.request_id)
+        _Audit._health.start(AUDIT_TOTAL_STATS, self.request_id)
+
+        self._started = time.time()
+        self._start_event = Audit._logger_audit.getStartRecordEvent()
+
+        self.info("new audit{0} request_id {1}, msg({2}), kwargs({3})"\
+            .format(created_req, self.request_id, self.req_message, json.dumps(self.kwargs)))
+
 
     def audit_done(self, result=None, **kwargs):
         """debug+audit - the audit=top level of logging"""
@@ -368,26 +427,90 @@
             self._get_response_status()
         log_line = "{0} {1}".format(self.req_message, result or "").strip()
         audit_func = None
-        timer = Audit.get_elapsed_time(self._started)
+        timer = _Audit.get_elapsed_time(self._started)
         if success:
             log_line = "done: {0}".format(log_line)
             self.info(log_line, **all_kwargs)
-            audit_func = Audit._logger_audit.info
-            Audit._health.success(self.job_name, timer)
-            Audit._health.success(AUDIT_TOTAL_STATS, timer)
+            audit_func = _Audit._logger_audit.info
+            _Audit._health.success(self.job_name, timer, self.request_id)
+            _Audit._health.success(AUDIT_TOTAL_STATS, timer, self.request_id)
         else:
             log_line = "failed: {0}".format(log_line)
             self.error(log_line, errorCode=response_code.value,
                        errorDescription=response_description, **all_kwargs)
-            audit_func = Audit._logger_audit.error
-            Audit._health.error(self.job_name, timer)
-            Audit._health.error(AUDIT_TOTAL_STATS, timer)
+            audit_func = _Audit._logger_audit.error
+            _Audit._health.error(self.job_name, timer, self.request_id)
+            _Audit._health.error(AUDIT_TOTAL_STATS, timer, self.request_id)
 
         audit_func(log_line, begTime=self._start_event, timer=timer,
-                   statusCode=Audit.get_status_code(success),
+                   statusCode=_Audit.get_status_code(success),
                    responseCode=response_code.value,
                    responseDescription=response_description,
-                   **all_kwargs
-                  )
+                   **all_kwargs)
+
+        return (success, max_http_status_code, response_description)
+
+
+class Metrics(_Audit):
+    """Metrics class to track the calls to outside systems"""
+
+    def __init__(self, aud_parent, **kwargs):
+        """create audit object per each request in the system
+
+        :aud_parent: is the parent Audit - used for sub-query metrics to other systems
+        :kwargs: - put any request related params into kwargs
+        """
+        super(Metrics, self).__init__(job_name=aud_parent.job_name,
+                                      request_id=aud_parent.request_id,
+                                      req_message=aud_parent.req_message,
+                                      **aud_parent.merge_all_kwargs(**kwargs))
+        self.aud_parent = aud_parent
+        self._metrics_name = _Audit._key_format.sub(
+            '_', AUDIT_METRICS + "_" + self.kwargs.get(AUDIT_TARGET_ENTITY, self.job_name))
+
+        self._metrics_started = None
+        self._metrics_start_event = None
+
+
+    def metrics_start(self, log_line=None, **kwargs):
+        """reset metrics timing"""
+        self.merge_all_kwargs(**kwargs)
+        self._metrics_started = time.time()
+        self._metrics_start_event = _Audit._logger_metrics.getStartRecordEvent()
+        if log_line:
+            self.info(log_line, **self.merge_all_kwargs(**kwargs))
+        _Audit._health.start(self._metrics_name, self.request_id)
+        _Audit._health.start(METRICS_TOTAL_STATS, self.request_id)
+
+
+    def metrics(self, log_line, **kwargs):
+        """debug+metrics - the metrics=sub-audit level of logging"""
+        all_kwargs = self.merge_all_kwargs(**kwargs)
+        success, max_http_status_code, response_code, response_description = \
+            self._get_response_status()
+        metrics_func = None
+        timer = _Audit.get_elapsed_time(self._metrics_started)
+        if success:
+            log_line = "done: {0}".format(log_line)
+            self.info(log_line, **all_kwargs)
+            metrics_func = _Audit._logger_metrics.info
+            _Audit._health.success(self._metrics_name, timer, self.request_id)
+            _Audit._health.success(METRICS_TOTAL_STATS, timer, self.request_id)
+        else:
+            log_line = "failed: {0}".format(log_line)
+            self.error(log_line, errorCode=response_code.value,
+                       errorDescription=response_description, **all_kwargs)
+            metrics_func = _Audit._logger_metrics.error
+            _Audit._health.error(self._metrics_name, timer, self.request_id)
+            _Audit._health.error(METRICS_TOTAL_STATS, timer, self.request_id)
+
+        metrics_func(
+            log_line,
+            begTime=(self._metrics_start_event or _Audit._logger_metrics.getStartRecordEvent()),
+            timer=timer,
+            statusCode=_Audit.get_status_code(success),
+            responseCode=response_code.value,
+            responseDescription=response_description,
+            **all_kwargs)
 
         return (success, max_http_status_code, response_description)
diff --git a/policyhandler/onap/health.py b/policyhandler/onap/health.py
index 485f422..e6a6f69 100644
--- a/policyhandler/onap/health.py
+++ b/policyhandler/onap/health.py
@@ -28,46 +28,95 @@
         """keep track of stats for metrics calls"""
         self._name = name or "stats_" + str(uuid.uuid4())
         self._lock = Lock()
+
         self._call_count = 0
         self._error_count = 0
+        self._active_count = 0
+
         self._longest_timer = 0
         self._total_timer = 0
+
         self._last_success = None
         self._last_error = None
+        self._last_start = None
+        self._longest_end_ts = None
+
+        self._last_success_request_id = None
+        self._last_error_request_id = None
+        self._last_started_request_id = None
+        self._longest_request_id = None
+
 
     def dump(self):
         """returns dict of stats"""
         dump = None
         with self._lock:
             dump = {
-                "call_count" : self._call_count,
-                "error_count" : self._error_count,
-                "last_success" : str(self._last_success),
-                "last_error" : str(self._last_error),
-                "longest_timer_millisecs" : self._longest_timer,
-                "ave_timer_millisecs" : (float(self._total_timer)/self._call_count \
-                                         if self._call_count else 0)
+                "total" : {
+                    "call_count" : self._call_count,
+                    "ave_timer_millisecs" : (float(self._total_timer)/self._call_count
+                                             if self._call_count else 0)
+                },
+                "success" : {
+                    "success_count" : (self._call_count - self._error_count),
+                    "last_success" : str(self._last_success),
+                    "last_success_request_id" : self._last_success_request_id
+                },
+                "error" : {
+                    "error_count" : self._error_count,
+                    "last_error" : str(self._last_error),
+                    "last_error_request_id" : self._last_error_request_id
+                },
+                "active" : {
+                    "active_count" : self._active_count,
+                    "last_start" : str(self._last_start),
+                    "last_started_request_id" : self._last_started_request_id
+                },
+                "longest" : {
+                    "longest_timer_millisecs" : self._longest_timer,
+                    "longest_request_id" : self._longest_request_id,
+                    "longest_end" : str(self._longest_end_ts)
+                }
             }
         return dump
 
-    def success(self, timer):
+
+    def start(self, request_id=None):
+        """records the start of active execution"""
+        with self._lock:
+            self._active_count += 1
+            self._last_start = datetime.utcnow()
+            self._last_started_request_id = request_id
+
+
+    def success(self, timer, request_id=None):
         """records the successful execution"""
         with self._lock:
+            self._active_count -= 1
             self._call_count += 1
-            self._last_success = datetime.now()
+            self._last_success = datetime.utcnow()
+            self._last_success_request_id = request_id
             self._total_timer += timer
             if not self._longest_timer or self._longest_timer < timer:
                 self._longest_timer = timer
+                self._longest_request_id = request_id
+                self._longest_end_ts = self._last_success
 
-    def error(self, timer):
+
+    def error(self, timer, request_id=None):
         """records the errored execution"""
         with self._lock:
+            self._active_count -= 1
             self._call_count += 1
             self._error_count += 1
-            self._last_error = datetime.now()
+            self._last_error = datetime.utcnow()
+            self._last_error_request_id = request_id
             self._total_timer += timer
             if not self._longest_timer or self._longest_timer < timer:
                 self._longest_timer = timer
+                self._longest_request_id = request_id
+                self._longest_end_ts = self._last_error
+
 
 class Health(object):
     """Health stats for multiple requests"""
@@ -76,28 +125,36 @@
         self._all_stats = {}
         self._lock = Lock()
 
+
     def _add_or_get_stats(self, stats_name):
         """add to or get from the ever growing dict of HealthStats"""
-        stats = None
         with self._lock:
             stats = self._all_stats.get(stats_name)
             if not stats:
                 self._all_stats[stats_name] = stats = HealthStats(stats_name)
-        return stats
+            return stats
 
-    def success(self, stats_name, timer):
+
+    def start(self, stats_name, request_id=None):
+        """records the start of execution on stats_name"""
+        stats = self._add_or_get_stats(stats_name)
+        stats.start(request_id)
+
+
+    def success(self, stats_name, timer, request_id=None):
         """records the successful execution on stats_name"""
         stats = self._add_or_get_stats(stats_name)
-        stats.success(timer)
+        stats.success(timer, request_id)
 
-    def error(self, stats_name, timer):
+
+    def error(self, stats_name, timer, request_id=None):
         """records the error execution on stats_name"""
         stats = self._add_or_get_stats(stats_name)
-        stats.error(timer)
+        stats.error(timer, request_id)
+
 
     def dump(self):
         """returns dict of stats"""
         with self._lock:
             stats = dict((k, v.dump()) for (k, v) in self._all_stats.iteritems())
-
         return stats
diff --git a/policyhandler/policy_receiver.py b/policyhandler/policy_receiver.py
index 751bea8..dd9eea6 100644
--- a/policyhandler/policy_receiver.py
+++ b/policyhandler/policy_receiver.py
@@ -35,7 +35,7 @@
 import websocket
 
 from .config import Config
-from .onap.audit import Audit
+from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
 from .policy_updater import PolicyUpdater
 
 LOADED_POLICIES = 'loadedPolicies'
@@ -115,32 +115,41 @@
 
     def _on_pdp_message(self, _, message):
         """received the notification from PDP"""
-        _PolicyReceiver._logger.info("Received notification message: %s", message)
-        if not message:
-            return
-        message = json.loads(message)
+        try:
+            _PolicyReceiver._logger.info("Received notification message: %s", message)
+            if not message:
+                return
+            message = json.loads(message)
 
-        if not message:
-            return
+            if not message or not isinstance(message, dict):
+                _PolicyReceiver._logger.warn("unexpected message from PDP: %s", json.dumps(message))
+                return
 
-        policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
-                            for policy in message.get(LOADED_POLICIES, [])
-                            if self._policy_scopes.match(policy.get(POLICY_NAME))]
-        policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
-                            for policy in message.get(REMOVED_POLICIES, [])
-                            if self._policy_scopes.match(policy.get(POLICY_NAME))]
+            policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
+                                for policy in message.get(LOADED_POLICIES, [])
+                                if self._policy_scopes.match(policy.get(POLICY_NAME))]
+            policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
+                                for policy in message.get(REMOVED_POLICIES, [])
+                                if self._policy_scopes.match(policy.get(POLICY_NAME))]
 
-        if not policies_updated and not policies_removed:
-            _PolicyReceiver._logger.info(
-                "no policy updated or removed for scopes %s", self._policy_scopes.pattern
-            )
-            return
+            if not policies_updated and not policies_removed:
+                _PolicyReceiver._logger.info("no policy updated or removed for scopes %s",
+                                             self._policy_scopes.pattern)
+                return
 
-        audit = Audit(job_name="policy_update",
-                      req_message="policy-notification - updated[{0}], removed[{1}]"
-                      .format(len(policies_updated), len(policies_removed)))
-        audit.retry_get_config = True
-        self._policy_updater.enqueue(audit, policies_updated, policies_removed)
+            audit = Audit(job_name="policy_update",
+                          req_message="policy-notification - updated[{0}], removed[{1}]"
+                          .format(len(policies_updated), len(policies_removed)),
+                          retry_get_config=True)
+            self._policy_updater.enqueue(audit, policies_updated, policies_removed)
+        except Exception as ex:
+            error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
+                                                       "on_pdp_message", json.dumps(message))
+
+            _PolicyReceiver._logger.exception(error_msg)
+            audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+            audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+
 
     def _on_ws_error(self, _, error):
         """report an error"""
@@ -184,12 +193,7 @@
     @staticmethod
     def run(audit):
         """Using policy-engine client to talk to policy engine"""
-        sub_aud = Audit(aud_parent=audit)
-        sub_aud.metrics_start("start policy receiver")
-
         PolicyReceiver._policy_receiver = _PolicyReceiver()
         PolicyReceiver._policy_receiver.start()
 
-        sub_aud.metrics("started policy receiver")
-
         PolicyReceiver.catch_up(audit)
diff --git a/policyhandler/policy_rest.py b/policyhandler/policy_rest.py
index 0c8920a..977a9a1 100644
--- a/policyhandler/policy_rest.py
+++ b/policyhandler/policy_rest.py
@@ -27,11 +27,11 @@
 import requests
 
 from .config import Config
-from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit, AuditHttpCode,
-                         AuditResponseCode)
-from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, POLICY_BODY,
-                            POLICY_CONFIG, POLICY_FILTER, POLICY_ID,
-                            POLICY_NAME, SCOPE_PREFIXES, LATEST_POLICIES)
+from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
+                         AuditResponseCode, Metrics)
+from .policy_consts import (ERRORED_POLICIES, ERRORED_SCOPES, LATEST_POLICIES,
+                            POLICY_BODY, POLICY_CONFIG, POLICY_FILTER,
+                            POLICY_ID, POLICY_NAME, SCOPE_PREFIXES)
 from .policy_utils import PolicyUtils
 
 
@@ -95,43 +95,47 @@
         PolicyRest._policy_retry_count = Config.config.get("policy_retry_count", 1) or 1
         PolicyRest._policy_retry_sleep = Config.config.get("policy_retry_sleep", 0)
 
-        PolicyRest._logger.info("PolicyClient url(%s) headers(%s) scope-prefixes(%s)", \
-            PolicyRest._url_get_config, Audit.log_json_dumps(PolicyRest._headers), \
+        PolicyRest._logger.info(
+            "PolicyClient url(%s) headers(%s) scope-prefixes(%s)",
+            PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers),
             json.dumps(PolicyRest._scope_prefixes))
 
     @staticmethod
     def _pdp_get_config(audit, json_body):
         """Communication with the policy-engine"""
-        sub_aud = Audit(aud_parent=audit, targetEntity=PolicyRest._target_entity, \
-            targetServiceName=PolicyRest._url_get_config)
+        metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity,
+                          targetServiceName=PolicyRest._url_get_config)
 
         msg = json.dumps(json_body)
         headers = copy.copy(PolicyRest._headers)
-        headers[REQUEST_X_ECOMP_REQUESTID] = sub_aud.request_id
-        headers_str = Audit.log_json_dumps(headers)
+        headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
+        headers_str = Metrics.log_json_dumps(headers)
 
         log_line = "post to PDP {0} msg={1} headers={2}".format(
             PolicyRest._url_get_config, msg, headers_str)
-        sub_aud.metrics_start(log_line)
+        metrics.metrics_start(log_line)
         PolicyRest._logger.info(log_line)
         res = None
         try:
             res = PolicyRest._requests_session.post(
                 PolicyRest._url_get_config, json=json_body, headers=headers)
-        except requests.exceptions.RequestException as ex:
-            error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
-            error_msg = "failed to post to PDP {0} {1} msg={2} headers={3}" \
-                .format(PolicyRest._url_get_config, str(ex), msg, headers_str)
+        except Exception as ex:
+            error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
+                          if isinstance(ex, requests.exceptions.RequestException)
+                          else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+            error_msg = (
+                "failed to post to PDP {0} {1}: {2} msg={3} headers={4}"
+                .format(PolicyRest._url_get_config, type(ex).__name__, str(ex), msg, headers_str))
 
             PolicyRest._logger.exception(error_msg)
-            sub_aud.set_http_status_code(error_code)
+            metrics.set_http_status_code(error_code)
             audit.set_http_status_code(error_code)
-            sub_aud.metrics(error_msg)
+            metrics.metrics(error_msg)
             return (error_code, None)
 
         log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format(
             PolicyRest._url_get_config, res.status_code, msg, res.text,
-            Audit.log_json_dumps(dict(res.request.headers.items())))
+            Metrics.log_json_dumps(dict(res.request.headers.items())))
 
         res_data = None
         if res.status_code == requests.codes.ok:
@@ -146,9 +150,9 @@
                     error_msg = "unexpected {0}".format(log_line)
 
                     PolicyRest._logger.error(error_msg)
-                    sub_aud.set_http_status_code(error_code)
+                    metrics.set_http_status_code(error_code)
                     audit.set_http_status_code(error_code)
-                    sub_aud.metrics(error_msg)
+                    metrics.metrics(error_msg)
                     return (error_code, None)
 
         elif res.status_code == PolicyRest.POLICY_ENGINE_STATUS_CODE_ERROR:
@@ -163,14 +167,14 @@
                         info_msg = "not found {0}".format(log_line)
 
                         PolicyRest._logger.info(info_msg)
-                        sub_aud.set_http_status_code(status_code)
-                        sub_aud.metrics(info_msg)
+                        metrics.set_http_status_code(status_code)
+                        metrics.metrics(info_msg)
                         return (status_code, None)
             except ValueError:
                 pass
 
-        sub_aud.set_http_status_code(res.status_code)
-        sub_aud.metrics(log_line)
+        metrics.set_http_status_code(res.status_code)
+        metrics.metrics(log_line)
         PolicyRest._logger.info(log_line)
         return res.status_code, res_data
 
@@ -191,165 +195,187 @@
     @staticmethod
     def get_latest_policy(aud_policy_id):
         """Get the latest policy for the policy_id from the policy-engine"""
-        PolicyRest._lazy_init()
         audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id
+        str_metrics = "policy_id({0}), min_version_expected({1}) ignore_policy_names({2})".format(
+            policy_id, min_version_expected, json.dumps(ignore_policy_names))
 
-        status_code = 0
-        policy_configs = None
-        latest_policy = None
-        expect_policy_removed = (ignore_policy_names and not min_version_expected)
+        try:
+            PolicyRest._lazy_init()
+            status_code = 0
+            retry_get_config = audit.kwargs.get("retry_get_config")
+            policy_configs = None
+            latest_policy = None
+            expect_policy_removed = (ignore_policy_names and not min_version_expected)
 
-        for retry in xrange(1, PolicyRest._policy_retry_count + 1):
-            PolicyRest._logger.debug("%s", policy_id)
+            for retry in xrange(1, PolicyRest._policy_retry_count + 1):
+                PolicyRest._logger.debug(str_metrics)
 
-            status_code, policy_configs = PolicyRest._pdp_get_config(
-                audit, {POLICY_NAME:policy_id}
-            )
+                status_code, policy_configs = PolicyRest._pdp_get_config(
+                    audit, {POLICY_NAME:policy_id}
+                )
 
-            PolicyRest._logger.debug("%s %s policy_configs: %s",
-                                     status_code, policy_id, json.dumps(policy_configs or []))
+                PolicyRest._logger.debug("%s %s policy_configs: %s",
+                                         status_code, policy_id, json.dumps(policy_configs or []))
 
-            latest_policy = PolicyUtils.select_latest_policy(
-                policy_configs, min_version_expected, ignore_policy_names
-            )
+                latest_policy = PolicyUtils.select_latest_policy(
+                    policy_configs, min_version_expected, ignore_policy_names
+                )
 
-            if not latest_policy and not expect_policy_removed:
-                audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
-                            .format(policy_id, json.dumps(policy_configs or [])),
-                            errorCode=AuditResponseCode.DATA_ERROR.value,
-                            errorDescription=AuditResponseCode.get_human_text(
-                                AuditResponseCode.DATA_ERROR))
+                if not latest_policy and not expect_policy_removed:
+                    audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
+                                .format(policy_id, json.dumps(policy_configs or [])),
+                                error_code=AuditResponseCode.DATA_ERROR)
 
-            if latest_policy or not audit.retry_get_config \
-            or (expect_policy_removed and not policy_configs) \
-            or not PolicyRest._policy_retry_sleep \
-            or audit.is_serious_error(status_code):
-                break
+                if (latest_policy
+                        or not retry_get_config
+                        or (expect_policy_removed and not policy_configs)
+                        or not PolicyRest._policy_retry_sleep
+                        or audit.is_serious_error(status_code)):
+                    break
 
-            if retry == PolicyRest._policy_retry_count:
-                audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
-                           .format(PolicyRest._url_get_config, retry, policy_id),
-                           errorCode=AuditResponseCode.DATA_ERROR.value,
-                           errorDescription=AuditResponseCode.get_human_text(
-                               AuditResponseCode.DATA_ERROR))
-                break
+                if retry == PolicyRest._policy_retry_count:
+                    audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
+                               .format(PolicyRest._url_get_config, retry, policy_id),
+                               error_code=AuditResponseCode.DATA_ERROR)
+                    break
 
-            audit.warn(
-                "retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
-                    retry, PolicyRest._url_get_config, PolicyRest._policy_retry_sleep, policy_id),
-                errorCode=AuditResponseCode.DATA_ERROR.value,
-                errorDescription=AuditResponseCode.get_human_text(
-                    AuditResponseCode.DATA_ERROR))
-            time.sleep(PolicyRest._policy_retry_sleep)
+                audit.warn("retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
+                    retry, PolicyRest._url_get_config,
+                    PolicyRest._policy_retry_sleep, policy_id),
+                           error_code=AuditResponseCode.DATA_ERROR)
+                time.sleep(PolicyRest._policy_retry_sleep)
 
-        if expect_policy_removed and not latest_policy \
-        and AuditHttpCode.RESPONSE_ERROR.value == status_code:
-            audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
+            if (expect_policy_removed and not latest_policy
+                    and AuditHttpCode.RESPONSE_ERROR.value == status_code):
+                audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
+                return None
+
+            audit.set_http_status_code(status_code)
+            if not PolicyRest._validate_policy(latest_policy):
+                audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+                audit.error(
+                    "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
+                    error_code=AuditResponseCode.DATA_ERROR)
+
+            return latest_policy
+
+        except Exception as ex:
+            error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+                         .format(audit.request_id, type(ex).__name__, str(ex),
+                                 "get_latest_policy", str_metrics))
+
+            PolicyRest._logger.exception(error_msg)
+            audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+            audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
             return None
 
-        audit.set_http_status_code(status_code)
-        if not PolicyRest._validate_policy(latest_policy):
-            audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
-            audit.error(
-                "received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
-                errorCode=AuditResponseCode.DATA_ERROR.value,
-                errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR)
-            )
-
-        return latest_policy
 
     @staticmethod
     def get_latest_updated_policies(aud_policy_updates):
         """Get the latest policies of the list of policy_names from the policy-engine"""
-        PolicyRest._lazy_init()
         audit, policies_updated, policies_removed = aud_policy_updates
         if not policies_updated and not policies_removed:
-            return
+            return None, None
 
         str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format(
             len(policies_updated), json.dumps(policies_updated),
             len(policies_removed), json.dumps(policies_removed))
-        audit.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
-        PolicyRest._logger.debug(str_metrics)
 
-        policies_to_find = {}
-        for (policy_name, policy_version) in policies_updated:
-            policy_id = PolicyUtils.extract_policy_id(policy_name)
-            if not policy_id or not policy_version.isdigit():
-                continue
-            policy = policies_to_find.get(policy_id)
-            if not policy:
-                policies_to_find[policy_id] = {
-                    POLICY_ID: policy_id,
-                    PolicyRest.MIN_VERSION_EXPECTED: int(policy_version),
-                    PolicyRest.IGNORE_POLICY_NAMES: {}
-                }
-                continue
-            if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
-                policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
+        target_entity = "{0} total get_latest_updated_policies".format(PolicyRest._target_entity)
+        try:
+            PolicyRest._lazy_init()
+            metrics = Metrics(aud_parent=audit,
+                              targetEntity=target_entity,
+                              targetServiceName=PolicyRest._url_get_config)
 
-        for (policy_name, _) in policies_removed:
-            policy_id = PolicyUtils.extract_policy_id(policy_name)
-            if not policy_id:
-                continue
-            policy = policies_to_find.get(policy_id)
-            if not policy:
-                policies_to_find[policy_id] = {
-                    POLICY_ID: policy_id,
-                    PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
-                }
-                continue
-            policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
+            metrics.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
+            PolicyRest._logger.debug(str_metrics)
 
-        apns = [(audit, policy_id,
-                 policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
-                 policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
-                for (policy_id, policy_to_find) in policies_to_find.iteritems()]
+            policies_to_find = {}
+            for (policy_name, policy_version) in policies_updated:
+                policy_id = PolicyUtils.extract_policy_id(policy_name)
+                if not policy_id or not policy_version.isdigit():
+                    continue
+                policy = policies_to_find.get(policy_id)
+                if not policy:
+                    policies_to_find[policy_id] = {
+                        POLICY_ID: policy_id,
+                        PolicyRest.MIN_VERSION_EXPECTED: int(policy_version),
+                        PolicyRest.IGNORE_POLICY_NAMES: {}
+                    }
+                    continue
+                if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
+                    policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
 
-        policies = None
-        apns_length = len(apns)
-        if apns_length == 1:
-            policies = [PolicyRest.get_latest_policy(apns[0])]
-        else:
-            pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
-            policies = pool.map(PolicyRest.get_latest_policy, apns)
-            pool.close()
-            pool.join()
+            for (policy_name, _) in policies_removed:
+                policy_id = PolicyUtils.extract_policy_id(policy_name)
+                if not policy_id:
+                    continue
+                policy = policies_to_find.get(policy_id)
+                if not policy:
+                    policies_to_find[policy_id] = {
+                        POLICY_ID: policy_id,
+                        PolicyRest.IGNORE_POLICY_NAMES: {policy_name:True}
+                    }
+                    continue
+                policy[PolicyRest.IGNORE_POLICY_NAMES][policy_name] = True
 
-        audit.metrics("result get_latest_updated_policies {0}: {1} {2}"
-                      .format(str_metrics, len(policies), json.dumps(policies)),
-                      targetEntity=PolicyRest._target_entity,
-                      targetServiceName=PolicyRest._url_get_config)
+            apns = [(audit, policy_id,
+                     policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
+                     policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
+                    for (policy_id, policy_to_find) in policies_to_find.iteritems()]
 
-        updated_policies = dict((policy[POLICY_ID], policy)
-                                for policy in policies
-                                if policy and policy.get(POLICY_ID))
+            policies = None
+            apns_length = len(apns)
+            if apns_length == 1:
+                policies = [PolicyRest.get_latest_policy(apns[0])]
+            else:
+                pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
+                policies = pool.map(PolicyRest.get_latest_policy, apns)
+                pool.close()
+                pool.join()
 
-        removed_policies = dict((policy_id, True)
-                                for (policy_id, policy_to_find) in policies_to_find.iteritems()
-                                if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED)
-                                and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
-                                and policy_id not in updated_policies)
+            metrics.metrics("result get_latest_updated_policies {0}: {1} {2}"
+                            .format(str_metrics, len(policies), json.dumps(policies)))
 
-        errored_policies = dict((policy_id, policy_to_find)
-                                for (policy_id, policy_to_find) in policies_to_find.iteritems()
-                                if policy_id not in updated_policies
-                                and policy_id not in removed_policies)
+            updated_policies = dict((policy[POLICY_ID], policy)
+                                    for policy in policies
+                                    if policy and policy.get(POLICY_ID))
 
-        PolicyRest._logger.debug(
-            "result updated_policies %s, removed_policies %s, errored_policies %s",
-            json.dumps(updated_policies), json.dumps(removed_policies),
-            json.dumps(errored_policies))
+            removed_policies = dict((policy_id, True)
+                                    for (policy_id, policy_to_find) in policies_to_find.iteritems()
+                                    if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED)
+                                    and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
+                                    and policy_id not in updated_policies)
 
-        if errored_policies:
-            audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
-            audit.error(
-                "errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
-                errorCode=AuditResponseCode.DATA_ERROR.value,
-                errorDescription=AuditResponseCode.get_human_text(AuditResponseCode.DATA_ERROR)
-            )
+            errored_policies = dict((policy_id, policy_to_find)
+                                    for (policy_id, policy_to_find) in policies_to_find.iteritems()
+                                    if policy_id not in updated_policies
+                                    and policy_id not in removed_policies)
 
-        return updated_policies, removed_policies
+            PolicyRest._logger.debug(
+                "result updated_policies %s, removed_policies %s, errored_policies %s",
+                json.dumps(updated_policies), json.dumps(removed_policies),
+                json.dumps(errored_policies))
+
+            if errored_policies:
+                audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+                audit.error(
+                    "errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
+                    error_code=AuditResponseCode.DATA_ERROR)
+
+            return updated_policies, removed_policies
+
+        except Exception as ex:
+            error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+                         .format(audit.request_id, type(ex).__name__, str(ex),
+                                 "get_latest_updated_policies", str_metrics))
+
+            PolicyRest._logger.exception(error_msg)
+            audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+            audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+            return None, None
+
 
     @staticmethod
     def _get_latest_policies(aud_policy_filter):
@@ -358,104 +384,120 @@
         or all the latest policies of the same scope from the policy-engine
         """
         audit, policy_filter, scope_prefix = aud_policy_filter
-        str_policy_filter = json.dumps(policy_filter)
-        PolicyRest._logger.debug("%s", str_policy_filter)
+        try:
+            str_policy_filter = json.dumps(policy_filter)
+            PolicyRest._logger.debug("%s", str_policy_filter)
 
-        status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter)
+            status_code, policy_configs = PolicyRest._pdp_get_config(audit, policy_filter)
 
-        PolicyRest._logger.debug("%s policy_configs: %s %s", status_code,
-                                 str_policy_filter, json.dumps(policy_configs or []))
+            PolicyRest._logger.debug("%s policy_configs: %s %s", status_code,
+                                     str_policy_filter, json.dumps(policy_configs or []))
 
-        latest_policies = PolicyUtils.select_latest_policies(policy_configs)
+            latest_policies = PolicyUtils.select_latest_policies(policy_configs)
 
-        if scope_prefix and not policy_configs \
-        and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value:
-            audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix),
-                       errorCode=AuditResponseCode.DATA_ERROR.value,
-                       errorDescription=AuditResponseCode.get_human_text(
-                           AuditResponseCode.DATA_ERROR)
-                      )
-            return None, latest_policies, scope_prefix
+            if (scope_prefix and not policy_configs
+                    and status_code != AuditHttpCode.DATA_NOT_FOUND_ERROR.value):
+                audit.warn("PDP error {0} on scope_prefix {1}".format(status_code, scope_prefix),
+                           error_code=AuditResponseCode.DATA_ERROR)
+                return None, latest_policies, scope_prefix
 
-        if not latest_policies:
-            if not scope_prefix:
-                audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
-                audit.warn(
-                    "received no policies from PDP for policy_filter {0}: {1}"
-                    .format(str_policy_filter, json.dumps(policy_configs or [])),
-                    errorCode=AuditResponseCode.DATA_ERROR.value,
-                    errorDescription=AuditResponseCode.get_human_text(
-                        AuditResponseCode.DATA_ERROR)
-                )
-            return None, latest_policies, None
+            if not latest_policies:
+                if not scope_prefix:
+                    audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
+                    audit.warn(
+                        "received no policies from PDP for policy_filter {0}: {1}"
+                        .format(str_policy_filter, json.dumps(policy_configs or [])),
+                        error_code=AuditResponseCode.DATA_ERROR)
+                return None, latest_policies, None
 
-        audit.set_http_status_code(status_code)
-        valid_policies = {}
-        errored_policies = {}
-        for (policy_id, policy) in latest_policies.iteritems():
-            if PolicyRest._validate_policy(policy):
-                valid_policies[policy_id] = policy
-            else:
-                errored_policies[policy_id] = policy
-        return valid_policies, errored_policies, None
+            audit.set_http_status_code(status_code)
+            valid_policies = {}
+            errored_policies = {}
+            for (policy_id, policy) in latest_policies.iteritems():
+                if PolicyRest._validate_policy(policy):
+                    valid_policies[policy_id] = policy
+                else:
+                    errored_policies[policy_id] = policy
+            return valid_policies, errored_policies, None
+
+        except Exception as ex:
+            error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4}), scope_prefix({5})"
+                         .format(audit.request_id, type(ex).__name__, str(ex),
+                                 "_get_latest_policies", json.dumps(policy_filter), scope_prefix))
+
+            PolicyRest._logger.exception(error_msg)
+            audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+            audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+            return None, None, scope_prefix
+
 
     @staticmethod
     def get_latest_policies(audit, policy_filter=None):
         """Get the latest policies of the same scope from the policy-engine"""
-        PolicyRest._lazy_init()
-
         result = {}
         aud_policy_filters = None
         str_policy_filters = None
         str_metrics = None
         target_entity = None
 
-        if policy_filter is not None:
-            aud_policy_filters = [(audit, policy_filter, None)]
-            str_policy_filters = json.dumps(policy_filter)
-            str_metrics = "get_latest_policies for policy_filter {0}".format(
-                str_policy_filters)
-            target_entity = ("{0} total get_latest_policies by policy_filter"
-                             .format(PolicyRest._target_entity))
-            result[POLICY_FILTER] = copy.deepcopy(policy_filter)
-        else:
-            aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix)
-                                  for scope_prefix in PolicyRest._scope_prefixes]
-            str_policy_filters = json.dumps(PolicyRest._scope_prefixes)
-            str_metrics = "get_latest_policies for scopes {0} {1}".format( \
-                len(PolicyRest._scope_prefixes), str_policy_filters)
-            target_entity = ("{0} total get_latest_policies by scope_prefixes"
-                             .format(PolicyRest._target_entity))
-            result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes)
+        try:
+            PolicyRest._lazy_init()
+            if policy_filter is not None:
+                aud_policy_filters = [(audit, policy_filter, None)]
+                str_policy_filters = json.dumps(policy_filter)
+                str_metrics = "get_latest_policies for policy_filter {0}".format(
+                    str_policy_filters)
+                target_entity = ("{0} total get_latest_policies by policy_filter"
+                                 .format(PolicyRest._target_entity))
+                result[POLICY_FILTER] = copy.deepcopy(policy_filter)
+            else:
+                aud_policy_filters = [(audit, {POLICY_NAME:scope_prefix + ".*"}, scope_prefix)
+                                      for scope_prefix in PolicyRest._scope_prefixes]
+                str_policy_filters = json.dumps(PolicyRest._scope_prefixes)
+                str_metrics = "get_latest_policies for scopes {0} {1}".format( \
+                    len(PolicyRest._scope_prefixes), str_policy_filters)
+                target_entity = ("{0} total get_latest_policies by scope_prefixes"
+                                 .format(PolicyRest._target_entity))
+                result[SCOPE_PREFIXES] = copy.deepcopy(PolicyRest._scope_prefixes)
 
-        PolicyRest._logger.debug("%s", str_policy_filters)
-        audit.metrics_start(str_metrics)
+            PolicyRest._logger.debug("%s", str_policy_filters)
+            metrics = Metrics(aud_parent=audit, targetEntity=target_entity,
+                              targetServiceName=PolicyRest._url_get_config)
 
-        latest_policies = None
-        apfs_length = len(aud_policy_filters)
-        if apfs_length == 1:
-            latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])]
-        else:
-            pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length))
-            latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters)
-            pool.close()
-            pool.join()
+            metrics.metrics_start(str_metrics)
 
-        audit.metrics(
-            "total result {0}: {1} {2}".format(
-                str_metrics, len(latest_policies), json.dumps(latest_policies)),
-            targetEntity=target_entity, targetServiceName=PolicyRest._url_get_config)
+            latest_policies = None
+            apfs_length = len(aud_policy_filters)
+            if apfs_length == 1:
+                latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])]
+            else:
+                pool = ThreadPool(min(PolicyRest._scope_thread_pool_size, apfs_length))
+                latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters)
+                pool.close()
+                pool.join()
 
-        # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...]
-        result[LATEST_POLICIES] = dict(
-            pair for (vps, _, _) in latest_policies if vps for pair in vps.iteritems())
+            metrics.metrics("total result {0}: {1} {2}".format(
+                str_metrics, len(latest_policies), json.dumps(latest_policies)))
 
-        result[ERRORED_POLICIES] = dict(
-            pair for (_, eps, _) in latest_policies if eps for pair in eps.iteritems())
+            # latest_policies == [(valid_policies, errored_policies, errored_scope_prefix), ...]
+            result[LATEST_POLICIES] = dict(
+                pair for (vps, _, _) in latest_policies if vps for pair in vps.iteritems())
 
-        result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp])
+            result[ERRORED_POLICIES] = dict(
+                pair for (_, eps, _) in latest_policies if eps for pair in eps.iteritems())
 
-        PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s",
-                                 str_policy_filters, json.dumps(result))
+            result[ERRORED_SCOPES] = sorted([esp for (_, _, esp) in latest_policies if esp])
 
-        return result
+            PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s",
+                                     str_policy_filters, json.dumps(result))
+            return result
+
+        except Exception as ex:
+            error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+                         .format(audit.request_id, type(ex).__name__, str(ex),
+                                 "get_latest_policies", str_metrics))
+
+            PolicyRest._logger.exception(error_msg)
+            audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+            audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+            return None
diff --git a/policyhandler/policy_updater.py b/policyhandler/policy_updater.py
index 70823fa..38ce93a 100644
--- a/policyhandler/policy_updater.py
+++ b/policyhandler/policy_updater.py
@@ -26,7 +26,7 @@
 
 from .config import Config
 from .deploy_handler import DeployHandler
-from .onap.audit import Audit
+from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
 from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, LATEST_POLICIES,
                             REMOVED_POLICIES)
 from .policy_rest import PolicyRest
@@ -56,6 +56,7 @@
         self._lock = Lock()
         self._queue = Queue()
 
+
     def enqueue(self, audit=None, policies_updated=None, policies_removed=None):
         """enqueue the policy-updates"""
         policies_updated = policies_updated or []
@@ -65,7 +66,23 @@
             "enqueue request_id %s policies_updated %s policies_removed %s",
             ((audit and audit.request_id) or "none"),
             json.dumps(policies_updated), json.dumps(policies_removed))
-        self._queue.put((audit, policies_updated, policies_removed))
+
+        with self._lock:
+            self._queue.put((audit, policies_updated, policies_removed))
+
+
+    def catch_up(self, audit=None):
+        """need to bring the latest policies to DCAE-Controller"""
+        with self._lock:
+            if not self._aud_catch_up:
+                self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
+                PolicyUpdater._logger.info(
+                    "catch_up %s request_id %s",
+                    self._aud_catch_up.req_message, self._aud_catch_up.request_id
+                )
+
+        self.enqueue()
+
 
     def run(self):
         """wait and run the policy-update in thread"""
@@ -78,32 +95,15 @@
                 json.dumps(policies_updated), json.dumps(policies_removed))
 
             if not self._keep_running():
-                self._queue.task_done()
                 break
 
             if self._on_catch_up():
                 self._reset_queue()
                 continue
             elif not queued_audit:
-                self._queue.task_done()
                 continue
 
-            updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
-                (queued_audit, policies_updated, policies_removed))
-
-            message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
-
-            deployment_handler_changed = DeployHandler.policy_update(queued_audit, message)
-
-            self._queue.task_done()
-            queued_audit.audit_done()
-
-            if deployment_handler_changed:
-                self._catch_up_prev_message = None
-                self._pause_catch_up_timer()
-                self.catch_up()
-            elif not queued_audit.is_success():
-                self._catch_up_prev_message = None
+            self._on_policies_update(queued_audit, policies_updated, policies_removed)
 
         PolicyUpdater._logger.info("exit policy-updater")
 
@@ -116,17 +116,6 @@
             self._aud_shutdown.audit_done()
         return keep_running
 
-    def catch_up(self, audit=None):
-        """need to bring the latest policies to DCAE-Controller"""
-        with self._lock:
-            self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
-            PolicyUpdater._logger.info(
-                "catch_up %s request_id %s",
-                self._aud_catch_up.req_message, self._aud_catch_up.request_id
-            )
-
-        self.enqueue()
-
     def _run_catch_up_timer(self):
         """create and start the catch_up timer"""
         if not self._catch_up_interval:
@@ -190,9 +179,9 @@
     def _reset_queue(self):
         """clear up the queue"""
         with self._lock:
-            self._aud_catch_up = None
-            self._queue.task_done()
-            self._queue = Queue()
+            if not self._aud_catch_up and not self._aud_shutdown:
+                with self._queue.mutex:
+                    self._queue.queue.clear()
 
     def _on_catch_up(self):
         """bring all the latest policies to DCAE-Controller"""
@@ -207,37 +196,94 @@
         log_line = "catch_up {0} request_id {1}".format(
             aud_catch_up.req_message, aud_catch_up.request_id
         )
+        try:
+            PolicyUpdater._logger.info(log_line)
+            self._pause_catch_up_timer()
 
-        PolicyUpdater._logger.info(log_line)
-        self._pause_catch_up_timer()
+            catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
+            catch_up_message[CATCH_UP] = True
 
-        catch_up_message = PolicyRest.get_latest_policies(aud_catch_up)
-        catch_up_message[CATCH_UP] = True
-
-        catch_up_result = ""
-        if not aud_catch_up.is_success():
-            catch_up_result = "- not sending catch-up to deployment-handler due to errors"
-            PolicyUpdater._logger.warn(catch_up_result)
-        elif self._need_to_send_catch_up(aud_catch_up, catch_up_message):
-            DeployHandler.policy_update(aud_catch_up, catch_up_message)
-            if aud_catch_up.is_success():
-                catch_up_result = "- sent catch-up to deployment-handler"
-            else:
-                catch_up_result = "- failed to send catch-up to deployment-handler"
+            catch_up_result = ""
+            if not aud_catch_up.is_success():
+                catch_up_result = "- not sending catch-up to deployment-handler due to errors"
                 PolicyUpdater._logger.warn(catch_up_result)
-        else:
-            catch_up_result = "- skipped sending the same policies"
-        success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
-        PolicyUpdater._logger.info(log_line + " " + catch_up_result)
-        PolicyUpdater._logger.info("policy_handler health: %s", json.dumps(Audit.health()))
+            elif not self._need_to_send_catch_up(aud_catch_up, catch_up_message):
+                catch_up_result = "- skipped sending the same policies"
+            else:
+                DeployHandler.policy_update(aud_catch_up, catch_up_message, rediscover=True)
+                if not aud_catch_up.is_success():
+                    catch_up_result = "- failed to send catch-up to deployment-handler"
+                    PolicyUpdater._logger.warn(catch_up_result)
+                else:
+                    catch_up_result = "- sent catch-up to deployment-handler"
+            success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
+            PolicyUpdater._logger.info(log_line + " " + catch_up_result)
+
+        except Exception as ex:
+            error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+                         .format(aud_catch_up.request_id, type(ex).__name__, str(ex),
+                                 "on_catch_up", log_line + " " + catch_up_result))
+
+            PolicyUpdater._logger.exception(error_msg)
+            aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+            aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+            success = False
 
         if not success:
             self._catch_up_prev_message = None
 
         self._run_catch_up_timer()
 
+        PolicyUpdater._logger.info("policy_handler health: %s",
+                                   json.dumps(aud_catch_up.health(full=True)))
         return success
 
+
+    def _on_policies_update(self, queued_audit, policies_updated, policies_removed):
+        """handle the event of policy-updates from the queue"""
+        deployment_handler_changed = None
+        result = ""
+
+        log_line = "request_id: {} policies_updated: {} policies_removed: {}".format(
+            ((queued_audit and queued_audit.request_id) or "none"),
+            json.dumps(policies_updated), json.dumps(policies_removed))
+
+        try:
+            updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
+                (queued_audit, policies_updated, policies_removed))
+
+            if not queued_audit.is_success():
+                result = "- not sending policy-updates to deployment-handler due to errors"
+                PolicyUpdater._logger.warn(result)
+            else:
+                message = {LATEST_POLICIES: updated_policies, REMOVED_POLICIES: removed_policies}
+                deployment_handler_changed = DeployHandler.policy_update(queued_audit, message)
+                if not queued_audit.is_success():
+                    result = "- failed to send policy-updates to deployment-handler"
+                    PolicyUpdater._logger.warn(result)
+                else:
+                    result = "- sent policy-updates to deployment-handler"
+
+            success, _, _ = queued_audit.audit_done(result=result)
+
+        except Exception as ex:
+            error_msg = ("{0}: crash {1} {2} at {3}: {4}"
+                         .format(queued_audit.request_id, type(ex).__name__, str(ex),
+                                 "on_policies_update", log_line + " " + result))
+
+            PolicyUpdater._logger.exception(error_msg)
+            queued_audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
+            queued_audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
+            success = False
+
+        if deployment_handler_changed:
+            self._catch_up_prev_message = None
+            self._pause_catch_up_timer()
+            self.catch_up()
+        elif not success:
+            self._catch_up_prev_message = None
+
+
     def shutdown(self, audit):
         """Shutdown the policy-updater"""
         PolicyUpdater._logger.info("shutdown policy-updater")
diff --git a/policyhandler/step_timer.py b/policyhandler/step_timer.py
index 3936107..2a13dd5 100644
--- a/policyhandler/step_timer.py
+++ b/policyhandler/step_timer.py
@@ -18,6 +18,7 @@
 
 """periodically callback"""
 
+import json
 from datetime import datetime
 from threading import Event, RLock, Thread
 
@@ -51,11 +52,11 @@
         self._request = StepTimer.INIT
         self._req_count = 0
         self._req_time = 0
-        self._req_ts = datetime.now()
+        self._req_ts = datetime.utcnow()
 
         self._substep = None
         self._substep_time = 0
-        self._substep_ts = datetime.now()
+        self._substep_ts = datetime.utcnow()
 
     def get_timer_status(self):
         """returns timer status"""
@@ -104,9 +105,9 @@
 
             prev_req = self._request
             self._request = request
-            now = datetime.now()
-            self._req_time = (now - self._req_ts).total_seconds()
-            self._req_ts = now
+            utcnow = datetime.utcnow()
+            self._req_time = (utcnow - self._req_ts).total_seconds()
+            self._req_ts = utcnow
             self._logger.info("{0}[{1}] {2}->{3}".format(
                 self.name, self._req_time, prev_req, self.get_timer_status()))
 
@@ -114,9 +115,9 @@
         """log exe step"""
         with self._lock:
             self._substep = substep
-            now = datetime.now()
-            self._substep_time = (now - self._substep_ts).total_seconds()
-            self._substep_ts = now
+            utcnow = datetime.utcnow()
+            self._substep_time = (utcnow - self._substep_ts).total_seconds()
+            self._substep_ts = utcnow
             self._logger.info("[{0}] {1}".format(self._substep_time, self.get_timer_status()))
 
     def run(self):
@@ -147,8 +148,14 @@
             if self._paused:
                 self._timer_substep("paused - skip on_time event")
             else:
-                self._timer_substep("on_time event")
-                self._on_time(*self._args, **self._kwargs)
+                try:
+                    self._timer_substep("on_time event")
+                    self._on_time(*self._args, **self._kwargs)
+                except Exception as ex:
+                    error_msg = ("{0}: crash {1} {2} at {3}: args({4}), kwargs({5})"
+                                 .format(self.name, type(ex).__name__, str(ex), "_on_time",
+                                         json.dumps(self._args), json.dumps(self._kwargs)))
+                    self._logger.exception(error_msg)
 
             self._timer_substep("waiting for next...")
             self._next.wait()
diff --git a/policyhandler/web_server.py b/policyhandler/web_server.py
index e9cc9cc..5314791 100644
--- a/policyhandler/web_server.py
+++ b/policyhandler/web_server.py
@@ -169,7 +169,7 @@
     @cherrypy.tools.json_out()
     def catch_up(self):
         """catch up with all DCAE policies"""
-        started = str(datetime.now())
+        started = str(datetime.utcnow())
         req_info = _PolicyWeb._get_request_info(cherrypy.request)
         audit = Audit(job_name="catch_up", req_message=req_info, headers=cherrypy.request.headers)
 
@@ -193,11 +193,10 @@
 
         PolicyReceiver.shutdown(audit)
 
-        health = json.dumps(Audit.health())
-        audit.info("policy_handler health: {0}".format(health))
-        PolicyWeb.logger.info("policy_handler health: %s", health)
+        PolicyWeb.logger.info("policy_handler health: {0}"
+                              .format(json.dumps(audit.health(full=True))))
         PolicyWeb.logger.info("%s: --------- the end -----------", req_info)
-        res = str(datetime.now())
+        res = str(datetime.utcnow())
         audit.info_requested(res)
         return "goodbye! shutdown requested {0}".format(res)
 
@@ -211,7 +210,7 @@
 
         PolicyWeb.logger.info("%s", req_info)
 
-        res = Audit.health()
+        res = audit.health()
 
         PolicyWeb.logger.info("healthcheck %s: res=%s", req_info, json.dumps(res))