Tommy Carpenter | 81b9ed7 | 2017-08-23 11:21:44 -0400 | [diff] [blame] | 1 | # org.onap.dcae |
| 2 | # ================================================================================ |
| 3 | # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved. |
| 4 | # ================================================================================ |
| 5 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | # you may not use this file except in compliance with the License. |
| 7 | # You may obtain a copy of the License at |
| 8 | # |
| 9 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | # |
| 11 | # Unless required by applicable law or agreed to in writing, software |
| 12 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | # See the License for the specific language governing permissions and |
| 15 | # limitations under the License. |
| 16 | # ============LICENSE_END========================================================= |
| 17 | # |
| 18 | # ECOMP is a trademark and service mark of AT&T Intellectual Property. |
| 19 | |
| 20 | import time, json, os, re, logging |
| 21 | from itertools import chain |
| 22 | from functools import partial |
| 23 | import requests |
| 24 | import consul |
| 25 | import six |
| 26 | from discovery_client import util |
| 27 | |
| 28 | |
# Module-level logger used by the functions in this module that log.
_logger = util.get_logger(__name__)
| 30 | |
class DiscoveryInitError(RuntimeError):
    """Raised when a required discovery setting (the CONSUL_HOST or HOSTNAME
    environment variable) is missing."""


class DiscoveryRegistrationError(RuntimeError):
    """Raised when registering this service component with Consul fails."""


class DiscoveryResolvingNameError(RuntimeError):
    """Raised when a service name cannot be resolved to connection details."""
| 39 | |
| 40 | |
| 41 | ##### |
| 42 | # Consul calls for services |
| 43 | ##### |
| 44 | |
| 45 | def _get_configuration_from_consul(consul_handle, service_name): |
| 46 | index = None |
| 47 | while True: |
| 48 | index, data = consul_handle.kv.get(service_name, index=index) |
| 49 | |
| 50 | if data: |
| 51 | return json.loads(data["Value"].decode("utf-8")) |
| 52 | else: |
| 53 | _logger.warn("No configuration found for {0}. Try again in a bit." |
| 54 | .format(service_name)) |
| 55 | time.sleep(5) |
| 56 | |
| 57 | def _get_relationships_from_consul(consul_handle, service_name): |
| 58 | """Fetch the relationship information from Consul for a service by service |
| 59 | name. Returns a list of service names.""" |
| 60 | index = None |
| 61 | rel_key = "{0}:rel".format(service_name) |
| 62 | while True: |
| 63 | index, data = consul_handle.kv.get(rel_key, index=index) |
| 64 | |
| 65 | if data: |
| 66 | return json.loads(data["Value"].decode("utf-8")) |
| 67 | else: |
| 68 | _logger.warn("No relationships found for {0}. Try again in a bit." |
| 69 | .format(service_name)) |
| 70 | time.sleep(5) |
| 71 | |
| 72 | def _lookup_with_consul(consul_handle, service_name, max_attempts=0): |
| 73 | num_attempts = 1 |
| 74 | |
| 75 | while True: |
| 76 | index, results = consul_handle.catalog.service(service_name) |
| 77 | |
| 78 | if results: |
| 79 | return results |
| 80 | else: |
| 81 | num_attempts += 1 |
| 82 | |
| 83 | if max_attempts > 0 and max_attempts < num_attempts: |
| 84 | return None |
| 85 | |
| 86 | _logger.warn("Service not found {0}. Trying again in a bit." |
| 87 | .format(service_name)) |
| 88 | time.sleep(5) |
| 89 | |
| 90 | def _register_with_consul(consul_handle, service_name, service_ip, service_port, |
| 91 | health_endpoint): |
| 92 | # https://www.consul.io/docs/agent/http/agent.html#agent_service_register |
| 93 | # Note: Unhealthy services should not return in queries i.e. |
| 94 | # dig @127.0.0.1 -p 8600 foo.service.consul |
| 95 | health_url = "http://{0}:{1}/{2}".format(service_ip, service_port, health_endpoint) |
| 96 | return consul_handle.agent.service.register(service_name, address=service_ip, |
| 97 | port=service_port, check= { "HTTP": health_url, "Interval": "5s" }) |
| 98 | |
| 99 | ##### |
| 100 | # Config binding service call |
| 101 | ##### |
| 102 | |
def _get_configuration_resolved_from_cbs(consul_handle, service_name):
    """
    This is what a minimal python client library that wraps the CBS would look like.
    POSSIBLE TODO: break this out into pypi repo

    Looks up the "config_binding_service" in Consul (up to 5 attempts) and then
    GETs "<cbs_url>/service_component/<service_name>".

    This call does not raise an exception if Consul or the CBS cannot complete the request.
    It logs an error and returns {} if the config is not bindable.
    It could be a temporary network outage. Call me again later.

    Missing environment variables (CONSUL_HOST / HOSTNAME) are irrecoverable and do
    raise, but that happens in get_configuration before this helper is reached.
    Calling get_configuration from a /healthcheck surfaces such problems early.

    Args:
        consul_handle: a ``consul.Consul`` client
        service_name (string): the service component whose config to fetch

    Returns the configuration dict parsed from the CBS response, or {} on failure.
    """
    config = {}

    results = _lookup_with_consul(consul_handle, "config_binding_service",
                                  max_attempts=5)

    if results is None:
        # This previously called an undefined `logger`, turning a recoverable
        # condition into a NameError.
        _logger.error("Cannot bind config at this time, cbs is unreachable")
        return config

    cbs_hostname = results[0]["ServiceAddress"]
    cbs_port = results[0]["ServicePort"]
    cbs_url = "http://{hostname}:{port}".format(hostname=cbs_hostname, port=cbs_port)

    # get my config
    my_config_endpoint = "{0}/service_component/{1}".format(cbs_url, service_name)
    try:
        res = requests.get(my_config_endpoint)
    except requests.exceptions.RequestException as e:
        # Connection failures and timeouts are recoverable; honour the
        # "does not raise" contract instead of letting them escape.
        _logger.error("in get_config, the config binding service endpoint {0} could not be reached: {1}"
                      .format(my_config_endpoint, e))
        return config

    try:
        res.raise_for_status()
        config = res.json()
        _logger.info("get_config returned the following configuration: {0}".format(json.dumps(config)))
    except Exception:
        # Bad HTTP status or a non-JSON body. This is still best-effort, but it
        # no longer swallows KeyboardInterrupt/SystemExit as the bare except did.
        _logger.error("in get_config, the config binding service endpoint {0} blew up on me. Error code: {1}, Error text: {2}".format(my_config_endpoint, res.status_code, res.text))
    return config
