Incorporate Liam code review
Change-Id: I387a54b8bd8c619c521c512258928669a8c1b791
Issue-ID: POLICY-756
Signed-off-by: pa834y <pa834y@att.com>
diff --git a/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientEnd.java b/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientEnd.java
index 9f9dc37..00c8c5f 100644
--- a/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientEnd.java
+++ b/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientEnd.java
@@ -21,7 +21,9 @@
package org.onap.policy.std;
import java.net.URI;
-import javax.websocket.ClientEndpoint;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.onap.policy.api.NotificationHandler;
@@ -32,7 +34,6 @@
import org.onap.policy.common.logging.flexlogger.Logger;
import org.onap.policy.xacml.api.XACMLErrorConstants;
-@ClientEndpoint
public class AutoClientEnd extends WebSocketClient {
private static StdPDPNotification notification = null;
private static StdPDPNotification oldNotification = null;
@@ -42,8 +43,9 @@
private static String url = null;
private static boolean status = false;
private static boolean stop = false;
- private static boolean message = false;
private static boolean error = false;
+ private static boolean restartNeeded = false;
+ private static ScheduledExecutorService restartExecutorService = null;
private static Logger logger = FlexLogger.getLogger(AutoClientEnd.class.getName());
private AutoClientEnd(URI serverUri) {
@@ -53,7 +55,6 @@
@Override
public void onMessage(String msg) {
logger.info("Received Auto Notification from : " + getURI() + ", Notification: " + msg);
- AutoClientEnd.message = true;
try {
AutoClientEnd.notification = NotificationUnMarshal.notificationJSON(msg);
} catch (Exception e) {
@@ -68,45 +69,38 @@
AutoClientEnd.oldNotification = AutoClientEnd.notification;
callHandler();
}
-
- AutoClientEnd.message = false;
}
@Override
public void onClose(int code, String reason, boolean remote) {
logger.info("AutoClientEnd disconnected from: " + getURI() + "; Code: " + code + ", reason : " + reason);
- if (!AutoClientEnd.stop && !AutoClientEnd.message) {
- // This Block of code is executed if there is any Network Failure or
- // if the Notification is Down.
- logger.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR + "Disconnected from Notification Server");
- AutoClientEnd.client = null;
- AutoClientEnd.status = false;
- // Try to connect Back to available PDP.
- AutoClientEnd.error = true;
- start(url);
- }
- AutoClientEnd.message = false;
+ AutoClientEnd.restartNeeded = true;
}
@Override
public void onError(Exception ex) {
logger.error("XACMLErrorConstants.ERROR_PROCESS_FLOW + Error connecting to: " + getURI()
+ ", Exception occured ...\n" + ex);
- // trying to Restart by self.
- stop();
- if (AutoClientEnd.url != null) {
- AutoClientEnd.client = null;
- AutoClientEnd.status = false;
- AutoClientEnd.error = true;
- AutoClientEnd.start(AutoClientEnd.url);
- }
+ AutoClientEnd.restartNeeded = true;
}
@Override
public void onOpen(ServerHandshake arg0) {
+ restartNeeded = false;
logger.info("Auto Notification Session Started... " + getURI());
}
+ private static void restart() {
+ try {
+ if (client != null && restartNeeded && !stop) {
+ logger.info("Auto Notification Session Restarting ... " + getUrl());
+ client.reconnect();
+ }
+ } catch (Exception e) {
+ logger.info("Auto Notification Session Error Started... " + getUrl());
+ }
+ }
+
/**
* Sets the auto.
*
@@ -156,6 +150,10 @@
client = new AutoClientEnd(new URI(url + "notifications"));
client.connect();
status = true;
+ restartExecutorService = Executors.newSingleThreadScheduledExecutor();
+ Runnable task = AutoClientEnd::restart;
+ restartExecutorService.scheduleAtFixedRate(task, 60, 60, TimeUnit.SECONDS);
+
if (error) {
// will not trigger. leave it in to later add checks
// The URL's will be in Sync according to design Spec.
@@ -171,7 +169,6 @@
}
} catch (Exception e) {
logger.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR + e);
- client = null;
status = false;
changeUrl();
}
@@ -179,6 +176,7 @@
private static void changeUrl() {
// Change the PDP if it is not Up.
+ stop();
StdPolicyEngine.rotatePDPList();
start(StdPolicyEngine.getPDPURL());
}
@@ -192,15 +190,25 @@
}
logger.info("\n Closing Auto Notification WebSocket Connection.. ");
stop = true;
+ // first stop the restart service
+ try {
+ restartExecutorService.shutdown();
+ } catch (Exception e1) {
+ logger.info("\n AutoClientEnd: Error stoppping the restart Scheduler ");
+ }
+
+ // close the connection
try {
client.closeBlocking();
- } catch (InterruptedException e) {
- logger.info("\n Error Closing Auto Notification WebSocket Connection.. InterruptedException");
+ } catch (Exception e) {
+ logger.error("\n ERROR Closing Auto Notification WebSocket Connection.. ");
}
+
logger.info("\n Closed the Auto Notification WebSocket Connection.. ");
client = null;
status = false;
stop = false;
+ restartNeeded = false;
}
private static void callHandler() {
diff --git a/PolicyEngineAPI/src/main/java/org/onap/policy/std/ManualClientEnd.java b/PolicyEngineAPI/src/main/java/org/onap/policy/std/ManualClientEnd.java
index 2fe6dc0..a67b540 100644
--- a/PolicyEngineAPI/src/main/java/org/onap/policy/std/ManualClientEnd.java
+++ b/PolicyEngineAPI/src/main/java/org/onap/policy/std/ManualClientEnd.java
@@ -22,7 +22,6 @@
import java.net.URI;
import java.util.concurrent.CountDownLatch;
-import javax.websocket.ClientEndpoint;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.onap.policy.api.NotificationScheme;
@@ -33,7 +32,6 @@
import org.onap.policy.std.StdPDPNotification;
import org.onap.policy.xacml.api.XACMLErrorConstants;
-@ClientEndpoint
public class ManualClientEnd extends WebSocketClient {
private static CountDownLatch latch;
private static StdPDPNotification notification = null;
@@ -63,12 +61,11 @@
logger.info("Manual Notification Recieved Message from : " + getURI() + ", Notification: " + message);
ManualClientEnd.resultJson = message;
try {
- ManualClientEnd.notification = NotificationUnMarshal.notificationJSON(message);
- latch.countDown();
+ ManualClientEnd.notification = NotificationUnMarshal.notificationJSON(message);
} catch (Exception e) {
logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e);
- latch.countDown();
}
+ latch.countDown();
}
@Override
@@ -95,7 +92,7 @@
client = new ManualClientEnd(new URI(url + "notifications"));
client.connect();
latch.await();
- client.close();
+ client.closeBlocking();
} catch (Exception e) {
logger.error(XACMLErrorConstants.ERROR_SYSTEM_ERROR + e);
}
diff --git a/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/AutoClientEndTest.java b/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/AutoClientEndTest.java
index 4f1ce6f..5056fce 100644
--- a/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/AutoClientEndTest.java
+++ b/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/AutoClientEndTest.java
@@ -21,12 +21,11 @@
package org.onap.policy.std.test;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
@@ -37,7 +36,6 @@
import org.onap.policy.api.NotificationScheme;
import org.onap.policy.api.PDPNotification;
import org.onap.policy.std.AutoClientEnd;
-import org.onap.policy.std.StdPDPNotification;
import org.springframework.util.SocketUtils;
/**
@@ -47,9 +45,9 @@
public class AutoClientEndTest {
private static WebSocketServer ws;
- private static int port = 18080;
+ private static int port = SocketUtils.findAvailableTcpPort();
private static CountDownLatch countServerDownLatch = null;
- private StdPDPNotification notification = null;
+ private static PDPNotification notification = null;
/**
* Start server.
@@ -58,8 +56,8 @@
*/
@BeforeClass
public static void startServer() throws Exception {
- port = SocketUtils.findAvailableTcpPort();
- ws = new WebSocketServer(new InetSocketAddress(port), 16) {
+ notification = null;
+ ws = new WebSocketServer(new InetSocketAddress(port), 1) {
@Override
public void onOpen(WebSocket conn, ClientHandshake handshake) {
conn.send("{\"removedPolicies\": [],\"loadedPolicies\": "
@@ -91,7 +89,7 @@
};
- ws.setConnectionLostTimeout(30);
+ ws.setConnectionLostTimeout(0);
ws.start();
}
@@ -102,8 +100,8 @@
NotificationHandler handler = new NotificationHandler() {
@Override
- public void notificationReceived(PDPNotification notifi) {
- notification = (StdPDPNotification) notifi;
+ public void notificationReceived(PDPNotification notify) {
+ notification = notify;
countServerDownLatch.countDown();
}
@@ -113,17 +111,26 @@
countServerDownLatch = new CountDownLatch(1);
AutoClientEnd.start("http://localhost:" + port + "/");
- countServerDownLatch.await();
+ countServerDownLatch.await(45, TimeUnit.SECONDS);
assertNotNull(notification);
- assertTrue(AutoClientEnd.getStatus());
+
+
+ // simulate a server restart and verify client reconnects
+ countServerDownLatch = new CountDownLatch(1);
+ ws.stop(30000);
+ startServer();
+ countServerDownLatch.await(60+10, TimeUnit.SECONDS);
+ assertNotNull(notification);
+
+ AutoClientEnd.stop();
+
}
@AfterClass
- public static void successTests() throws InterruptedException, IOException {
- AutoClientEnd.stop();
- ws.stop();
+ public static void stopServer() throws InterruptedException, IOException {
+ ws.stop(30000);
}
diff --git a/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/ManualClientEndTest.java b/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/ManualClientEndTest.java
index 4a09164..252fa7e 100644
--- a/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/ManualClientEndTest.java
+++ b/PolicyEngineAPI/src/test/java/org/onap/policy/std/test/ManualClientEndTest.java
@@ -27,6 +27,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
@@ -40,13 +41,11 @@
/**
* The class <code>ManualClientEndTest</code> contains tests for the class <code>{@link ManualClientEnd}</code>.
*
- * @generatedBy CodePro at 6/1/16 1:41 PM
- * @version $Revision: 1.0 $
*/
public class ManualClientEndTest {
private static WebSocketServer ws;
- private static int port = 18080;
+ private static int port = SocketUtils.findAvailableTcpPort();
private static CountDownLatch countServerDownLatch = null;
private static String recvMsg = null;
@@ -57,8 +56,7 @@
*/
@BeforeClass
public static void startServer() throws Exception {
- port = SocketUtils.findAvailableTcpPort();
- ws = new WebSocketServer(new InetSocketAddress(port), 16) {
+ ws = new WebSocketServer(new InetSocketAddress(port), 1) {
@Override
public void onOpen(WebSocket conn, ClientHandshake handshake) {
@@ -93,16 +91,16 @@
};
- ws.setConnectionLostTimeout(30);
+ ws.setConnectionLostTimeout(0);
ws.start();
}
@Test
- public void testAutoClient() throws Exception {
+ public void testManualClient() throws Exception {
countServerDownLatch = new CountDownLatch(1);
ManualClientEnd.start("http://localhost:" + port + "/");
- countServerDownLatch.await();
+ countServerDownLatch.await(45, TimeUnit.SECONDS);
assertNotNull(ManualClientEnd.result(NotificationScheme.MANUAL_ALL_NOTIFICATIONS));
assertTrue("Manual".equalsIgnoreCase(recvMsg));
@@ -110,6 +108,6 @@
@AfterClass
public static void successTests() throws InterruptedException, IOException {
- ws.stop();
+ ws.stop(30000);
}
}