blob: 51b58976281105d6ae583fdbf149db5aa7893859 [file] [log] [blame]
Tommy Carpenter81b9ed72017-08-23 11:21:44 -04001# org.onap.dcae
2# ================================================================================
3# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
4# ================================================================================
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16# ============LICENSE_END=========================================================
17#
18# ECOMP is a trademark and service mark of AT&T Intellectual Property.
19
20import time, json, os, re, logging
21from itertools import chain
22from functools import partial
23import requests
24import consul
25import six
26from discovery_client import util
27
28
29_logger = util.get_logger(__name__)
30
class DiscoveryInitError(RuntimeError):
    """Signals that the discovery client could not be initialized."""
33
class DiscoveryRegistrationError(RuntimeError):
    """Signals that registering a service for discovery failed."""
36
class DiscoveryResolvingNameError(RuntimeError):
    """Signals that a service name could not be resolved to a connection."""
39
40
41#####
42# Consul calls for services
43#####
44
def _get_configuration_from_consul(consul_handle, service_name):
    """Fetch a service's configuration from the Consul key-value store.

    Polls the key named `service_name` until a value is present, sleeping five
    seconds between attempts. There is no attempt limit, so this call blocks
    for as long as the key is missing.

    Args:
    ----
    consul_handle: Consul client exposing `kv.get(key, index=...)`
    service_name: (string) key under which the configuration is stored

    Returns the stored value decoded as UTF-8 and parsed as JSON.
    """
    index = None
    while True:
        # The index handed back by the previous call is passed to the next one.
        index, data = consul_handle.kv.get(service_name, index=index)

        if data:
            return json.loads(data["Value"].decode("utf-8"))

        # Logger.warn is a deprecated alias; warning() is the supported name.
        _logger.warning("No configuration found for {0}. Try again in a bit."
                        .format(service_name))
        time.sleep(5)
56
def _get_relationships_from_consul(consul_handle, service_name):
    """Fetch the relationship information from Consul for a service by service
    name. Returns a list of service names.

    The relationships are read from the key "<service_name>:rel". The key is
    polled until a value is present, sleeping five seconds between attempts;
    there is no attempt limit, so this call blocks while the key is missing.

    Args:
    ----
    consul_handle: Consul client exposing `kv.get(key, index=...)`
    service_name: (string) service whose relationships are wanted

    Returns the stored value decoded as UTF-8 and parsed as JSON.
    """
    index = None
    rel_key = "{0}:rel".format(service_name)
    while True:
        # The index handed back by the previous call is passed to the next one.
        index, data = consul_handle.kv.get(rel_key, index=index)

        if data:
            return json.loads(data["Value"].decode("utf-8"))

        # Logger.warn is a deprecated alias; warning() is the supported name.
        _logger.warning("No relationships found for {0}. Try again in a bit."
                        .format(service_name))
        time.sleep(5)
71
def _lookup_with_consul(consul_handle, service_name, max_attempts=0):
    """Look a service up in the Consul catalog, retrying until it appears.

    Args:
    ----
    consul_handle: Consul client exposing `catalog.service(name)`
    service_name: (string) service name to look up
    max_attempts: (int) number of catalog queries to make before giving up.
        Zero (the default) or a negative value means retry forever.

    Returns the non-empty catalog results, or None once `max_attempts` queries
    have all come back empty. Sleeps five seconds between queries.
    """
    num_attempts = 0

    while True:
        # Only the results are needed; the returned index is discarded.
        _, results = consul_handle.catalog.service(service_name)
        num_attempts += 1

        if results:
            return results

        if 0 < max_attempts <= num_attempts:
            return None

        # Logger.warn is a deprecated alias; warning() is the supported name.
        _logger.warning("Service not found {0}. Trying again in a bit."
                        .format(service_name))
        time.sleep(5)