| 138 | |
| 139 | ##### |
| 140 | # Functionality for putting together service's configuration |
| 141 | ##### |
| 142 | |
def _get_connection_types(config):
    """Get all the connection types for a given configuration json

    Crawls through the entire config dict recursively and returns the entries
    that have been identified as service connections in the form of a list of tuples -

    [(config key, component type), ..]

    where "config key" is a compound key in the form of a tuple. Each entry in
    the compound key is a key to a level within the json data structure (a dict
    key, or a list index). A leaf is a service connection when it is a string
    that starts with a "{{ name }}" template.
    """
    def grab_component_type(v):
        # To support Python2, unicode strings are not type `str`. Specifically,
        # the config string values from Consul may be unicode, so use
        # six.string_types to accept both.
        if isinstance(v, six.string_types):
            # Matches strings like "{{foo}}" and "{{ BAR }}" and extracts the
            # name (word characters, "-", "_", ".") inside the double braces.
            # Raw string: "\s" and "\w" are invalid escapes in a normal literal.
            result = re.match(r"^{{\s*([-_.\w]*)\s*}}", v)
            return result.group(1) if result else None
        return None

    def crawl(config, parent_key=()):
        if isinstance(config, dict):
            rels = chain(*[crawl(value, parent_key + (key, ))
                           for key, value in config.items()])
        elif isinstance(config, list):
            rels = chain(*[crawl(item, parent_key + (index, ))
                           for index, item in enumerate(config)])
        else:
            rels = [(parent_key, grab_component_type(config))]

        # Drop leaves without a component type (None, or "" from "{{}}")
        return [(key, rel) for key, rel in rels if rel]

    return crawl(config)
| 180 | |
def _has_connections(config):
    """Return True when ``config`` contains at least one templated "{{ ... }}"
    connection entry (as found by ``_get_connection_types``), else False."""
    return bool(_get_connection_types(config))
