Merge "Fix sonars in server-pool"
diff --git a/feature-server-pool/pom.xml b/feature-server-pool/pom.xml
index 442544c..778a9cd 100644
--- a/feature-server-pool/pom.xml
+++ b/feature-server-pool/pom.xml
@@ -59,7 +59,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
- <version>3.1.1</version>
<executions>
<execution>
<id>copy-dependencies</id>
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
index b82f2e1..6241a29 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
@@ -360,63 +360,31 @@
case OWNER_NULL:
// <OWNER_NULL> -- owner UUID should be set to 'null'
- if (bucket.getOwner() != null) {
- logger.info("Bucket {} owner: {}->null",
- index, bucket.getOwner());
- bucketChanges = true;
- synchronized (bucket) {
- bucket.setOwner(null);
- bucket.setState(null);
- }
- }
+ bucketChanges = nullifyOwner(index, bucket, bucketChanges);
break;
case PRIMARY_BACKUP_UPDATE:
// <PRIMARY_BACKUP_UPDATE> <primary-backup-uuid> --
// primary backup UUID specified
- Server newPrimaryBackup =
- Server.getServer(Util.readUuid(dis));
- if (bucket.primaryBackup != newPrimaryBackup) {
- logger.info("Bucket {} primary backup: {}->{}", index,
- bucket.primaryBackup, newPrimaryBackup);
- bucketChanges = true;
- bucket.primaryBackup = newPrimaryBackup;
- }
+ bucketChanges = updatePrimaryBackup(dis, index, bucket, bucketChanges);
break;
case PRIMARY_BACKUP_NULL:
// <PRIMARY_BACKUP_NULL> --
// primary backup should be set to 'null'
- if (bucket.primaryBackup != null) {
- logger.info("Bucket {} primary backup: {}->null",
- index, bucket.primaryBackup);
- bucketChanges = true;
- bucket.primaryBackup = null;
- }
+ bucketChanges = nullifyPrimaryBackup(index, bucket, bucketChanges);
break;
case SECONDARY_BACKUP_UPDATE:
// <SECONDARY_BACKUP_UPDATE> <secondary-backup-uuid> --
// secondary backup UUID specified
- Server newSecondaryBackup =
- Server.getServer(Util.readUuid(dis));
- if (bucket.secondaryBackup != newSecondaryBackup) {
- logger.info("Bucket {} secondary backup: {}->{}", index,
- bucket.secondaryBackup, newSecondaryBackup);
- bucketChanges = true;
- bucket.secondaryBackup = newSecondaryBackup;
- }
+ bucketChanges = updateSecondaryBackup(dis, index, bucket, bucketChanges);
break;
case SECONDARY_BACKUP_NULL:
// <SECONDARY_BACKUP_NULL> --
// secondary backup should be set to 'null'
- if (bucket.secondaryBackup != null) {
- logger.info("Bucket {} secondary backup: {}->null",
- index, bucket.secondaryBackup);
- bucketChanges = true;
- bucket.secondaryBackup = null;
- }
+ bucketChanges = nullifySecondaryBackup(index, bucket, bucketChanges);
break;
default:
@@ -433,6 +401,65 @@
return changes;
}
+    /**
+     * Handles the 'OWNER_NULL' tag: if the bucket currently has an owner,
+     * both the owner and the state of the bucket are set to 'null'.
+     *
+     * @param index the bucket index (only used for logging)
+     * @param bucket the bucket being updated
+     * @param bucketChanges the "something changed" indicator accumulated so far
+     * @return 'true' if the owner was cleared, otherwise the value of
+     *     'bucketChanges' that was passed in
+     */
+    private static boolean nullifyOwner(int index, Bucket bucket, boolean bucketChanges) {
+        if (bucket.getOwner() == null) {
+            // there is no owner to clear
+            return bucketChanges;
+        }
+
+        logger.info("Bucket {} owner: {}->null", index, bucket.getOwner());
+        synchronized (bucket) {
+            // 'owner' and 'state' are cleared together, under the bucket lock
+            bucket.setOwner(null);
+            bucket.setState(null);
+        }
+        return true;
+    }
+
+    /**
+     * Handles the 'PRIMARY_BACKUP_UPDATE' tag: reads a UUID from the stream,
+     * looks up the matching 'Server', and stores it as the primary backup
+     * of the bucket if it is not already the primary backup.
+     *
+     * @param dis the stream positioned at the primary backup UUID
+     * @param index the bucket index (only used for logging)
+     * @param bucket the bucket being updated
+     * @param bucketChanges the "something changed" indicator accumulated so far
+     * @return 'true' if the primary backup was replaced, otherwise the value
+     *     of 'bucketChanges' that was passed in
+     * @throws IOException if the UUID can't be read from 'dis'
+     */
+    private static boolean updatePrimaryBackup(DataInputStream dis, int index, Bucket bucket,
+            boolean bucketChanges) throws IOException {
+        // the UUID is always consumed, even when it turns out nothing changes
+        final Server replacement = Server.getServer(Util.readUuid(dis));
+        if (bucket.primaryBackup == replacement) {
+            return bucketChanges;
+        }
+
+        logger.info("Bucket {} primary backup: {}->{}", index, bucket.primaryBackup, replacement);
+        bucket.primaryBackup = replacement;
+        return true;
+    }
+
+    /**
+     * Handles the 'PRIMARY_BACKUP_NULL' tag: sets the primary backup of the
+     * bucket to 'null', if it is not 'null' already.
+     *
+     * @param index the bucket index (only used for logging)
+     * @param bucket the bucket being updated
+     * @param bucketChanges the "something changed" indicator accumulated so far
+     * @return 'true' if the primary backup was cleared, otherwise the value
+     *     of 'bucketChanges' that was passed in
+     */
+    private static boolean nullifyPrimaryBackup(int index, Bucket bucket, boolean bucketChanges) {
+        if (bucket.primaryBackup == null) {
+            // already cleared
+            return bucketChanges;
+        }
+
+        logger.info("Bucket {} primary backup: {}->null", index, bucket.primaryBackup);
+        bucket.primaryBackup = null;
+        return true;
+    }
+
+    /**
+     * Handles the 'SECONDARY_BACKUP_UPDATE' tag: reads a UUID from the stream,
+     * looks up the matching 'Server', and stores it as the secondary backup
+     * of the bucket if it is not already the secondary backup.
+     *
+     * @param dis the stream positioned at the secondary backup UUID
+     * @param index the bucket index (only used for logging)
+     * @param bucket the bucket being updated
+     * @param bucketChanges the "something changed" indicator accumulated so far
+     * @return 'true' if the secondary backup was replaced, otherwise the value
+     *     of 'bucketChanges' that was passed in
+     * @throws IOException if the UUID can't be read from 'dis'
+     */
+    private static boolean updateSecondaryBackup(DataInputStream dis, int index, Bucket bucket,
+            boolean bucketChanges) throws IOException {
+        // the UUID is always consumed, even when it turns out nothing changes
+        final Server replacement = Server.getServer(Util.readUuid(dis));
+        if (bucket.secondaryBackup == replacement) {
+            return bucketChanges;
+        }
+
+        logger.info("Bucket {} secondary backup: {}->{}", index, bucket.secondaryBackup, replacement);
+        bucket.secondaryBackup = replacement;
+        return true;
+    }
+
+    /**
+     * Handles the 'SECONDARY_BACKUP_NULL' tag: sets the secondary backup of
+     * the bucket to 'null', if it is not 'null' already.
+     *
+     * @param index the bucket index (only used for logging)
+     * @param bucket the bucket being updated
+     * @param bucketChanges the "something changed" indicator accumulated so far
+     * @return 'true' if the secondary backup was cleared, otherwise the value
+     *     of 'bucketChanges' that was passed in
+     */
+    private static boolean nullifySecondaryBackup(int index, Bucket bucket, boolean bucketChanges) {
+        if (bucket.secondaryBackup == null) {
+            // already cleared
+            return bucketChanges;
+        }
+
+        logger.info("Bucket {} secondary backup: {}->null", index, bucket.secondaryBackup);
+        bucket.secondaryBackup = null;
+        return true;
+    }
+
/**
* Update bucket owner information.
*
@@ -768,19 +795,7 @@
out.println("Moving bucket " + bucketNumber + " from "
+ oldHost + " to " + newHost);
- /*
- * update the owner, and ensure that the primary and secondary backup
- * remain different from the owner.
- */
- bucket.setOwner(newHost);
- if (newHost == bucket.primaryBackup) {
- out.println("Moving primary back from " + newHost + " to " + oldHost);
- bucket.setPrimaryBackup(oldHost);
- } else if (newHost == bucket.secondaryBackup) {
- out.println("Moving secondary back from " + newHost
- + " to " + oldHost);
- bucket.setSecondaryBackup(oldHost);
- }
+ updateOwner(out, bucket, oldHost, newHost);
try {
/*
@@ -796,6 +811,22 @@
}
}
+    /**
+     * Makes 'newHost' the owner of a test bucket. If 'newHost' was the
+     * primary or secondary backup of that bucket, 'oldHost' takes over that
+     * backup role, so the backups remain different from the owner.
+     *
+     * @param out where the "Moving ..." progress messages are printed
+     * @param bucket the test bucket being moved
+     * @param oldHost the host giving up the bucket
+     * @param newHost the host receiving the bucket
+     */
+    private static void updateOwner(PrintStream out, TestBucket bucket, TestServer oldHost, TestServer newHost) {
+        bucket.setOwner(newHost);
+
+        if (newHost == bucket.primaryBackup) {
+            // the new owner was the primary backup -- swap roles with the old owner
+            out.println("Moving primary back from " + newHost + " to " + oldHost);
+            bucket.setPrimaryBackup(oldHost);
+            return;
+        }
+
+        if (newHost == bucket.secondaryBackup) {
+            // the new owner was the secondary backup -- swap roles with the old owner
+            out.println("Moving secondary back from " + newHost + " to " + oldHost);
+            bucket.setSecondaryBackup(oldHost);
+        }
+    }
+
/**
* This method is called when an incoming /bucket/sessionData message is
* received from the old owner of the bucket, which presumably means that
@@ -1485,6 +1516,10 @@
return;
}
+ makeTestBucket(bucketSnapshot);
+ }
+
+ private void makeTestBucket(final Bucket[] bucketSnapshot) {
/*
* Now, create a 'TestBucket' table that mirrors the 'Bucket' table.
* Unlike the standard 'Bucket' and 'Server' tables, the 'TestServer'
@@ -1796,28 +1831,13 @@
dos.writeShort(i);
// 'owner' field
- if (newOwner != null) {
- dos.writeByte(OWNER_UPDATE);
- Util.writeUuid(dos, newOwner);
- } else {
- dos.writeByte(OWNER_NULL);
- }
+ writeOwner(dos, newOwner);
// 'primaryBackup' field
- if (newPrimary != null) {
- dos.writeByte(PRIMARY_BACKUP_UPDATE);
- Util.writeUuid(dos, newPrimary);
- } else {
- dos.writeByte(PRIMARY_BACKUP_NULL);
- }
+ writePrimary(dos, newPrimary);
// 'secondaryBackup' field
- if (newSecondary != null) {
- dos.writeByte(SECONDARY_BACKUP_UPDATE);
- Util.writeUuid(dos, newSecondary);
- } else {
- dos.writeByte(SECONDARY_BACKUP_NULL);
- }
+ writeSecondary(dos, newSecondary);
dos.writeByte(END_OF_PARAMETERS_TAG);
}
@@ -1851,6 +1871,33 @@
MainLoop.queueWork(task);
}
+    /**
+     * Writes the 'owner' field of a bucket update: 'OWNER_UPDATE' followed
+     * by the UUID when an owner is given, or just 'OWNER_NULL' otherwise.
+     *
+     * @param dos the stream receiving the encoded field
+     * @param newOwner the UUID of the new owner, or 'null' if there is none
+     * @throws IOException if writing to 'dos' fails
+     */
+    private void writeOwner(DataOutputStream dos, UUID newOwner) throws IOException {
+        if (newOwner == null) {
+            dos.writeByte(OWNER_NULL);
+            return;
+        }
+
+        dos.writeByte(OWNER_UPDATE);
+        Util.writeUuid(dos, newOwner);
+    }
+
+    /**
+     * Writes the 'primaryBackup' field of a bucket update:
+     * 'PRIMARY_BACKUP_UPDATE' followed by the UUID when a primary backup is
+     * given, or just 'PRIMARY_BACKUP_NULL' otherwise.
+     *
+     * @param dos the stream receiving the encoded field
+     * @param newPrimary the UUID of the new primary backup, or 'null'
+     * @throws IOException if writing to 'dos' fails
+     */
+    private void writePrimary(DataOutputStream dos, UUID newPrimary) throws IOException {
+        if (newPrimary == null) {
+            dos.writeByte(PRIMARY_BACKUP_NULL);
+            return;
+        }
+
+        dos.writeByte(PRIMARY_BACKUP_UPDATE);
+        Util.writeUuid(dos, newPrimary);
+    }
+
+    /**
+     * Writes the 'secondaryBackup' field of a bucket update:
+     * 'SECONDARY_BACKUP_UPDATE' followed by the UUID when a secondary backup
+     * is given, or just 'SECONDARY_BACKUP_NULL' otherwise.
+     *
+     * @param dos the stream receiving the encoded field
+     * @param newSecondary the UUID of the new secondary backup, or 'null'
+     * @throws IOException if writing to 'dos' fails
+     */
+    private void writeSecondary(DataOutputStream dos, UUID newSecondary) throws IOException {
+        if (newSecondary == null) {
+            dos.writeByte(SECONDARY_BACKUP_NULL);
+            return;
+        }
+
+        dos.writeByte(SECONDARY_BACKUP_UPDATE);
+        Util.writeUuid(dos, newSecondary);
+    }
+
/**
* Supports the '/cmd/dumpBuckets' REST message -- this isn't part of
* a 'rebalance' operation, but it turned out to be a convenient way
@@ -2304,22 +2351,7 @@
message = messages.poll();
if (message == null) {
// no messages left
- if (state == this) {
- if (owner == Server.getThisServer()) {
- // we can now exit the state
- state = null;
- stateChanged();
- } else {
- /*
- * We need a grace period before we can
- * remove the 'state' value (this can happen
- * if we receive and process the bulk data
- * before receiving official confirmation
- * that we are owner of the bucket.
- */
- messages = null;
- }
- }
+ noMoreMessages();
break;
}
}
@@ -2330,25 +2362,48 @@
}
if (messages == null) {
// this indicates we need to enter a grace period before cleanup,
- try {
- logger.info("{}: entering grace period before terminating",
- this);
- Thread.sleep(unconfirmedGracePeriod);
- } catch (InterruptedException e) {
- // we are exiting in any case
- Thread.currentThread().interrupt();
- } finally {
- synchronized (Bucket.this) {
- // Do we need to confirm that we really are the owner?
- // What does it mean if we are not?
- if (state == this) {
- state = null;
- stateChanged();
- }
+ sleepBeforeCleanup();
+ }
+ logger.info("{}: exiting cleanup state", this);
+ }
+
+    /**
+     * Invoked once the message queue has been drained. If this object is
+     * still the bucket's 'state', it either leaves the state right away
+     * (this server is the owner), or sets 'messages' to 'null', which the
+     * caller treats as a request for a grace period before cleanup.
+     */
+    private void noMoreMessages() {
+        if (state != this) {
+            // some other state has replaced this one -- leave it alone
+            return;
+        }
+
+        if (owner != Server.getThisServer()) {
+            /*
+             * We need a grace period before we can remove the 'state'
+             * value (this can happen if we receive and process the bulk
+             * data before receiving official confirmation that we are
+             * owner of the bucket).
+             */
+            messages = null;
+            return;
+        }
+
+        // we can now exit the state
+        state = null;
+        stateChanged();
+    }
+
+ private void sleepBeforeCleanup() {
+ try {
+ logger.info("{}: entering grace period before terminating",
+ this);
+ Thread.sleep(unconfirmedGracePeriod);
+ } catch (InterruptedException e) {
+ // we are exiting in any case
+ Thread.currentThread().interrupt();
+ } finally {
+ synchronized (Bucket.this) {
+ // Do we need to confirm that we really are the owner?
+ // What does it mean if we are not?
+ if (state == this) {
+ state = null;
+ stateChanged();
}
}
}
- logger.info("{}: exiting cleanup state", this);
}
/**
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java
index 2d643a3..a53fb4d 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java
@@ -180,14 +180,14 @@
* same format base64-encoded message that 'Server'
* instances periodically exchange
*/
- LinkedHashMap<String, String> map = new LinkedHashMap<>();
try {
- map = coder.decode(event, LinkedHashMap.class);
+ @SuppressWarnings("unchecked")
+ LinkedHashMap<String, String> map = coder.decode(event, LinkedHashMap.class);
String message = map.get("pingData");
Server.adminRequest(message.getBytes(StandardCharsets.UTF_8));
logger.info("Received a message, server count={}", Server.getServerCount());
} catch (CoderException e) {
- logger.error("Can't decode message: {}", e);
+ logger.error("Can't decode message", e);
}
}
@@ -332,6 +332,7 @@
publisher.send(jsonString);
}
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
logger.error("Exception in Discovery.Publisher.run():", e);
return;
} catch (Exception e) {
@@ -340,6 +341,7 @@
try {
Thread.sleep(15000);
} catch (InterruptedException e2) {
+ Thread.currentThread().interrupt();
logger.error("Discovery.Publisher sleep interrupted");
}
return;
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
index dd1c7c3..5f93d2a 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
@@ -168,17 +168,25 @@
logger.info("Starting FeatureServerPool");
Server.startup(CONFIG_FILE);
TargetLock.startup();
- droolsTimeoutMillis =
- getProperty(BUCKET_DROOLS_TIMEOUT, DEFAULT_BUCKET_DROOLS_TIMEOUT);
+ setDroolsTimeoutMillis(
+ getProperty(BUCKET_DROOLS_TIMEOUT, DEFAULT_BUCKET_DROOLS_TIMEOUT));
int intTimeToLive =
getProperty(BUCKET_TIME_TO_LIVE, DEFAULT_BUCKET_TIME_TO_LIVE);
- timeToLiveSecond = String.valueOf(intTimeToLive);
+ setTimeToLiveSecond(String.valueOf(intTimeToLive));
buildKeywordTable();
Bucket.Backup.register(new DroolsSessionBackup());
Bucket.Backup.register(new TargetLock.LockBackup());
return false;
}
+    /**
+     * Updates the static 'droolsTimeoutMillis' field.
+     *
+     * @param timeoutMs the new value to store
+     */
+    private static void setDroolsTimeoutMillis(long timeoutMs) {
+        droolsTimeoutMillis = timeoutMs;
+    }
+
+    /**
+     * Updates the static 'timeToLiveSecond' field.
+     *
+     * @param ttlSec the new value to store (the time-to-live as a string)
+     */
+    private static void setTimeToLiveSecond(String ttlSec) {
+        timeToLiveSecond = ttlSec;
+    }
+
/**
* {@inheritDoc}
*/
@@ -319,7 +327,7 @@
path = Arrays.copyOf(path, path.length);
path[path.length - 1] = fieldName;
}
- keyword = sco.getString(path);
+ keyword = sco.getString((Object[]) path);
if (keyword != null) {
if (conversionFunctionName != null) {
@@ -554,21 +562,11 @@
continue;
}
- int beginIndex = begin.length();
- int endIndex = name.length() - end.length();
- if (beginIndex < endIndex) {
- // <topic> is specified, so this table is limited to this
- // specific topic
- topic = name.substring(beginIndex, endIndex);
- }
+ topic = detmTopic(name, begin, end);
// now, process the value
// Example: requestID,CommonHeader.RequestID
- String[] commaSeparatedEntries = prop.getProperty(name).split(",");
- String[][] paths = new String[commaSeparatedEntries.length][];
- for (int i = 0; i < commaSeparatedEntries.length; i += 1) {
- paths[i] = commaSeparatedEntries[i].split("\\.");
- }
+ String[][] paths = splitPaths(prop, name);
if (topic == null) {
// these paths are used for any topics not explicitly
@@ -581,6 +579,28 @@
}
}
+    /**
+     * Determines the topic embedded in a property name: the characters of
+     * 'name' that follow the first 'begin.length()' characters and precede
+     * the last 'end.length()' characters.
+     *
+     * @param name the full property name
+     * @param begin the prefix preceding the topic
+     * @param end the suffix following the topic
+     * @return the topic, or 'null' if there are no characters between the
+     *     prefix and the suffix
+     */
+    private static String detmTopic(String name, String begin, String end) {
+        final int topicStart = begin.length();
+        final int topicEnd = name.length() - end.length();
+
+        // <topic> is specified only when something lies between prefix and
+        // suffix -- in that case the table is limited to this specific topic
+        return (topicStart < topicEnd) ? name.substring(topicStart, topicEnd) : null;
+    }
+
+    /**
+     * Converts a property value into a table of paths. The value is a
+     * comma-separated list of entries, and each entry is a dot-separated
+     * path (example value: requestID,CommonHeader.RequestID).
+     *
+     * @param prop the properties containing the value
+     * @param name the name of the property to split
+     * @return one array per comma-separated entry, each holding the
+     *     dot-separated components of that entry
+     */
+    private static String[][] splitPaths(Properties prop, String name) {
+        return Arrays.stream(prop.getProperty(name).split(","))
+            .map(entry -> entry.split("\\."))
+            .toArray(String[][]::new);
+    }
+
/*======================================*/
/* 'DroolsPdpStateControlApi' interface */
/*======================================*/
@@ -993,8 +1013,10 @@
}
};
kieSession.insert(doRestore);
+ ois.close();
return sessionLatch;
} else {
+ ois.close();
logger.error("{}: Invalid session data for session={}, type={}",
this, session.getFullName(), obj.getClass().getName());
}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java
index c9f4c78..0059a4b 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java
@@ -26,7 +26,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
+import java.util.function.UnaryOperator;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -183,33 +183,26 @@
* and all the way up the superclass chain.
*/
private static Class<?> buildReflectiveLookupFindKeyClass(Class<?> clazz) {
- Class<?> keyClass = null;
for (Class<?> cl = clazz; cl != null; cl = cl.getSuperclass()) {
if (classNameToSequence.containsKey(cl.getName())) {
// matches the class
- keyClass = cl;
- break;
+ return cl;
}
for (Class<?> intf : cl.getInterfaces()) {
- if (classNameToSequence.containsKey(intf.getName())) {
- // matches one of the interfaces
- keyClass = intf;
- break;
- }
- // interface can have superclass
- for (Class<?> cla = clazz; cla != null; cla = intf.getSuperclass()) {
- if (classNameToSequence.containsKey(cla.getName())) {
- // matches the class
- keyClass = cla;
- break;
- }
- }
+ // check the interface itself, and every interface it extends
+ // (the previous loop used 'intf.getSuperclass()', which is always
+ // 'null' for an interface, so super-interfaces were never examined)
+ Class<?> match = buildReflectiveLookupFindKeyInterface(intf);
+ if (match != null) {
+ return match;
+ }
}
- if (keyClass != null) {
- break;
- }
}
- return keyClass;
+ return null;
}
+
+ /**
+ * Search an interface, and recursively all of the interfaces it extends,
+ * for one whose name has an entry in 'classNameToSequence'.
+ *
+ * @param intf the interface to start from
+ * @return the first matching interface found, or 'null' if none matches
+ */
+ private static Class<?> buildReflectiveLookupFindKeyInterface(Class<?> intf) {
+ if (classNameToSequence.containsKey(intf.getName())) {
+ // matches this interface
+ return intf;
+ }
+ for (Class<?> parent : intf.getInterfaces()) {
+ Class<?> match = buildReflectiveLookupFindKeyInterface(parent);
+ if (match != null) {
+ return match;
+ }
+ }
+ return null;
+ }
private static Lookup buildReflectiveLookupBuild(Class<?> clazz, Class<?> keyClass) {
@@ -438,7 +431,7 @@
*/
// used to lookup optional conversion functions
- private static Map<String, Function<String, String>> conversionFunction =
+ private static Map<String, UnaryOperator<String>> conversionFunction =
new ConcurrentHashMap<>();
// conversion function 'uuid':
@@ -459,7 +452,7 @@
* @param name the conversion function name
* @param function the object that does the transformation
*/
- public static void addConversionFunction(String name, Function<String, String> function) {
+ public static void addConversionFunction(String name, UnaryOperator<String> function) {
conversionFunction.put(name, function);
}
@@ -478,7 +471,7 @@
}
// look up the function
- Function<String, String> function = conversionFunction.get(functionName);
+ UnaryOperator<String> function = conversionFunction.get(functionName);
if (function == null) {
logger.error("{}: conversion function not found", functionName);
return null;
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java
index 96b6598..1a59645 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java
@@ -41,6 +41,7 @@
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import lombok.EqualsAndHashCode;
+import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,9 +65,11 @@
}
// Server currently in the leader roll
+ @Setter
private static Server leaderLocal = null;
// Vote state machine -- it is null, unless a vote is in progress
+ @Setter
private static VoteCycle voteCycle = null;
private static UUID emptyUUID = new UUID(0L, 0L);
@@ -172,7 +175,7 @@
if (server == leaderLocal) {
// the lead server has failed --
// start/restart the VoteCycle state machine
- leaderLocal = null;
+ setLeaderLocal(null);
startVoting();
// send out a notification that the lead server has failed
@@ -296,7 +299,7 @@
// 5 second grace period has passed -- the leader is one with
// the most votes, which is the first entry in 'voteData'
Server oldLeader = leaderLocal;
- leaderLocal = Server.getServer(voteData.first().uuid);
+ setLeaderLocal(Server.getServer(voteData.first().uuid));
if (leaderLocal != oldLeader) {
// the leader has changed -- send out notifications
for (Events listener : Events.getListeners()) {
@@ -319,7 +322,7 @@
// we are done with voting -- clean up, and report results
MainLoop.removeBackgroundWork(this);
- voteCycle = null;
+ setVoteCycle(null);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bos);
@@ -332,6 +335,11 @@
out.format(format, "UUID", "Votes", "Voter(s)");
out.format(format, "----", "-----", "--------");
+ outputVotes(out, format);
+ logger.info("Output - {}", bos);
+ }
+
+ private void outputVotes(PrintStream out, String format) {
for (VoteData vote : voteData) {
if (vote.voters.isEmpty()) {
out.format(format, vote.uuid, 0, "");
@@ -348,7 +356,6 @@
}
}
}
- logger.info("Output - {}", bos);
}
/**
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java
index d310805..80b5891 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java
@@ -81,6 +81,7 @@
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import lombok.Setter;
import org.glassfish.jersey.client.ClientProperties;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.http.client.HttpClient;
@@ -105,6 +106,7 @@
new TreeMap<>(Util.uuidComparator);
// subset of servers to be notified (null means it needs to be rebuilt)
+ @Setter
private static LinkedList<Server> notifyList = null;
// data to be sent out to notify list
@@ -320,7 +322,6 @@
InetAddress address = InetAddress.getByName(ipAddressString);
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
- possibleError = "HTTP server initialization error";
restServer = HttpServletServerFactoryInstance.getServerFactory().build(
"SERVER-POOL", // name
useHttps, // https
@@ -342,7 +343,6 @@
}
// we may not know the port until after the server is started
- possibleError = "HTTP server start error";
restServer.start();
possibleError = null;
@@ -625,7 +625,7 @@
updatedList.add(this);
// notify list will need to be rebuilt
- notifyList = null;
+ setNotifyList(null);
if (socketAddress != null && this != thisServer) {
// initialize 'client' and 'target' fields
@@ -769,60 +769,63 @@
// The 'notifyList' value is initially 'null', and it is reset to 'null'
// every time a new host joins, or one leaves. That way, it is marked for
// recalculation, but only when needed.
- if (notifyList == null) {
- // next index we are looking for
- int dest = 1;
+ if (notifyList != null) {
+ return notifyList;
+ }
- // our current position in the Server table -- starting at 'thisServer'
- UUID current = thisServer.uuid;
+ // next index we are looking for
+ int dest = 1;
- // site socket address of 'current'
- InetSocketAddress thisSiteSocketAddress = thisServer.siteSocketAddress;
+ // our current position in the Server table -- starting at 'thisServer'
+ UUID current = thisServer.uuid;
- // hash set of all site socket addresses located
- HashSet<InetSocketAddress> siteSocketAddresses = new HashSet<>();
- siteSocketAddresses.add(thisSiteSocketAddress);
+ // site socket address of 'current'
+ InetSocketAddress thisSiteSocketAddress = thisServer.siteSocketAddress;
- // the list we are building
- notifyList = new LinkedList<>();
+ // hash set of all site socket addresses located
+ HashSet<InetSocketAddress> siteSocketAddresses = new HashSet<>();
+ siteSocketAddresses.add(thisSiteSocketAddress);
- int index = 1;
- for ( ; ; ) {
- // move to the next key (UUID) -- if we hit the end of the table,
- // wrap to the beginning
- current = servers.higherKey(current);
- if (current == null) {
- current = servers.firstKey();
- }
- if (current.equals(thisServer.uuid)) {
- // we have looped through the entire list
- break;
- }
+ // the list we are building
+ notifyList = new LinkedList<>();
- // fetch associated server & site socket address
- Server server = servers.get(current);
- InetSocketAddress currentSiteSocketAddress =
- server.siteSocketAddress;
+ int index = 1;
+ for ( ; ; ) {
+ // move to the next key (UUID) -- if we hit the end of the table,
+ // wrap to the beginning
+ current = servers.higherKey(current);
+ if (current == null) {
+ current = servers.firstKey();
+ }
+ if (current.equals(thisServer.uuid)) {
+ // we have looped through the entire list
+ break;
+ }
- if (Objects.equals(thisSiteSocketAddress,
- currentSiteSocketAddress)) {
- // same site -- see if we should add this one
- if (index == dest) {
- // this is the next index we are looking for --
- // add the server
- notifyList.add(server);
+ // fetch associated server & site socket address
+ Server server = servers.get(current);
+ InetSocketAddress currentSiteSocketAddress =
+ server.siteSocketAddress;
- // advance to the next offset (current-offset * 2)
- dest = dest << 1;
- }
- index += 1;
- } else if (!siteSocketAddresses.contains(currentSiteSocketAddress)) {
- // we need at least one member from each site
+ if (Objects.equals(thisSiteSocketAddress,
+ currentSiteSocketAddress)) {
+ // same site -- see if we should add this one
+ if (index == dest) {
+ // this is the next index we are looking for --
+ // add the server
notifyList.add(server);
- siteSocketAddresses.add(currentSiteSocketAddress);
+
+ // advance to the next offset (current-offset * 2)
+ dest = dest << 1;
}
+ index += 1;
+ } else if (!siteSocketAddresses.contains(currentSiteSocketAddress)) {
+ // we need at least one member from each site
+ notifyList.add(server);
+ siteSocketAddresses.add(currentSiteSocketAddress);
}
}
+
return notifyList;
}
@@ -932,27 +935,8 @@
* This method is running within the 'MainLoop' thread.
*/
try {
- WebTarget webTarget = target.path(path);
- if (responseCallback != null) {
- // give callback a chance to modify 'WebTarget'
- webTarget = responseCallback.webTarget(webTarget);
+ invokeWebTarget(path, entity, responseCallback);
- // send the response to the callback
- Response response;
- if (entity == null) {
- response = webTarget.request().get();
- } else {
- response = webTarget.request().post(entity);
- }
- responseCallback.response(response);
- } else {
- // just do the invoke, and ignore the response
- if (entity == null) {
- webTarget.request().get();
- } else {
- webTarget.request().post(entity);
- }
- }
} catch (Exception e) {
logger.error("Failed to send to {} ({}, {})",
uuid, destSocketAddress, destName);
@@ -968,6 +952,30 @@
});
}
+    /**
+     * Sends a REST request to this server: a GET if 'entity' is 'null',
+     * otherwise a POST of 'entity'.
+     *
+     * <p>If a callback is supplied, it may first adjust the 'WebTarget',
+     * and it is then handed the response. If there is no callback, the
+     * response is not examined; it is closed here so it isn't left open.
+     *
+     * @param path the path, relative to this server's base 'target'
+     * @param entity the entity to POST, or 'null' to do a GET
+     * @param responseCallback optional callback ('null' if the response
+     *     should be ignored)
+     */
+    private void invokeWebTarget(final String path, final Entity<?> entity, PostResponse responseCallback) {
+        WebTarget webTarget = target.path(path);
+        if (responseCallback != null) {
+            // give callback a chance to modify 'WebTarget'
+            webTarget = responseCallback.webTarget(webTarget);
+        }
+
+        // a 'null' entity selects GET, anything else is sent using POST
+        final Response response = (entity == null)
+            ? webTarget.request().get()
+            : webTarget.request().post(entity);
+
+        if (responseCallback != null) {
+            // send the response to the callback
+            responseCallback.response(response);
+        } else if (response != null) {
+            // the response is being ignored -- close it, rather than
+            // leaving an unread response open
+            response.close();
+        }
+    }
+
/**
* This method may be invoked from any thread.
*
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java
index 1637e9e..470801e 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java
@@ -83,6 +83,8 @@
* owned by the host containing the entry.
*/
public class TargetLock implements Lock, Serializable {
+ private static final String LOCK_MSG = "(key={},owner={},uuid={},ttl={})";
+
private static final long serialVersionUID = 1L;
private static Logger logger = LoggerFactory.getLogger(TargetLock.class);
@@ -550,7 +552,7 @@
WebTarget webTarget = server.getWebTarget("lock/free");
if (webTarget != null) {
logger.warn("Forwarding 'lock/free' to uuid {} "
- + "(key={},owner={},uuid={},ttl={})",
+ + LOCK_MSG,
server.getUuid(), key, ownerKey, uuid, ttl);
return webTarget
.queryParam(QP_KEY, key)
@@ -565,7 +567,7 @@
// if we reach this point, we didn't forward for some reason --
// return failure by indicating it is locked and unavailable
logger.error("Couldn't forward 'lock/free' "
- + "(key={},owner={},uuid={},ttl={})",
+ + LOCK_MSG,
key, ownerKey, uuid, ttl);
return null;
}
@@ -599,7 +601,7 @@
WebTarget webTarget = server.getWebTarget("lock/locked");
if (webTarget != null) {
logger.warn("Forwarding 'lock/locked' to uuid {} "
- + "(key={},owner={},uuid={},ttl={})",
+ + LOCK_MSG,
server.getUuid(), key, ownerKey, uuid, ttl);
return webTarget
.queryParam(QP_KEY, key)
@@ -614,42 +616,15 @@
// if we reach this point, we didn't forward for some reason --
// return failure by indicating it is locked and unavailable
logger.error("Couldn't forward 'lock/locked' "
- + "(key={},owner={},uuid={},ttl={})",
+ + LOCK_MSG,
key, ownerKey, uuid, ttl);
return Response.noContent().status(LOCKED).build();
}
- TargetLock targetLock = null;
+ TargetLock targetLock;
LocalLocks localLocks = LocalLocks.get(ownerKey);
synchronized (localLocks) {
- WeakReference<TargetLock> wr =
- localLocks.uuidToWeakReference.get(uuid);
-
- if (wr != null) {
- targetLock = wr.get();
- if (targetLock == null) {
- // lock has been abandoned
- // (AbandonedHandler should usually find this first)
- localLocks.weakReferenceToIdentity.remove(wr);
- localLocks.uuidToWeakReference.remove(uuid);
- } else {
- // the lock has been made available -- update the state
- // TBD: This could be outside of 'synchronized (localLocks)'
- synchronized (targetLock) {
- if (targetLock.state == State.WAITING) {
- targetLock.state = State.ACTIVE;
- } else {
- // will return a failure -- not sure how this happened
- logger.error("incomingLocked: {} is in state {}",
- targetLock, targetLock.state);
- targetLock = null;
- }
- }
- }
- } else {
- // clean up what we can
- localLocks.uuidToWeakReference.remove(uuid);
- }
+ targetLock = grabLock(uuid, localLocks);
}
if (targetLock == null) {
// We can't locate the target lock
@@ -661,6 +636,39 @@
}
}
+    /**
+     * Locates the 'TargetLock' registered under 'uuid', and moves it from
+     * the WAITING state to the ACTIVE state. The caller invokes this while
+     * synchronized on 'localLocks'.
+     *
+     * @param uuid identifies the lock
+     * @param localLocks the tables in which the lock is looked up
+     * @return the lock, now ACTIVE, or 'null' if there is no entry for
+     *     'uuid', the lock has been abandoned (its table entries are then
+     *     removed), or the lock was not in the WAITING state
+     */
+    private static TargetLock grabLock(UUID uuid, LocalLocks localLocks) {
+        final WeakReference<TargetLock> ref = localLocks.uuidToWeakReference.get(uuid);
+        if (ref == null) {
+            // clean up what we can
+            localLocks.uuidToWeakReference.remove(uuid);
+            return null;
+        }
+
+        final TargetLock lock = ref.get();
+        if (lock == null) {
+            // lock has been abandoned
+            // (AbandonedHandler should usually find this first)
+            localLocks.weakReferenceToIdentity.remove(ref);
+            localLocks.uuidToWeakReference.remove(uuid);
+            return null;
+        }
+
+        // the lock has been made available -- update the state
+        // TBD: This could be outside of 'synchronized (localLocks)'
+        synchronized (lock) {
+            if (lock.state != State.WAITING) {
+                // will return a failure -- not sure how this happened
+                logger.error("incomingLocked: {} is in state {}", lock, lock.state);
+                return null;
+            }
+            lock.state = State.ACTIVE;
+            return lock;
+        }
+    }
+
/**
* This is called when the state of a bucket has changed, but is currently
* stable. Note that this method is called while being synchronized on the
@@ -1778,6 +1786,12 @@
}
}
+ dumpMergeData(out);
+ dumpServerTable(out);
+ dumpClientOnlyEntries(out);
+ }
+
+ private void dumpMergeData(PrintStream out) {
if (detail) {
// generate format based upon the maximum key length, maximum
// owner key length, and whether comments are included anywhere
@@ -1801,9 +1815,6 @@
out.printf(format, "Key", "Owner Key", "UUID", "State", "Comments");
out.printf(format, "---", PRINTOUT_DASHES, "----", "-----", "--------");
}
-
- dumpServerTable(out);
- dumpClientOnlyEntries(out);
}
private void dumpServerTable(PrintStream out) {
@@ -2060,23 +2071,7 @@
clientDataList.add(clientData);
synchronized (localLocks) {
- for (WeakReference<TargetLock> wr :
- localLocks.weakReferenceToIdentity.keySet()) {
- // Note: 'targetLock' may be 'null' if it has
- // been abandoned, and garbage collected
- TargetLock targetLock = wr.get();
-
- // fetch associated 'identity'
- Identity identity =
- localLocks.weakReferenceToIdentity.get(wr);
- if (identity != null) {
- // add a new 'ClientDataRecord' for this bucket
- clientData.clientDataRecords.add(
- new ClientDataRecord(identity,
- (targetLock == null ? null :
- targetLock.getState())));
- }
- }
+ generateClientLockData(localLocks, clientData);
}
}
@@ -2089,6 +2084,26 @@
}
}
}
+
+ private void generateClientLockData(LocalLocks localLocks, ClientData clientData) {
+ for (WeakReference<TargetLock> wr :
+ localLocks.weakReferenceToIdentity.keySet()) {
+ // Note: 'targetLock' may be 'null' if it has
+ // been abandoned, and garbage collected
+ TargetLock targetLock = wr.get();
+
+ // fetch associated 'identity'
+ Identity identity =
+ localLocks.weakReferenceToIdentity.get(wr);
+ if (identity != null) {
+ // add a new 'ClientDataRecord' for this bucket
+ clientData.clientDataRecords.add(
+ new ClientDataRecord(identity,
+ (targetLock == null ? null :
+ targetLock.getState())));
+ }
+ }
+ }
}
/**
@@ -2223,28 +2238,8 @@
if (globalLocks != null) {
Map<String, LockEntry> keyToEntry = globalLocks.keyToEntry;
synchronized (keyToEntry) {
- LockEntry le = keyToEntry.get(identity.key);
- if (le != null) {
- if (identity.uuid.equals(le.currentOwnerUuid)
- && identity.ownerKey.equals(le.currentOwnerKey)) {
- // we found a match
- continue;
- }
-
- // check the waiting list
- boolean match = false;
- for (Waiting waiting : le.waitingList) {
- if (identity.uuid.equals(waiting.ownerUuid)
- && identity.ownerKey.equals(waiting.ownerKey)) {
- // we found a match on the waiting list
- match = true;
- break;
- }
- }
- if (match) {
- // there was a match on the waiting list
- continue;
- }
+ if (matchIdentity(keyToEntry, identity)) {
+ continue;
}
}
}
@@ -2265,6 +2260,28 @@
}
}
+ private boolean matchIdentity(Map<String, LockEntry> keyToEntry, Identity identity) {
+ LockEntry le = keyToEntry.get(identity.key);
+ if (le != null) {
+ if (identity.uuid.equals(le.currentOwnerUuid)
+ && identity.ownerKey.equals(le.currentOwnerKey)) {
+ // we found a match
+ return true;
+ }
+
+ // check the waiting list
+ for (Waiting waiting : le.waitingList) {
+ if (identity.uuid.equals(waiting.ownerUuid)
+ && identity.ownerKey.equals(waiting.ownerKey)) {
+ // we found a match on the waiting list
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
private void generateResponseServerEnd(AuditData response, boolean includeWarnings) {
for (Identity identity : serverData) {
// remote end is the server, and we are the client
@@ -2779,7 +2796,7 @@
// send new list to other end
response = server
- .getWebTarget("lock/audit")
+ .getWebTarget(LOCK_AUDIT)
.queryParam(QP_SERVER, server.getUuid().toString())
.queryParam(QP_TTL, timeToLive)
.request().post(entity);
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
index 48bd1c1..e8121f3 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
@@ -245,6 +245,11 @@
servers.add(bucket.getPrimaryBackup());
servers.add(bucket.getSecondaryBackup());
}
+ sendLocksToBackupServers(bucketNumber, entity, count, servers);
+ }
+
+ private static void sendLocksToBackupServers(final int bucketNumber, final Entity<String> entity, final int count,
+ Set<Server> servers) {
for (final Server server : servers) {
if (server != null) {
// send out REST command
@@ -336,25 +341,7 @@
servers.add(bucket.getPrimaryBackup());
servers.add(bucket.getSecondaryBackup());
}
- for (final Server server : servers) {
- if (server != null) {
- // send out REST command
- server.getThreadPool().execute(() -> {
- WebTarget webTarget =
- server.getWebTarget("persistence/session");
- if (webTarget != null) {
- webTarget
- .queryParam(QP_BUCKET,
- bucket.getIndex())
- .queryParam(QP_SESSION,
- encodedSessionName)
- .queryParam(QP_COUNT, count)
- .queryParam(QP_DEST, server.getUuid())
- .request().post(entity);
- }
- });
- }
- }
+ sendBucketToBackupServers(bucket, count, entity, servers);
}
}
} catch (Exception e) {
@@ -362,6 +349,30 @@
}
}
+ private void sendBucketToBackupServers(Bucket bucket, final int count, final Entity<String> entity,
+ Set<Server> servers) {
+
+ for (final Server server : servers) {
+ if (server != null) {
+ // send out REST command
+ server.getThreadPool().execute(() -> {
+ WebTarget webTarget =
+ server.getWebTarget("persistence/session");
+ if (webTarget != null) {
+ webTarget
+ .queryParam(QP_BUCKET,
+ bucket.getIndex())
+ .queryParam(QP_SESSION,
+ encodedSessionName)
+ .queryParam(QP_COUNT, count)
+ .queryParam(QP_DEST, server.getUuid())
+ .request().post(entity);
+ }
+ });
+ }
+ }
+ }
+
/* ************************************** */
/* 'RuleRuntimeEventListener' interface */
/* ************************************** */
@@ -690,57 +701,20 @@
String sessionName = entry.getKey();
ReceiverSessionBucketData rsbd = entry.getValue();
- // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>"
- String[] nameSegments = sessionName.split(":");
- PolicySession policySession = null;
-
- // locate the 'PolicyContainer' and 'PolicySession'
- if (nameSegments.length == 3) {
- // step through all 'PolicyContainer' instances looking
- // for a matching 'artifactId' & 'groupId'
- for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
- if (nameSegments[1].equals(pc.getArtifactId())
- && nameSegments[0].equals(pc.getGroupId())) {
- // 'PolicyContainer' matches -- try to fetch the session
- policySession = pc.getPolicySession(nameSegments[2]);
- break;
- }
- }
- }
-
+ PolicySession policySession = detmPolicySession(sessionName);
if (policySession == null) {
logger.error(RESTORE_BUCKET_ERROR
+ "Can't find PolicySession{}", sessionName);
continue;
}
- Object obj = null;
- try {
- // deserialization needs to use the correct 'ClassLoader'
- obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData),
- policySession.getPolicyContainer().getClassLoader());
- } catch (IOException | ClassNotFoundException | IllegalArgumentException e) {
- logger.error(RESTORE_BUCKET_ERROR
- + "Failed to read data for session '{}'",
- sessionName, e);
-
- // can't decode -- skip this session
- continue;
- }
-
- if (!(obj instanceof Map)) {
- logger.error(RESTORE_BUCKET_ERROR
- + "Session '{}' data has class {}, expected 'Map'",
- sessionName, obj.getClass().getName());
-
- // wrong object type decoded -- skip this session
+ final Map<?, ?> droolsObjects = deserializeMap(sessionName, rsbd, policySession);
+ if (droolsObjects == null) {
continue;
}
// if we reach this point, we have decoded the persistent data
- final Map<?, ?> droolsObjects = (Map<?, ?>) obj;
-
// signal when restore is complete
final CountDownLatch sessionLatch = new CountDownLatch(1);
@@ -767,6 +741,56 @@
return sessionLatches;
}
+ private PolicySession detmPolicySession(String sessionName) {
+ // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>"
+ String[] nameSegments = sessionName.split(":");
+
+ // locate the 'PolicyContainer' and 'PolicySession'
+ if (nameSegments.length == 3) {
+ // step through all 'PolicyContainer' instances looking
+ // for a matching 'artifactId' & 'groupId'
+ for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
+ if (nameSegments[1].equals(pc.getArtifactId())
+ && nameSegments[0].equals(pc.getGroupId())) {
+ // 'PolicyContainer' matches -- try to fetch the session
+ return pc.getPolicySession(nameSegments[2]);
+ }
+ }
+ }
+ return null;
+ }
+
+ private Map<?, ?> deserializeMap(String sessionName, ReceiverSessionBucketData rsbd,
+ PolicySession policySession) {
+ Object obj;
+
+ try {
+ // deserialization needs to use the correct 'ClassLoader'
+ obj = Util.deserialize(Base64.getDecoder().decode(rsbd.encodedSerializedData),
+ policySession.getPolicyContainer().getClassLoader());
+ } catch (IOException | ClassNotFoundException | IllegalArgumentException e) {
+ logger.error(RESTORE_BUCKET_ERROR
+ + "Failed to read data for session '{}'",
+ sessionName, e);
+
+ // can't decode -- skip this session
+ return null;
+ }
+
+ if (!(obj instanceof Map)) {
+ logger.error(RESTORE_BUCKET_ERROR
+ + "Session '{}' data has class {}, expected 'Map'",
+ sessionName, obj.getClass().getName());
+
+ // wrong object type decoded -- skip this session
+ return null;
+ }
+
+ // if we reach this point, we have decoded the persistent data
+
+ return (Map<?, ?>) obj;
+ }
+
private void restoreBucketLocks(Bucket bucket) {
if (lockData != null) {
Object obj = null;
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java
index 2240460..9fa5460 100644
--- a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java
@@ -133,6 +133,7 @@
// but a new copy of class 'AdapterImpl'
Adapter adapter = (Adapter) classLoader.loadClass(
"org.onap.policy.drools.serverpool.AdapterImpl")
+ .getDeclaredConstructor()
.newInstance();
// initialize the adapter