89
def _register_with_consul(consul_handle, service_name, service_ip, service_port,
        health_endpoint):
    """Register a service, together with an HTTP health check, with Consul.

    The health check targets "http://<service_ip>:<service_port>/<health_endpoint>"
    with an interval of "5s".

    Returns whatever `consul_handle.agent.service.register` returns.
    """
    # https://www.consul.io/docs/agent/http/agent.html#agent_service_register
    # Note: Unhealthy services should not return in queries i.e.
    # dig @127.0.0.1 -p 8600 foo.service.consul
    check_url = "http://{0}:{1}/{2}".format(service_ip, service_port, health_endpoint)
    health_check = {"HTTP": check_url, "Interval": "5s"}
    registrar = consul_handle.agent.service
    return registrar.register(service_name, address=service_ip,
                              port=service_port, check=health_check)
98
99#####
100# Config binding service call
101#####
102
def _get_configuration_resolved_from_cbs(consul_handle, service_name):
    """
    This is what a minimal python client library that wraps the CBS would look like.
    POSSIBLE TODO: break this out into pypi repo

    Looks up "config_binding_service" in Consul (at most five attempts) and
    asks it for the configuration at "/service_component/<service_name>".

    This call does not raise an exception if the CBS cannot be found or cannot
    complete the request. It logs an error and returns {} instead.
    It could be a temporary network outage. Call me again later.

    Args:
    ----
    consul_handle: Consul client used to locate the config binding service
    service_name: (string) service component whose configuration is wanted

    Returns the configuration parsed from the CBS JSON response, or {} on failure.
    """
    results = _lookup_with_consul(consul_handle, "config_binding_service",
                                  max_attempts=5)

    if results is None:
        _logger.error("Cannot bind config at this time, cbs is unreachable")
        return {}

    cbs_url = "http://{hostname}:{port}".format(
        hostname=results[0]["ServiceAddress"], port=results[0]["ServicePort"])

    #get my config
    my_config_endpoint = "{0}/service_component/{1}".format(cbs_url,
                                                            service_name)

    # A connection-level failure leaves no response object to report on, so it
    # is handled separately from a bad HTTP status or an unparsable body.
    try:
        res = requests.get(my_config_endpoint)
    except requests.exceptions.RequestException as e:
        _logger.error("in get_config, the config binding service endpoint {0} could not be reached. Error: {1}".format(my_config_endpoint, e))
        return {}

    try:
        res.raise_for_status()
        config = res.json()
    except (requests.exceptions.RequestException, ValueError):
        # ValueError covers a body that is not valid JSON.
        _logger.error("in get_config, the config binding service endpoint {0} blew up on me. Error code: {1}, Error text: {2}".format(my_config_endpoint, res.status_code, res.text))
        return {}

    _logger.info("get_config returned the following configuration: {0}".format(json.dumps(config)))
    return config
138
139#####
140# Functionality for putting together service's configuration
141#####
142
def _get_connection_types(config):
    """Get all the connection types for a given configuration json

    Crawls through the entire config dict recursively and returns the entries
    that have been identified as service connections in the form of a list of tuples -

    [(config key, component type), ..]

    where "config key" is a compound key in the form of a tuple. Each entry in
    the compound key is a key to a level within the json data structure (a dict
    key or a list index). Entries with an empty component type, such as "{{}}",
    are left out."""
    # Matches strings that start with "{{foo}}" or "{{ BAR }}" and captures the
    # name between the braces. A raw string keeps "\s" and "\w" from being read
    # as (invalid) string escapes.
    template_pattern = re.compile(r"^{{\s*([-_.\w]*)\s*}}")

    def grab_component_type(v):
        # To support Python2, unicode strings are not type `str`. Specifically,
        # the config string values from Consul maybe encoded to utf-8 so better
        # be prepared.
        if not isinstance(v, six.string_types):
            return None
        result = template_pattern.match(v)
        return result.group(1) if result else None

    def crawl(node, parent_key=()):
        if isinstance(node, dict):
            children = node.items()
        elif isinstance(node, list):
            children = enumerate(node)
        else:
            # Leaf value: keep it only when it names a component type.
            component_type = grab_component_type(node)
            return [(parent_key, component_type)] if component_type else []

        return list(chain.from_iterable(
            crawl(child, parent_key + (key, )) for key, child in children))

    return crawl(config)
180
def _has_connections(config):
    """Return True when the config holds at least one connection entry."""
    return bool(_get_connection_types(config))
