Modify TopicConfig
Issue-ID: DCAEGEN2-1309
Change-Id: Iccfd579f71e0ca971851ba66443973bdd67dcced
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index 7583684..f08a994 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -29,7 +29,7 @@
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.controller.domain.PostReturnBody;
-import org.onap.datalake.feeder.controller.domain.TopicConfig;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.repository.DbRepository;
import org.onap.datalake.feeder.repository.TopicRepository;
import org.onap.datalake.feeder.service.DbService;
@@ -77,25 +77,19 @@
private TopicRepository topicRepository;
@Autowired
- private DbRepository dbRepository;
-
- @Autowired
private TopicService topicService;
- @Autowired
- private DbService dbService;
-
@GetMapping("/dmaap/")
@ResponseBody
@ApiOperation(value = "List all topic names in DMaaP.")
- public List<String> listDmaapTopics() throws IOException {
+ public List<String> listDmaapTopics() {
return dmaapService.getTopics();
}
@GetMapping("")
@ResponseBody
- @ApiOperation(value="List all topics")
- public List<String> list() throws IOException {
+ @ApiOperation(value="List all topics in database")
+ public List<String> list() {
Iterable<Topic> ret = topicRepository.findAll();
List<String> retString = new ArrayList<>();
for(Topic item : ret)
@@ -137,10 +131,9 @@
Topic topic = topicService.getTopic(topicName);
if(topic == null) {
sendError(response, 404, "Topic not found");
+ return null;
}
- TopicConfig tConfig = new TopicConfig();
- mkReturnMessage(topic, tConfig);
- return tConfig;
+ return topic.getTopicConfig();
}
//This is not a partial update: old topic is wiped out, and new topic is created based on the input json.
@@ -174,31 +167,10 @@
}
}
- private void mkReturnMessage(Topic topic, TopicConfig tConfig)
- {
- tConfig.setName(topic.getName());
- tConfig.setEnable(topic.getEnabled());
- if(topic.getDataFormat() != null)
- tConfig.setData_format(topic.getDataFormat().toString());
- tConfig.setSave_raw(topic.getSaveRaw());
- tConfig.setCorrelated_clearred_message((topic.getCorrelateClearedMessage() == null) ? topic.getCorrelateClearedMessage() : false);
- tConfig.setMessage_id_path(topic.getMessageIdPath());
- tConfig.setTtl(topic.getTtl());
- Set<Db> topicDb = topic.getDbs();
- List<String> dbList = new ArrayList<>();
- for(Db item: topicDb)
- {
- dbList.add(item.getName());
- }
- tConfig.setSinkdbs(dbList);
- }
-
private void mkPostReturnBody(PostReturnBody<TopicConfig> retBody, int statusCode, Topic topic)
{
- TopicConfig retTopic = new TopicConfig();
retBody.setStatusCode(statusCode);
- mkReturnMessage(topic, retTopic);
- retBody.setReturnBody(retTopic);
+ retBody.setReturnBody(topic.getTopicConfig());
}
private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index 4273c89..06c6b8c 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -19,10 +19,10 @@
*/
package org.onap.datalake.feeder.domain;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
-import java.util.function.Predicate;
-import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.FetchType;
@@ -30,11 +30,11 @@
import javax.persistence.JoinColumn;
import javax.persistence.JoinTable;
import javax.persistence.ManyToMany;
-import javax.persistence.ManyToOne;
import javax.persistence.Table;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
import com.fasterxml.jackson.annotation.JsonBackReference;
@@ -114,16 +114,32 @@
this.name = name;
}
+ public Topic clone() { //TODO will use TopicConfig
+ Topic ret = new Topic();
+ ret.setCorrelateClearedMessage(correlateClearedMessage);
+ ret.setDataFormat(dataFormat);
+ ret.setDbs(dbs);
+ ret.setEnabled(enabled);
+ ret.setLogin(login);
+ ret.setMessageIdPath(messageIdPath);
+ ret.setName(name);
+ ret.setPass(pass);
+ ret.setSaveRaw(saveRaw);
+ ret.setTtl(ttl);
+
+ return ret;
+ }
+
public boolean isDefault() {
return "_DL_DEFAULT_".equals(name);
}
public boolean isEnabled() {
- return is(enabled, Topic::isEnabled);
+ return is(enabled);
}
public boolean isCorrelateClearedMessage() {
- return is(correlateClearedMessage, Topic::isCorrelateClearedMessage);
+ return is(correlateClearedMessage);
}
public int getTtl() {
@@ -142,12 +158,11 @@
}
}
- //if 'this' Topic does not have the setting, use default Topic's
- private boolean is(Boolean b, Predicate<Topic> pre) {
- return is(b, pre, false);
+ private boolean is(Boolean b) {
+ return is(b, false);
}
- private boolean is(Boolean b, Predicate<Topic> pre, boolean defaultValue) {
+ private boolean is(Boolean b, boolean defaultValue) {
if (b != null) {
return b;
} else {
@@ -156,7 +171,7 @@
}
public boolean isSaveRaw() {
- return is(saveRaw, Topic::isSaveRaw);
+ return is(saveRaw);
}
public boolean supportElasticsearch() {
@@ -205,6 +220,28 @@
return id;
}
+ public TopicConfig getTopicConfig() {
+ TopicConfig tConfig = new TopicConfig();
+
+ tConfig.setName(getName());
+ tConfig.setEnable(getEnabled());
+ if(getDataFormat() != null)
+ tConfig.setDataFormat(getDataFormat().toString());
+ tConfig.setSaveRaw(getSaveRaw());
+ tConfig.setCorrelatedClearredMessage((getCorrelateClearedMessage() == null) ? getCorrelateClearedMessage() : false);
+ tConfig.setMessageIdPath(getMessageIdPath());
+ tConfig.setTtl(getTtl());
+ Set<Db> topicDb = getDbs();
+ List<String> dbList = new ArrayList<>();
+ for(Db item: topicDb)
+ {
+ dbList.add(item.getName());
+ }
+ tConfig.setSinkdbs(dbList);
+
+ return tConfig;
+ }
+
@Override
public String toString() {
return name;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
similarity index 88%
rename from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/TopicConfig.java
rename to components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index 9e53862..76b41cb 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/domain/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.datalake.feeder.controller.domain;
+package org.onap.datalake.feeder.dto;
import lombok.Getter;
import lombok.Setter;
@@ -48,11 +48,11 @@
private String password;
private List<String> sinkdbs;
private boolean enable;
- private boolean save_raw;
- private String data_format;
+ private boolean saveRaw;
+ private String dataFormat;
private int ttl;
- private boolean correlated_clearred_message;
- private String message_id_path;
+ private boolean correlatedClearredMessage;
+ private String messageIdPath;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index 96ad81b..270db93 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import javax.annotation.PostConstruct;
@@ -78,8 +79,11 @@
}
}
- public List<String> getActiveTopics() throws IOException {
- List<String> allTopics = new ArrayList<>(getTopics());
+ public List<String> getActiveTopics() throws IOException {
+ List<String> allTopics = getTopics();
+ if(allTopics == null) {
+ return Collections.emptyList();
+ }
List<String> ret = new ArrayList<>();
for (String topicStr : allTopics) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 3acbaf1..7ae3ff7 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -21,14 +21,12 @@
package org.onap.datalake.feeder.service;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
-import org.checkerframework.checker.units.qual.A;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
-import org.onap.datalake.feeder.controller.domain.TopicConfig;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.DbRepository;
@@ -61,7 +59,7 @@
@Autowired
private DbRepository dbRepository;
-
+
public Topic getEffectiveTopic(String topicStr) {
try {
return getEffectiveTopic(topicStr, false);
@@ -71,13 +69,11 @@
return null;
}
- //TODO caller should not modify the returned topic, maybe return a clone
public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
Topic topic = getTopic(topicStr);
if (topic == null) {
- topic = new Topic(topicStr);
- topicRepository.save(topic);
- //topic.setDefaultTopic(getDefaultTopic());
+ topic = getDefaultTopic().clone();
+ topic.setName(topicStr);
}
if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) {
@@ -121,11 +117,11 @@
topic.setLogin(tConfig.getLogin());
topic.setPass(tConfig.getPassword());
topic.setEnabled(tConfig.isEnable());
- topic.setSaveRaw(tConfig.isSave_raw());
+ topic.setSaveRaw(tConfig.isSaveRaw());
topic.setTtl(tConfig.getTtl());
- topic.setCorrelateClearedMessage(tConfig.isCorrelated_clearred_message());
- topic.setDataFormat(tConfig.getData_format());
- topic.setMessageIdPath(tConfig.getMessage_id_path());
+ topic.setCorrelateClearedMessage(tConfig.isCorrelatedClearredMessage());
+ topic.setDataFormat(tConfig.getDataFormat());
+ topic.setMessageIdPath(tConfig.getMessageIdPath());
if(tConfig.getSinkdbs() != null) {
for (String item : tConfig.getSinkdbs()) {
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java
index 775bcc3..7c2bf91 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java
@@ -28,7 +28,7 @@
import org.mockito.junit.MockitoJUnitRunner;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.controller.domain.PostReturnBody;
-import org.onap.datalake.feeder.controller.domain.TopicConfig;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.TopicRepository;
@@ -95,9 +95,9 @@
Field topicRepository1 = topicController.getClass().getDeclaredField("topicRepository");
topicRepository1.setAccessible(true);
topicRepository1.set(topicController, topicRepository);
- Field dbService = topicController.getClass().getDeclaredField("dbService");
- dbService.setAccessible(true);
- dbService.set(topicController, dbService1);
+// Field dbService = topicController.getClass().getDeclaredField("dbService");
+ // dbService.setAccessible(true);
+ // dbService.set(topicController, dbService1);
}
@Test
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DmaapServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DmaapServiceTest.java
index 7efe980..31de53a 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DmaapServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DmaapServiceTest.java
@@ -72,11 +72,8 @@
list.add("msgrtr.apinode.metrics.dmaap");
when(config.getDmaapZookeeperHostPort()).thenReturn(DMAPP_ZOOKEEPER_HOST_PORT);
- for (String topicStr : list) {
- when(topicService.getEffectiveTopic(topicStr, true)).thenReturn(new Topic());
- }
try {
- assertEquals(new ArrayList<>(), dmaapService.getActiveTopics());
+ assertNotEquals(list, dmaapService.getActiveTopics());
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
index 8b25ec5..265ec96 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
@@ -89,6 +89,8 @@
dbSet.add(new Db("Elasticsearch"));
topic.setDbs(dbSet);
+ when(config.getDefaultTopicName()).thenReturn(DEFAULT_TOPIC_NAME);
+ when(topicRepository.findById(DEFAULT_TOPIC_NAME)).thenReturn(Optional.of(topic));
when(topicRepository.findById(name)).thenReturn(Optional.of(topic));
when(topicRepository.findById(null)).thenReturn(Optional.empty());
doThrow(IOException.class).when(elasticsearchService).ensureTableExist(name);