Removed the DMAAP Accepted/Rejected Call
Issue-ID: NONRTRIC-107
Change-Id: If5839d5b3c8a874298b7c46d118fdc17b76bc097
Signed-off-by: Lathish <lathishbabu.ganesan@est.tech>
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
index 2ae5e5e..503ddab 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
@@ -22,7 +22,6 @@
import java.io.IOException;
import java.util.Properties;
-import javax.annotation.PostConstruct;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.MRConsumer;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
@@ -55,9 +54,9 @@
@Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000)
@Override
public void run() {
- /*
- * if (!alive) { init(); }
- */
+ if (!alive) {
+ init();
+ }
if (this.alive) {
try {
Iterable<String> dmaapMsgs = fetchAllMessages();
@@ -85,7 +84,8 @@
return response.getActualMessages();
}
- @PostConstruct
+ // Properties are not loaded in first atempt. Need to fix this and then uncomment the post construct annotation
+ // @PostConstruct
@Override
public void init() {
Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
@@ -98,6 +98,8 @@
}
try {
logger.debug("Creating DMAAP Client");
+ System.out.println("dmaapConsumerProperties--->"+dmaapConsumerProperties.getProperty("topic"));
+ System.out.println("dmaapPublisherProperties--->"+dmaapPublisherProperties.getProperty("topic"));
consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
this.alive = true;
} catch (IOException e) {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
index bf9f06c..713d483 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
@@ -20,17 +20,19 @@
package org.oransc.policyagent.dmaap;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
+import java.util.Optional;
import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.controllers.PolicyController;
-import org.oransc.policyagent.model.DmaapMessage;
+import org.oransc.policyagent.model.DmaapRequestMessage;
+import org.oransc.policyagent.model.DmaapResponseMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -60,104 +62,104 @@
@Async("threadPoolTaskExecutor")
public void handleDmaapMsg(String msg) {
init();
- DmaapMessage dmaapMessage = null;
- ResponseEntity<String> response = null;
+ DmaapRequestMessage dmaapRequestMessage = null;
+ Optional<String> dmaapResponse = null;
// Process the message
/**
* Sample Request Message from DMAAP { "type": "request", "correlationId":
* "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
* "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
- * "payload": "{\"ric\":\"ric1\"}" }
+ * "payload": "{\"ricName\":\"ric1\"}" }
+ *
* --------------------------------------------------------------------------------------------------------------
- * Sample Response Message to DMAAP { "type": "response", "correlation-id":
- * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
- * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" }
+ * Sample Response Message to DMAAP {type=response, correlationId=c09ac7d1-de62-0016-2000-e63701125557-201,
+ * timestamp=null, originatorId=849e6c6b420, requestId=23343221, status=200 OK, message=[]}
* -------------------------------------------------------------------------------------------------------------
- * Sample Response Message to DMAAP { "type": "response", "correlation-id":
- * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
- * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" }
*/
try {
- dmaapMessage = mapper.readValue(msg, DmaapMessage.class);
- // Post the accepted message to the DMAAP bus
- logger.debug("DMAAP Message- {}", dmaapMessage);
- logger.debug("Post Accepted Message to Client");
- restClient
- .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
- dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY))
- .block(); //
+ dmaapRequestMessage = mapper.readValue(msg, DmaapRequestMessage.class);
// Call the Controller
logger.debug("Invoke the Policy Agent Controller");
- response = invokeController(dmaapMessage);
+ dmaapResponse = invokeController(dmaapRequestMessage);
// Post the Response message to the DMAAP bus
- logger.debug("DMAAP Response Message to Client- {}", response);
- restClient
- .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
- dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody()))
- .block(); //
+ logger.debug("DMAAP Response Message to Client- {}", dmaapResponse);
+ if (dmaapResponse.isPresent()) {
+ restClient.post("A1-POLICY-AGENT-WRITE", dmaapResponse.get()).block(); //
+ }
} catch (IOException e) {
logger.error("Exception occured during message processing", e);
}
}
- private ResponseEntity<String> invokeController(DmaapMessage dmaapMessage) {
+ private Optional<String> invokeController(DmaapRequestMessage dmaapRequestMessage) {
String formattedString = "";
String ricName;
String instance;
- logger.debug("Payload from the Message - {}", dmaapMessage.getPayload());
+ String jsonBody;
+ logger.debug("Payload from the Message - {}", dmaapRequestMessage.getPayload());
try {
- formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString();
+ formattedString = new JSONTokener(dmaapRequestMessage.getPayload()).nextValue().toString();
logger.debug("Removed the Escape charater in payload- {}", formattedString);
} catch (JSONException e) {
- logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload());
+ logger.error("Exception occurred during formating Payload- {}", dmaapRequestMessage.getPayload());
}
JSONObject jsonObject = new JSONObject(formattedString);
- switch (dmaapMessage.getOperation()) {
+ switch (dmaapRequestMessage.getOperation()) {
case "getPolicySchemas":
ricName = (String) jsonObject.get("ricName");
logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
- return policyController.getPolicySchemas(ricName);
+ return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchemas(ricName));
case "getPolicySchema":
String policyTypeId = (String) jsonObject.get("id");
logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
- System.out.println("policyTypeId" + policyTypeId);
- return policyController.getPolicySchema(policyTypeId);
+ return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchema(policyTypeId));
case "getPolicyTypes":
ricName = (String) jsonObject.get("ricName");
logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
- return policyController.getPolicyTypes(ricName);
+ return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicyTypes(ricName));
case "getPolicy":
instance = (String) jsonObject.get("instance");
logger.debug("Received the request for getPolicy with Instance- {}", instance);
- return policyController.getPolicy(instance);
+ return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicy(instance));
case "deletePolicy":
instance = (String) jsonObject.get("instance");
logger.debug("Received the request for deletePolicy with Instance- {}", instance);
- return null;// policyController.deletePolicy(deleteInstance);
+ return getDmaapResponseMessage(dmaapRequestMessage, policyController.deletePolicy(instance).block());
case "putPolicy":
String type = (String) jsonObject.get("type");
String putPolicyInstance = (String) jsonObject.get("instance");
String putPolicyRic = (String) jsonObject.get("ric");
String service = (String) jsonObject.get("service");
- String jsonBody = (String) jsonObject.get("jsonBody");
- return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody);
+ jsonBody = (String) jsonObject.get("jsonBody");
+ return getDmaapResponseMessage(dmaapRequestMessage,
+ policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody).block());
case "getPolicies":
String getPolicyType = (String) jsonObject.get("type");
+ instance = (String) jsonObject.get("instance");
String getPolicyRic = (String) jsonObject.get("ric");
String getPolicyService = (String) jsonObject.get("service");
- return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService);
+ jsonBody = (String) jsonObject.get("jsonBody");
+ return getDmaapResponseMessage(dmaapRequestMessage, policyController
+ .putPolicy(getPolicyType, instance, getPolicyRic, getPolicyService, jsonBody).block());
default:
break;
}
- return null;
+ return Optional.empty();
}
- private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status,
- String message) {
- System.out.println("buildResponse ");
- return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "")
- .put("originatorId", originatorId).put("requestId", requestId).put("status", status)
- .put("message", message).toString();
+ private Optional<String> getDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage,
+ ResponseEntity<?> policySchemas) {
+ DmaapResponseMessage dmaapResponseMessage = DmaapResponseMessage.builder()
+ .status(policySchemas.getStatusCode().toString()).message(policySchemas.getBody().toString())
+ .type("response").correlationId(dmaapRequestMessage.getCorrelationId())
+ .originatorId(dmaapRequestMessage.getOriginatorId()).requestId(dmaapRequestMessage.getRequestId())
+ .build();
+ try {
+ return Optional.of(mapper.writeValueAsString(dmaapResponseMessage));
+ } catch (JsonProcessingException e) {
+ logger.error("Exception occured during getDmaapResponseMessage", e);
+ }
+ return Optional.empty();
}
// @PostConstruct
@@ -168,6 +170,7 @@
Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
String host = (String) dmaapPublisherConfig.get("ServiceName");
topic = dmaapPublisherConfig.getProperty("topic");
+ System.out.println("\"Read the topic ---------->" + topic);
logger.debug("Read the topic & Service Name - {} , {}", host, topic);
this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapRequestMessage.java
similarity index 96%
rename from policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java
rename to policy-agent/src/main/java/org/oransc/policyagent/model/DmaapRequestMessage.java
index e56f4b4..0648940 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapRequestMessage.java
@@ -27,7 +27,7 @@
@Getter
@Setter
-public class DmaapMessage {
+public class DmaapRequestMessage {
@NotNull
private String type;
@@ -35,7 +35,6 @@
private String correlationId;
@NotNull
private String target;
- @NotNull
private Timestamp timestamp;
private String apiVersion;
@NotNull
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapResponseMessage.java
similarity index 88%
copy from policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java
copy to policy-agent/src/main/java/org/oransc/policyagent/model/DmaapResponseMessage.java
index e56f4b4..ebf5e8c 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapResponseMessage.java
@@ -22,26 +22,25 @@
import java.sql.Timestamp;
import javax.validation.constraints.NotNull;
+import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
-public class DmaapMessage {
+@Builder
+public class DmaapResponseMessage {
@NotNull
private String type;
@NotNull
private String correlationId;
- @NotNull
- private String target;
- @NotNull
private Timestamp timestamp;
- private String apiVersion;
@NotNull
private String originatorId;
private String requestId;
@NotNull
- private String operation;
- private String payload;
+ private String status;
+ @NotNull
+ private String message;
}