183
def _resolve_connection_types(service_name, connection_types, relationships):
    """Pair each config key with the relationships that contain its connection type.

    Args:
    ----
    service_name: not used by this function
    connection_types: iterable of (config key, connection type) pairs
    relationships: iterable of relationship names, scanned once per pair

    Returns a list of (config key, [matching relationships]) tuples in the
    order of `connection_types`. A relationship matches when
    `connection_type in relationship` is true.
    """
    resolved = []
    for key, connection_type in connection_types:
        matches = [rel for rel in relationships if connection_type in rel]
        resolved.append((key, matches))
    return resolved
195
def _resolve_name(lookup_func, service_name):
    """Resolves the service component name to detailed connection information

    Currently this is grouped into two ways:
    1. CDAP applications take a two step approach - call Consul then call the
       CDAP broker
    2. All other applications just call Consul to get IP and port

    Args:
    ----
    lookup_func: fn(string) -> list of dicts
        The function should return a list of dicts that have "ServiceAddress" and
        "ServicePort" key value entries
    service_name: (string) service name to lookup

    Returns a list with one entry per lookup result. The entry depends upon the
    connection type:
    1. CDAP applications yield a dict with "connectionurl" and "serviceendpoints"
    2. All other applications yield a string "<ip>:<port>"

    Raises DiscoveryResolvingNameError if the lookup returns None, if a result
    has no ip or port, or if any other error occurs while resolving.
    """
    def handle_result(result):
        ip = result["ServiceAddress"]
        port = result["ServicePort"]

        if not (ip and port):
            raise DiscoveryResolvingNameError(
                "Failed to resolve name for {0}: ip, port not set".format(service_name))

        # TODO: Need a better way to identify CDAP apps. Really need to make this
        # better.
        if "cdap" in service_name:
            redirectish_url = "http://{0}:{1}/application/{2}".format(ip, port,
                                                                      service_name)

            r = requests.get(redirectish_url)
            r.raise_for_status()
            details = r.json()
            # Pick out the details to expose to the component developers
            return { key: details[key]
                     for key in ["connectionurl", "serviceendpoints"] }

        return "{0}:{1}".format(ip, port)

    try:
        results = lookup_func(service_name)
        if results is None:
            # Name the real cause rather than letting iteration over None fail
            # with an opaque TypeError.
            raise DiscoveryResolvingNameError(
                "Failed to resolve name for {0}: service not found".format(service_name))
        return [ handle_result(result) for result in results ]
    except DiscoveryResolvingNameError:
        # Already carries a complete message; wrapping it would repeat the prefix.
        raise
    except Exception as e:
        raise DiscoveryResolvingNameError(
            "Failed to resolve name for {0}: {1}".format(service_name, e))
244
def _resolve_configuration_dict(ch, service_name, config):
    """
    Helper used by both resolve_configuration_dict and get_configuration

    When the config has connection entries, each one is replaced (through
    util.update_json) with the resolved connections of its matching
    relationships. The resulting config is logged and returned.
    """
    if _has_connections(config):
        rels = _get_relationships_from_consul(ch, service_name)
        connection_types = _get_connection_types(config)
        connection_names = _resolve_connection_types(service_name, connection_types, rels)
        lookup = partial(_lookup_with_consul, ch)

        # Resolve every name first and only then touch the config, so a failed
        # lookup happens before any update is applied.
        resolved = []
        for key, names in connection_names:
            # NOTE: The hardcoded use of the first element. This is to keep things
            # backwards compatible since resolve name now returns a list.
            connections = [_resolve_name(lookup, name)[0] for name in names]
            resolved.append((key, connections))

        for key, conn in resolved:
            config = util.update_json(config, key, conn)

    _logger.info("Generated config: {0}".format(config))
    return config
260
261#####
262# Public calls
263#####
264
def get_consul_hostname(consul_hostname_override=None):
    """Get the Consul hostname

    Returns `consul_hostname_override` when it is truthy, otherwise the value
    of the CONSUL_HOST environment variable.

    Raises DiscoveryInitError if no override is given and CONSUL_HOST is unset.
    """
    if consul_hostname_override:
        return consul_hostname_override

    try:
        return os.environ["CONSUL_HOST"]
    except KeyError:
        # Only a missing variable is translated; anything else should surface.
        raise DiscoveryInitError("CONSUL_HOST variable has not been set!")