| 183 | |
| 184 | def _resolve_connection_types(service_name, connection_types, relationships): |
| 185 | |
| 186 | def find_match(connection_type): |
| 187 | ret_list = [] |
| 188 | for rel in relationships: |
| 189 | if connection_type in rel: |
| 190 | ret_list.append(rel) |
| 191 | return ret_list |
| 192 | |
| 193 | return [ (key, find_match(connection_type)) |
| 194 | for key, connection_type in connection_types ] |
| 195 | |
| 196 | def _resolve_name(lookup_func, service_name): |
| 197 | """Resolves the service component name to detailed connection information |
| 198 | |
| 199 | Currently this is grouped into two ways: |
| 200 | 1. CDAP applications take a two step approach - call Consul then call the |
| 201 | CDAP broker |
| 202 | 2. All other applications just call Consul to get IP and port |
| 203 | |
| 204 | Args: |
| 205 | ---- |
| 206 | lookup_func: fn(string) -> list of dicts |
| 207 | The function should return a list of dicts that have "ServiceAddress" and |
| 208 | "ServicePort" key value entries |
| 209 | service_name: (string) service name to lookup |
| 210 | |
| 211 | Return depends upon the connection type: |
| 212 | 1. CDAP applications return a dict |
| 213 | 2. All other applications return a string |
| 214 | """ |
| 215 | def handle_result(result): |
| 216 | ip = result["ServiceAddress"] |
| 217 | port = result["ServicePort"] |
| 218 | |
| 219 | if not (ip and port): |
| 220 | raise DiscoveryResolvingNameError( |
| 221 | "Failed to resolve name for {0}: ip, port not set".format(service_name)) |
| 222 | |
| 223 | # TODO: Need a better way to identify CDAP apps. Really need to make this |
| 224 | # better. |
Michael Hwang | e11be5d | 2017-09-21 12:19:32 -0400 | [diff] [blame] | 225 | if "cdap" in service_name: |
Tommy Carpenter | 81b9ed7 | 2017-08-23 11:21:44 -0400 | [diff] [blame] | 226 | redirectish_url = "http://{0}:{1}/application/{2}".format(ip, port, |
| 227 | service_name) |
| 228 | |
| 229 | r = requests.get(redirectish_url) |
| 230 | r.raise_for_status() |
| 231 | details = r.json() |
| 232 | # Pick out the details to expose to the component developers |
| 233 | return { key: details[key] |
| 234 | for key in ["connectionurl", "serviceendpoints"] } |
| 235 | else: |
| 236 | return "{0}:{1}".format(ip, port) |
| 237 | |
| 238 | try: |
| 239 | results = lookup_func(service_name) |
| 240 | return [ handle_result(result) for result in results ] |
| 241 | except Exception as e: |
| 242 | raise DiscoveryResolvingNameError( |
| 243 | "Failed to resolve name for {0}: {1}".format(service_name, e)) |
| 244 | |
def _resolve_configuration_dict(ch, service_name, config):
    """Resolve the templated connection entries of ``config`` into concrete
    connection details.

    Shared by resolve_configuration_dict and get_configuration. If ``config``
    contains no templated entries, it is logged and returned as-is. Otherwise
    each templated key is rewritten through ``util.update_json`` with a list
    holding the first resolved connection entry of every matching related
    service.
    """
    if _has_connections(config):
        relationships = _get_relationships_from_consul(ch, service_name)
        connection_names = _resolve_connection_types(
            service_name, _get_connection_types(config), relationships)
        lookup = partial(_lookup_with_consul, ch)

        # NOTE: only element [0] of each _resolve_name result is kept. This
        # keeps things backwards compatible since resolve name now returns a
        # list. Every name is resolved before any update is applied.
        resolved = []
        for key, names in connection_names:
            resolved.append((key, [_resolve_name(lookup, name)[0] for name in names]))

        for key, conn in resolved:
            config = util.update_json(config, key, conn)

    _logger.info("Generated config: {0}".format(config))
    return config
| 260 | |
| 261 | ##### |
| 262 | # Public calls |
| 263 | ##### |
| 264 | |
def get_consul_hostname(consul_hostname_override=None):
    """Get the Consul hostname.

    Args:
        consul_hostname_override (string): returned as-is when truthy;
            otherwise the CONSUL_HOST environment variable is used

    Raises DiscoveryInitError when no override is given and CONSUL_HOST is unset.
    """
    if consul_hostname_override:
        return consul_hostname_override
    try:
        return os.environ["CONSUL_HOST"]
    except KeyError:
        # Only a missing variable is expected here; the previous bare except
        # also masked KeyboardInterrupt and other unrelated errors.
        raise DiscoveryInitError("CONSUL_HOST variable has not been set!")
