Incorporate Liam code review

Change-Id: I387a54b8bd8c619c521c512258928669a8c1b791
Issue-ID: POLICY-756
Signed-off-by: pa834y <pa834y@att.com>
diff --git a/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientEnd.java b/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientEnd.java
index 9f9dc37..00c8c5f 100644
--- a/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientEnd.java
+++ b/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientEnd.java
@@ -21,7 +21,9 @@
 package org.onap.policy.std;
 
 import java.net.URI;
-import javax.websocket.ClientEndpoint;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.java_websocket.client.WebSocketClient;
 import org.java_websocket.handshake.ServerHandshake;
 import org.onap.policy.api.NotificationHandler;
@@ -32,7 +34,6 @@
 import org.onap.policy.common.logging.flexlogger.Logger;
 import org.onap.policy.xacml.api.XACMLErrorConstants;
 
-@ClientEndpoint
 public class AutoClientEnd extends WebSocketClient {
     private static StdPDPNotification notification = null;
     private static StdPDPNotification oldNotification = null;
@@ -42,8 +43,9 @@
     private static String url = null;
     private static boolean status = false;
     private static boolean stop = false;
-    private static boolean message = false;
     private static boolean error = false;
+    private static boolean restartNeeded = false;
+    private static ScheduledExecutorService restartExecutorService = null;
     private static Logger logger = FlexLogger.getLogger(AutoClientEnd.class.getName());
 
     private AutoClientEnd(URI serverUri) {
@@ -53,7 +55,6 @@
     @Override
     public void onMessage(String msg) {
         logger.info("Received Auto Notification from : " + getURI() + ", Notification: " + msg);
-        AutoClientEnd.message = true;
         try {
             AutoClientEnd.notification = NotificationUnMarshal.notificationJSON(msg);
         } catch (Exception e) {
@@ -68,45 +69,38 @@
             AutoClientEnd.oldNotification = AutoClientEnd.notification;
             callHandler();
         }
-
-        AutoClientEnd.message = false;
     }
 
     @Override
     public void onClose(int code, String reason, boolean remote) {
         logger.info("AutoClientEnd disconnected from: " + getURI() + "; Code: " + code + ", reason :  " + reason);
-        if (!AutoClientEnd.stop && !AutoClientEnd.message) {
-            // This Block of code is executed if there is any Network Failure or
-            // if the Notification is Down.
-            logger.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR + "Disconnected from Notification Server");
-            AutoClientEnd.client = null;
-            AutoClientEnd.status = false;
-            // Try to connect Back to available PDP.
-            AutoClientEnd.error = true;
-            start(url);
-        }
-        AutoClientEnd.message = false;
+        AutoClientEnd.restartNeeded = true;
     }
 
     @Override
     public void onError(Exception ex) {
         logger.error("XACMLErrorConstants.ERROR_PROCESS_FLOW + Error connecting to: " + getURI()
                 + ", Exception occured ...\n" + ex);
-        // trying to Restart by self.
-        stop();
-        if (AutoClientEnd.url != null) {
-            AutoClientEnd.client = null;
-            AutoClientEnd.status = false;
-            AutoClientEnd.error = true;
-            AutoClientEnd.start(AutoClientEnd.url);
-        }
+        AutoClientEnd.restartNeeded = true;
     }
 
     @Override
     public void onOpen(ServerHandshake arg0) {
+        restartNeeded = false;
         logger.info("Auto Notification Session Started... " + getURI());
     }
 
+    private static void restart() {
+        try {
+            if (client != null && restartNeeded && !stop) {
+                logger.info("Auto Notification Session Restarting ... " + getUrl());
+                client.reconnect();
+            }
+        } catch (Exception e) {
+            logger.info("Auto Notification Session Error Started... " + getUrl());
+        }
+    }
+
     /**
      * Sets the auto.
      *
@@ -156,6 +150,10 @@
             client = new AutoClientEnd(new URI(url + "notifications"));
             client.connect();
             status = true;
+            restartExecutorService = Executors.newSingleThreadScheduledExecutor();
+            Runnable task = AutoClientEnd::restart;
+            restartExecutorService.scheduleAtFixedRate(task, 60, 60, TimeUnit.SECONDS);
+
             if (error) {
                 // will not trigger. leave it in to later add checks
                 // The URL's will be in Sync according to design Spec.
@@ -171,7 +169,6 @@
             }
         } catch (Exception e) {
             logger.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR + e);
-            client = null;
             status = false;
             changeUrl();
         }
@@ -179,6 +176,7 @@
 
     private static void changeUrl() {
         // Change the PDP if it is not Up.
+        stop();
         StdPolicyEngine.rotatePDPList();
         start(StdPolicyEngine.getPDPURL());
     }
@@ -192,15 +190,25 @@
         }
         logger.info("\n Closing Auto Notification WebSocket Connection.. ");
         stop = true;
+        // first stop the restart service
+        try {
+            restartExecutorService.shutdown();
+        } catch (Exception e1) {
+            logger.info("\n AutoClientEnd: Error stoppping the restart Scheduler ");
+        }
+
+        // close the connection
         try {
             client.closeBlocking();
-        } catch (InterruptedException e) {
-            logger.info("\n Error Closing Auto Notification WebSocket Connection.. InterruptedException");
+        } catch (Exception e) {
+            logger.error("\n ERROR Closing Auto Notification WebSocket Connection.. ");
         }
+
         logger.info("\n Closed the Auto Notification WebSocket Connection.. ");
         client = null;
         status = false;
         stop = false;
+        restartNeeded = false;
     }
 
     private static void callHandler() {
diff --git a/PolicyEngineAPI/src/main/java/org/onap/policy/std/ManualClientEnd.java b/PolicyEngineAPI/src/main/java/org/onap/policy/std/ManualClientEnd.java
index 2fe6dc0..a67b540 100644
--- a/PolicyEngineAPI/src/main/java/org/onap/policy/std/ManualClientEnd.java
+++ b/PolicyEngineAPI/src/main/java/org/onap/policy/std/ManualClientEnd.java
@@ -22,7 +22,6 @@
 
 import java.net.URI;
 import java.util.concurrent.CountDownLatch;
-import javax.websocket.ClientEndpoint;
 import org.java_websocket.client.WebSocketClient;
 import org.java_websocket.handshake.ServerHandshake;
 import org.onap.policy.api.NotificationScheme;
@@ -33,7 +32,6 @@
 import org.onap.policy.std.StdPDPNotification;
 import org.onap.policy.xacml.api.XACMLErrorConstants;
 
-@ClientEndpoint
 public class ManualClientEnd extends WebSocketClient {
     private static CountDownLatch latch;
     private static StdPDPNotification notification = null;
@@ -63,12 +61,11 @@
         logger.info("Manual Notification Recieved Message from : " + getURI() + ", Notification: " + message);
         ManualClientEnd.resultJson = message;
         try {
-            ManualClientEnd.notification = NotificationUnMarshal.notificationJSON(message);
-            latch.countDown();
+            ManualClientEnd.notification = NotificationUnMarshal.notificationJSON(message);     
         } catch (Exception e) {
             logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e);
-            latch.countDown();
         }
+        latch.countDown();
     }
 
     @Override
@@ -95,7 +92,7 @@
             client = new ManualClientEnd(new URI(url + "notifications"));
             client.connect();
             latch.await();
-            client.close();
+            client.closeBlocking();
         } catch (Exception e) {
             logger.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR + e);
         }
diff --git a/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/AutoClientEndTest.java b/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/AutoClientEndTest.java
index 4f1ce6f..5056fce 100644
--- a/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/AutoClientEndTest.java
+++ b/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/AutoClientEndTest.java
@@ -21,12 +21,11 @@
 package org.onap.policy.std.test;
 
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.java_websocket.WebSocket;
 import org.java_websocket.handshake.ClientHandshake;
 import org.java_websocket.server.WebSocketServer;
@@ -37,7 +36,6 @@
 import org.onap.policy.api.NotificationScheme;
 import org.onap.policy.api.PDPNotification;
 import org.onap.policy.std.AutoClientEnd;
-import org.onap.policy.std.StdPDPNotification;
 import org.springframework.util.SocketUtils;
 
 /**
@@ -47,9 +45,9 @@
 public class AutoClientEndTest {
     private static WebSocketServer ws;
 
-    private static int port = 18080;
+    private static int port = SocketUtils.findAvailableTcpPort();
     private static CountDownLatch countServerDownLatch = null;
-    private StdPDPNotification notification = null;
+    private static PDPNotification notification = null;
 
     /**
      * Start server.
@@ -58,8 +56,8 @@
      */
     @BeforeClass
     public static void startServer() throws Exception {
-        port = SocketUtils.findAvailableTcpPort();
-        ws = new WebSocketServer(new InetSocketAddress(port), 16) {
+        notification = null;
+        ws = new WebSocketServer(new InetSocketAddress(port), 1) {
             @Override
             public void onOpen(WebSocket conn, ClientHandshake handshake) {
                 conn.send("{\"removedPolicies\": [],\"loadedPolicies\": "
@@ -91,7 +89,7 @@
 
         };
 
-        ws.setConnectionLostTimeout(30);
+        ws.setConnectionLostTimeout(0);
         ws.start();
     }
 
@@ -102,8 +100,8 @@
         NotificationHandler handler = new NotificationHandler() {
 
             @Override
-            public void notificationReceived(PDPNotification notifi) {
-                notification = (StdPDPNotification) notifi;
+            public void notificationReceived(PDPNotification notify) {
+                notification = notify;
                 countServerDownLatch.countDown();
 
             }
@@ -113,17 +111,26 @@
         countServerDownLatch = new CountDownLatch(1);
 
         AutoClientEnd.start("http://localhost:" + port + "/");
-        countServerDownLatch.await();
+        countServerDownLatch.await(45, TimeUnit.SECONDS);
 
 
         assertNotNull(notification);
-        assertTrue(AutoClientEnd.getStatus());
+
+
+        // simulate a server restart and verify client reconnects
+        countServerDownLatch = new CountDownLatch(1);
+        ws.stop(30000);
+        startServer();
+        countServerDownLatch.await(60+10, TimeUnit.SECONDS);
+        assertNotNull(notification);
+
+        AutoClientEnd.stop();
+
     }
 
     @AfterClass
-    public static void successTests() throws InterruptedException, IOException {
-        AutoClientEnd.stop();
-        ws.stop();
+    public static void stopServer() throws InterruptedException, IOException {
+        ws.stop(30000);
     }
 
 
diff --git a/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/ManualClientEndTest.java b/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/ManualClientEndTest.java
index 4a09164..252fa7e 100644
--- a/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/ManualClientEndTest.java
+++ b/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/ManualClientEndTest.java
@@ -27,6 +27,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.java_websocket.WebSocket;
 import org.java_websocket.handshake.ClientHandshake;
 import org.java_websocket.server.WebSocketServer;
@@ -40,13 +41,11 @@
 /**
  * The class <code>ManualClientEndTest</code> contains tests for the class <code>{@link ManualClientEnd}</code>.
  *
- * @generatedBy CodePro at 6/1/16 1:41 PM
- * @version $Revision: 1.0 $
  */
 public class ManualClientEndTest {
     private static WebSocketServer ws;
 
-    private static int port = 18080;
+    private static int port = SocketUtils.findAvailableTcpPort();
     private static CountDownLatch countServerDownLatch = null;
     private static String recvMsg = null;
 
@@ -57,8 +56,7 @@
      */
     @BeforeClass
     public static void startServer() throws Exception {
-        port = SocketUtils.findAvailableTcpPort();
-        ws = new WebSocketServer(new InetSocketAddress(port), 16) {
+        ws = new WebSocketServer(new InetSocketAddress(port), 1) {
             @Override
             public void onOpen(WebSocket conn, ClientHandshake handshake) {
 
@@ -93,16 +91,16 @@
 
         };
 
-        ws.setConnectionLostTimeout(30);
+        ws.setConnectionLostTimeout(0);
         ws.start();
     }
 
     @Test
-    public void testAutoClient() throws Exception {
+    public void testManualClient() throws Exception {
         countServerDownLatch = new CountDownLatch(1);
 
         ManualClientEnd.start("http://localhost:" + port + "/");
-        countServerDownLatch.await();
+        countServerDownLatch.await(45, TimeUnit.SECONDS);
 
         assertNotNull(ManualClientEnd.result(NotificationScheme.MANUAL_ALL_NOTIFICATIONS));
         assertTrue("Manual".equalsIgnoreCase(recvMsg));
@@ -110,6 +108,6 @@
 
     @AfterClass
     public static void successTests() throws InterruptedException, IOException {
-        ws.stop();
+        ws.stop(30000);
     }
 }