272
def get_service_name():
    """Get the full service name

    This is expected to be given from whatever entity is starting this service
    and given by an environment variable called "HOSTNAME".

    Raises DiscoveryInitError if HOSTNAME is unset.
    """
    try:
        return os.environ["HOSTNAME"]
    except KeyError:
        # Only a missing variable is translated; anything else should surface.
        raise DiscoveryInitError("HOSTNAME variable has not been set!")
282
283
def resolve_name(consul_host, service_name, max_attempts=3):
    """Resolve the service name

    Do a service discovery lookup from Consul and return back the detailed connection
    information.

    Args:
    -----
    consul_host (string): host of the Consul agent to query
    service_name (string): service name to look up
    max_attempts (int): number of Consul lookups to try before giving up

    Returns:
    --------
    A list with one entry per lookup result. For CDAP apps each entry is a
    dict. All others a string with the format "<ip>:<port>"
    """
    consul_client = consul.Consul(host=consul_host)

    def bounded_lookup(name):
        return _lookup_with_consul(consul_client, name, max_attempts=max_attempts)

    return _resolve_name(bounded_lookup, service_name)
297
298
def resolve_configuration_dict(consul_host, service_name, config):
    """
    Utility method for taking a given service_name, and config dict, and resolving it

    A Consul client is created for `consul_host` and used to resolve the
    config. Returns the resolved config.
    """
    return _resolve_configuration_dict(
        consul.Consul(host=consul_host), service_name, config)
305
306
def get_configuration(override_consul_hostname=None, override_service_name=None,
        from_cbs=True):
    """Provides this service component's configuration information fully resolved

    This method can either resolve the configuration locally here or make a
    remote call to the config binding service. The default is to use the config
    binding service.

    Args:
    -----
    override_consul_hostname (string): Consul hostname to use rather than the one
        set by the environment variable CONSUL_HOST
    override_service_name (string): Use this name over the name set on the
        HOSTNAME environment variable. Default is None.
    from_cbs (boolean): True (default) means use the config binding service otherwise
        set to False to have the config pulled and resolved by this library

    Returns the fully resolved service component configuration as a dict
    """
    # Get config, bootstrap
    consul_hostname = get_consul_hostname(override_consul_hostname)
    # NOTE: We use the default port 8500
    ch = consul.Consul(host=consul_hostname)

    service_name = override_service_name
    if not service_name:
        service_name = get_service_name()
    _logger.info("service name: {0}".format(service_name))

    if from_cbs:
        return _get_configuration_resolved_from_cbs(ch, service_name)

    # Resolving locally, the following will happen:
    #
    # 1. Fetching the configuration by service component name from Consul
    # 2. Fetching the relationships for this service component by service component
    #    name
    # 3. Pick out the connection types from the templatized fields in the configuration
    # 4. Resolve the connection types with connection names using the step #2
    #    information
    # 5. Resolve the connection names with the actual connection via queries to
    #    Consul using the connection name
    raw_config = _get_configuration_from_consul(ch, service_name)
    return _resolve_configuration_dict(ch, service_name, raw_config)
348
349
def register_for_discovery(consul_host, service_ip, service_port):
    """Register the service component for service discovery

    This is required in order for other services to "discover" you so that you
    can service their requests.

    The service is registered under the name given by get_service_name(), with
    a health check on the "health" endpoint.

    NOTE: Applications may not need to make this call depending upon if the
    environment is using Registrator.

    Raises DiscoveryRegistrationError if the registration call returns a falsy
    result.
    """
    ch = consul.Consul(host=consul_host)
    service_name = get_service_name()

    registered = _register_with_consul(ch, service_name, service_ip,
                                       service_port, "health")
    if not registered:
        _logger.error("Failed to register to consul: {0}".format(service_name))
        raise DiscoveryRegistrationError()

    _logger.info("Registered to consul: {0}".format(service_name))