| 272 | |
def get_service_name():
    """Get the full service name

    This is expected to be given from whatever entity is starting this service
    and given by an environment variable called "HOSTNAME".

    Raises DiscoveryInitError when HOSTNAME is not set.
    """
    try:
        return os.environ["HOSTNAME"]
    except KeyError:
        # Only a missing variable is expected here; the previous bare except
        # also masked KeyboardInterrupt and other unrelated errors.
        raise DiscoveryInitError("HOSTNAME variable has not been set!")
| 282 | |
| 283 | |
def resolve_name(consul_host, service_name, max_attempts=3):
    """Resolve a service name to connection details via Consul.

    The Consul catalog lookup is tried at most ``max_attempts`` times.

    Returns:
    --------
    A list with one entry per registered instance: a dict for CDAP apps, and
    a string "<ip>:<port>" for everything else. Raises
    DiscoveryResolvingNameError if the name cannot be resolved.
    """
    consul_handle = consul.Consul(host=consul_host)

    def lookup(name):
        return _lookup_with_consul(consul_handle, name, max_attempts=max_attempts)

    return _resolve_name(lookup, service_name)
| 297 | |
| 298 | |
def resolve_configuration_dict(consul_host, service_name, config):
    """Resolve the templated connection entries of ``config`` for
    ``service_name``, using the Consul agent at ``consul_host``.

    Returns the resolved configuration dict.
    """
    return _resolve_configuration_dict(consul.Consul(host=consul_host),
                                       service_name, config)
| 305 | |
| 306 | |
def get_configuration(override_consul_hostname=None, override_service_name=None,
                      from_cbs=True):
    """Return this service component's fully resolved configuration.

    Resolution happens either remotely, through the config binding service
    (the default), or locally in this library.

    Args:
    -----
    override_consul_hostname (string): Consul hostname to use instead of the
        CONSUL_HOST environment variable
    override_service_name (string): service name to use instead of the
        HOSTNAME environment variable. Default is None.
    from_cbs (boolean): True (default) asks the config binding service; False
        pulls and resolves the config with this library

    Returns the fully resolved service component configuration as a dict.
    Raises DiscoveryInitError if CONSUL_HOST / HOSTNAME are needed but unset.
    """
    consul_hostname = get_consul_hostname(override_consul_hostname)
    # No port is passed, so the client uses Consul's default port (8500)
    ch = consul.Consul(host=consul_hostname)
    service_name = override_service_name or get_service_name()
    _logger.info("service name: {0}".format(service_name))

    if not from_cbs:
        # Local resolution:
        #   1. fetch the raw configuration keyed by service component name
        #   2. fetch this component's relationships
        #   3. pick the connection types out of the templated config fields
        #   4. match connection types to related service names (step 2)
        #   5. resolve those names to real connections via Consul queries
        config = _get_configuration_from_consul(ch, service_name)
        return _resolve_configuration_dict(ch, service_name, config)

    return _get_configuration_resolved_from_cbs(ch, service_name)
| 348 | |
| 349 | |
def register_for_discovery(consul_host, service_ip, service_port):
    """Register the service component for service discovery

    This is required in order for other services to "discover" you so that you
    can service their requests. The service is registered under the HOSTNAME
    environment variable, with an HTTP health check on "/health".

    NOTE: Applications may not need to make this call depending upon if the
    environment is using Registrator.

    Raises DiscoveryInitError if HOSTNAME is unset, and
    DiscoveryRegistrationError if Consul rejects the registration.
    """
    ch = consul.Consul(host=consul_host)
    service_name = get_service_name()

    if not _register_with_consul(ch, service_name, service_ip, service_port, "health"):
        _logger.error("Failed to register to consul: {0}".format(service_name))
        # Carry a message like the module's other raises, so the failure is
        # identifiable without the log.
        raise DiscoveryRegistrationError(
            "Failed to register to consul: {0}".format(service_name))

    _logger.info("Registered to consul: {0}".format(service_name))