additional mr client

additional mr client, fewer dependencies

Change-Id: I36168fd6e82846a889cd9a01aadf2462bb767723
Issue-ID: CCSDK-327
Signed-off-by: Smokowski, Kevin (ks6305) <ks6305@att.com>
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java
new file mode 100644
index 0000000..d720e5f
--- /dev/null
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java
@@ -0,0 +1,211 @@
+/*-

+ * ============LICENSE_START=======================================================

+ * openECOMP : SDN-C

+ * ================================================================================

+ * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights

+ * 						reserved.

+ * ================================================================================

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ * 

+ *      http://www.apache.org/licenses/LICENSE-2.0

+ * 

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ * ============LICENSE_END=========================================================

+ */

+

+package org.onap.ccsdk.sli.northbound.dmaapclient;

+

+import java.io.BufferedReader;

+import java.io.File;

+import java.io.FileInputStream;

+import java.io.FileNotFoundException;

+import java.io.IOException;

+import java.io.InputStreamReader;

+import java.io.UnsupportedEncodingException;

+import java.net.MalformedURLException;

+import java.net.URL;

+import java.net.URLEncoder;

+import java.nio.charset.StandardCharsets;

+import java.util.Base64;

+import java.util.Properties;

+import org.slf4j.Logger;

+import org.slf4j.LoggerFactory;

+import java.net.HttpURLConnection;

+

+/*

+ * java.net based client to build message router consumers

+ */

+public class MessageRouterHttpClientJdk implements SdncDmaapConsumer {

+    private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClientJdk.class);

+

+    protected Boolean isReady = false;

+    protected Boolean isRunning = false;

+    protected URL url;

+    protected Integer fetchPause;

+    protected Properties properties;

+    protected final String DEFAULT_CONNECT_TIMEOUT = "30000";

+    protected final String DEFAULT_READ_TIMEOUT = "180000";

+    protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";

+    protected final String DEFAULT_LIMIT = null;

+    private String authorizationString;

+    protected Integer connectTimeout;

+    protected Integer readTimeout;

+    protected String topic;

+

+    public MessageRouterHttpClientJdk() {}

+

+    @Override

+    public void run() {

+        if (isReady) {

+            isRunning = true;

+            while (isRunning) {

+                HttpURLConnection httpUrlConnection = null;

+                try {

+                    httpUrlConnection = (HttpURLConnection) url.openConnection();

+                    if (authorizationString != null) {

+                        httpUrlConnection.addRequestProperty("Authorization", authorizationString);

+                    }

+                    httpUrlConnection.setRequestMethod("GET");

+                    httpUrlConnection.setRequestProperty("Accept", "application/json");

+                    httpUrlConnection.setUseCaches(false);

+                    httpUrlConnection.setConnectTimeout(connectTimeout);

+                    httpUrlConnection.setReadTimeout(readTimeout);

+                    httpUrlConnection.connect();

+                    int status = httpUrlConnection.getResponseCode();

+                    Log.info("GET " + url + " returned http status " + status);

+                    if (status < 300) {

+                        BufferedReader br =

+                                new BufferedReader(new InputStreamReader(httpUrlConnection.getInputStream()));

+                        StringBuilder sb = new StringBuilder();

+                        String line;

+                        while ((line = br.readLine()) != null) {

+                            sb.append(line + "\n");

+                        }

+                        br.close();

+                        String responseBody = sb.toString();

+                        if (responseBody.contains("{")) {

+                            // Get rid of opening [" entity =

+                            responseBody = responseBody.substring(2);

+                            // Get rid of closing "]

+                            responseBody = responseBody.substring(0, responseBody.length() - 2);

+                            // Split the json array into individual elements to process

+                            for (String message : responseBody.split("\",\"")) {

+                                // unescape the json

+                                message = message.replace("\\\"", "\"");

+                                // Topic names cannot contain periods

+                                processMsg(message);

+                            }

+                        } else {

+                            Log.info("Entity doesn't appear to contain JSON elements, logging body");

+                            Log.info(responseBody);

+                        }

+                    }

+                } catch (Exception e) {

+                    Log.error("GET " + url + " failed.", e);

+                } finally {

+                    if (httpUrlConnection != null) {

+                        httpUrlConnection.disconnect();

+                    }

+                    Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + url + " again.");

+                    try {

+                        Thread.sleep(fetchPause);

+                    } catch (InterruptedException e) {

+                        Log.error("Could not sleep thread", e);

+                    }

+                }

+            }

+        }

+    }

+

+    @Override

+    public void init(Properties baseProperties, String consumerPropertiesPath) {

+        try {

+            baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));

+

+            this.properties = baseProperties;

+            String username = properties.getProperty("username");

+            String password = properties.getProperty("password");

+            topic = properties.getProperty("topic");

+            String group = properties.getProperty("group");

+            String host = properties.getProperty("host");

+            String id = properties.getProperty("id");

+

+            String filter = properties.getProperty("filter");

+            if (filter != null) {

+                if (filter.length() > 0) {

+                    try {

+                        filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());

+                    } catch (UnsupportedEncodingException e) {

+                        Log.error("Couldn't encode filter string", e);

+                    }

+                } else {

+                    filter = null;

+                }

+            }

+

+            String limitString = properties.getProperty("limit", DEFAULT_LIMIT);

+            Integer limit = null;

+            if (limitString != null && limitString.length() > 0) {

+                limit = Integer.valueOf(limitString);

+            }

+

+            Integer timeoutQueryParamValue =

+                    Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));

+            connectTimeout = Integer.valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT));

+            readTimeout = Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT));

+            if (username != null && password != null && username.length() > 0 && password.length() > 0) {

+                authorizationString = buildAuthorizationString(username, password);

+            }

+            String urlString = buildlUrlString(topic, group, id, host, timeoutQueryParamValue, limit, filter);

+            this.url = new URL(urlString);

+            this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause"));

+            this.isReady = true;

+        } catch (FileNotFoundException e) {

+            Log.error("FileNotFoundException while reading consumer properties", e);

+        } catch (IOException e) {

+            Log.error("IOException while reading consumer properties", e);

+        }

+    }

+

+    public void processMsg(String msg) {

+        Log.info(msg);

+    }

+

+    protected String buildAuthorizationString(String userName, String password) {

+        String basicAuthString = userName + ":" + password;

+        basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());

+        return "Basic " + basicAuthString;

+    }

+

+    protected String buildlUrlString(String topic, String consumerGroup, String consumerId, String host,

+            Integer timeout, Integer limit, String filter) {

+        StringBuilder sb = new StringBuilder();

+        sb.append("http://" + host + "/events/" + topic + "/" + consumerGroup + "/" + consumerId);

+        sb.append("?timeout=" + timeout);

+

+        if (limit != null) {

+            sb.append("&limit=" + limit);

+        }

+        if (filter != null) {

+            sb.append("&filter=" + filter);

+        }

+        return sb.toString();

+    }

+

+    @Override

+    public boolean isReady() {

+        return isReady;

+    }

+

+    @Override

+    public boolean isRunning() {

+        return isRunning;

+    }

+

+}