Merge "Use MariaDB to store application configurations"
diff --git a/components/datalake-handler/feeder/pom.xml b/components/datalake-handler/feeder/pom.xml
index f88baf3..5b47a24 100644
--- a/components/datalake-handler/feeder/pom.xml
+++ b/components/datalake-handler/feeder/pom.xml
@@ -10,13 +10,19 @@
 		<version>1.0.0-SNAPSHOT</version>
 	</parent>
 
-	<groupId>org.onap.datalake</groupId>
+	<groupId>org.onap.dcaegen2.services.components.datalake-handler</groupId>
 	<artifactId>feeder</artifactId>
 	<packaging>jar</packaging>
 	<name>DataLake Feeder</name>
 
 
 	<dependencies>
+	
+		<dependency>
+    		<groupId>org.mariadb.jdbc</groupId>
+    		<artifactId>mariadb-java-client</artifactId>
+		</dependency>
+	
 		<dependency>
 			<groupId>org.json</groupId>
 			<artifactId>json</artifactId>
@@ -42,7 +48,12 @@
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-actuator</artifactId>
 		</dependency>
-
+        
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-jpa</artifactId>
+        </dependency>
+        
 		<dependency>
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-data-couchbase</artifactId>
diff --git a/components/datalake-handler/feeder/src/assembly/docker-compose.yml b/components/datalake-handler/feeder/src/assembly/docker-compose.yml
deleted file mode 100644
index 7ca466b..0000000
--- a/components/datalake-handler/feeder/src/assembly/docker-compose.yml
+++ /dev/null
@@ -1,10 +0,0 @@
-version: '2'
-services:
-
-  datalake:
-    image: moguobiao/datalake-storage
-    container_name: datalake-storage
-    environment:    
-      - no-needed-dmaapHost=10.0.2.15:3904
-    ports:
-      - "1680:1680"
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
new file mode 100644
index 0000000..2185320
--- /dev/null
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
@@ -0,0 +1,54 @@
+create database datalake;

+use datalake;

+

+CREATE TABLE `topic` (

+  `name` varchar(255) NOT NULL,

+  `correlate_cleared_message` bit(1) DEFAULT NULL,

+  `enabled` bit(1) DEFAULT NULL,

+  `login` varchar(255) DEFAULT NULL,

+  `message_id_path` varchar(255) DEFAULT NULL,

+  `pass` varchar(255) DEFAULT NULL,

+  `save_raw` bit(1) DEFAULT NULL,

+  `ttl` int(11) DEFAULT NULL,

+  `data_format` varchar(255) DEFAULT NULL,

+  `default_topic` varchar(255) DEFAULT NULL,

+  PRIMARY KEY (`name`),

+  KEY `FK_default_topic` (`default_topic`),

+  CONSTRAINT `FK_default_topic` FOREIGN KEY (`default_topic`) REFERENCES `topic` (`name`)

+) ENGINE=InnoDB DEFAULT CHARSET=utf8;

+

+

+CREATE TABLE `db` (

+  `name` varchar(255) NOT NULL,

+  `host` varchar(255) DEFAULT NULL,

+  `login` varchar(255) DEFAULT NULL,

+  `pass` varchar(255) DEFAULT NULL,

+  `property1` varchar(255) DEFAULT NULL,

+  `property2` varchar(255) DEFAULT NULL,

+  `property3` varchar(255) DEFAULT NULL,

+  PRIMARY KEY (`name`)

+) ENGINE=InnoDB DEFAULT CHARSET=utf8;

+

+

+CREATE TABLE `map_db_topic` (

+  `db_name` varchar(255) NOT NULL,

+  `topic_name` varchar(255) NOT NULL,

+  PRIMARY KEY (`db_name`,`topic_name`),

+  KEY `FK_topic_name` (`topic_name`),

+  CONSTRAINT `FK_topic_name` FOREIGN KEY (`topic_name`) REFERENCES `topic` (`name`),

+  CONSTRAINT `FK_db_name` FOREIGN KEY (`db_name`) REFERENCES `db` (`name`)

+) ENGINE=InnoDB DEFAULT CHARSET=utf8;

+

+

+insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dmaap','dmaap1234','dmaap');

+insert into db (name,host) values ('Elasticsearch','dl_es');

+insert into db (name,host) values ('MongoDB','dl_mongodb');

+insert into db (name,host) values ('Druid','dl_druid');

+

+

+-- in production, default enabled should be off

+insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');

+insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);

+

+

+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_'); 

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 108eb4e..1136e30 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -20,13 +20,9 @@
 
 package org.onap.datalake.feeder.config;
 
-import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringBootConfiguration;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.couchbase.CouchbaseConfiguration;
 import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.FilterType;
-import org.springframework.context.annotation.ComponentScan;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -42,34 +38,18 @@
 @Setter
 @SpringBootConfiguration
 @ConfigurationProperties
-//@ComponentScan(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = CouchbaseConfiguration.class))
-//https://stackoverflow.com/questions/29344313/prevent-application-commandlinerunner-classes-from-executing-during-junit-test
 @EnableAutoConfiguration
-//@Profile("test")
 public class ApplicationConfiguration {
 
-	private String couchbaseHost;
-	private String couchbaseUser;
-	private String couchbasePass;
-	private String couchbaseBucket;
-
-	//    private int mongodbPort;    
-	//  private String mongodbDatabase;    
-
 	private String dmaapZookeeperHostPort;
 	private String dmaapKafkaHostPort;
 	private String dmaapKafkaGroup;
 	private long dmaapKafkaTimeout;
 
-//	private boolean dmaapMonitorAllTopics;
 	private int dmaapCheckNewTopicIntervalInSec;
-	//private String dmaapHostPort;
-	//private Set<String> dmaapExcludeTopics; 
-	//private Set<String> dmaapIncludeTopics; 
 
 	private int kafkaConsumerCount;
 
 	private boolean async;
 
-	private String elasticsearchHost;
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
new file mode 100644
index 0000000..c34befc
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
@@ -0,0 +1,135 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.controller;
+
+import java.io.IOException;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.service.DbService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * This controller manages the big data storage settings. All the settings are saved in database. 
+ * 
+ * @author Guobiao Mo
+ *
+ */
+
+@RestController
+@RequestMapping(value = "/dbs", produces = { MediaType.APPLICATION_JSON_VALUE })
+public class DbController {
+
+	private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+	@Autowired
+	private DbRepository dbRepository;
+	
+	@Autowired
+	private DbService dbService;
+
+	//list all dbs 
+	@GetMapping("/")
+	@ResponseBody
+	public Iterable<Db> list() throws IOException {
+		Iterable<Db> ret = dbRepository.findAll();
+		return ret;
+	}
+
+	//Read a db
+	//the topics are missing in the return, since in we use @JsonBackReference on Db's topics 
+	//need to the the following method to retrieve the topic list
+	@GetMapping("/{name}")
+	@ResponseBody
+	public Db getDb(@PathVariable("name") String dbName) throws IOException {
+		Db db = dbService.getDb(dbName);
+		return db;
+	}
+
+	//Read topics in a DB 
+	@GetMapping("/{name}/topics")
+	@ResponseBody
+	public Set<Topic> getDbTopics(@PathVariable("name") String dbName) throws IOException {
+		Db db = dbService.getDb(dbName);
+		Set<Topic> topics = db.getTopics();
+		return topics;
+	}
+
+	//Update Db
+	@PutMapping("/")
+	@ResponseBody
+	public Db updateDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException {
+
+		if (result.hasErrors()) {
+			sendError(response, 400, "Error parsing DB: "+result.toString());
+			return null; 
+		}
+
+		Db oldDb = getDb(db.getName());
+		if (oldDb == null) {
+			sendError(response, 404, "Db not found: "+db.getName());
+			return null; 
+		} else {
+			dbRepository.save(db);			
+			return db;
+		}
+	}
+
+	//create a new Db  
+	@PostMapping("/")
+	@ResponseBody
+	public Db createDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException {
+
+		if (result.hasErrors()) {
+			sendError(response, 400, "Error parsing DB: "+result.toString());
+			return null;
+		}
+
+		Db oldDb = getDb(db.getName());
+		if (oldDb != null) {
+			sendError(response, 400, "Db already exists: "+db.getName());
+			return null;
+		} else {
+			dbRepository.save(db);			
+			return db;
+		}
+	}
+
+	private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+		log.info(msg);
+		response.sendError(sc, msg);		
+	}
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
similarity index 88%
rename from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java
rename to components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
index 2b17637..2e13e1a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
@@ -26,6 +26,7 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
@@ -37,8 +38,8 @@
  */
 
 @RestController
-@RequestMapping(value = "/pull", produces = { MediaType.TEXT_PLAIN_VALUE })
-public class PullController {
+@RequestMapping(value = "/feeder", produces = { MediaType.TEXT_PLAIN_VALUE })
+public class FeederController {
 
 	private final Logger log = LoggerFactory.getLogger(this.getClass());
 	
@@ -49,7 +50,7 @@
      * @return message that application is started
      * @throws IOException 
      */
-    @RequestMapping("/start")
+    @GetMapping("/start")
     public String start() throws IOException {
     	log.info("DataLake feeder starting to pull data from DMaaP...");
     	pullService.start();
@@ -59,7 +60,7 @@
     /**
      * @return message that application stop process is triggered
      */
-    @RequestMapping("/stop")
+    @GetMapping("/stop")
     public String stop() {    	
     	pullService.shutdown();
     	log.info("DataLake feeder is stopped.");
@@ -68,9 +69,9 @@
     /**
      * @return feeder status
      */
-    @RequestMapping("/status")
+    @GetMapping("/status")
     public String status() {    	
-    	String status = "to be impletemented";
+    	String status = "Feeder is running: "+pullService.isRunning();
     	log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc. 
     	return status;
     }    
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
index 25028d5..c4aec14 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -21,36 +21,44 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
+import java.util.Set;
 
+import javax.servlet.http.HttpServletResponse;
+
+import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.domain.Topic;
 import org.onap.datalake.feeder.repository.TopicRepository;
+import org.onap.datalake.feeder.service.DbService;
 import org.onap.datalake.feeder.service.DmaapService;
+import org.onap.datalake.feeder.service.TopicService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RestController;
 
 /**
- * This controller manages all the topic settings. Topic "_DL_DEFAULT_" acts as
- * the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is
- * used for that topic. All the settings are saved in Couchbase. topic
- * "_DL_DEFAULT_" is populated at setup by a DB script.
+ * This controller manages topic settings. 
+ * 
+ * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is used for that topic. 
+ * All the settings are saved in database. 
+ * topic "_DL_DEFAULT_" is populated at setup by a DB script.
  * 
  * @author Guobiao Mo
  *
  */
 
 @RestController
-@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })
+@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
 public class TopicController {
 
 	private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -60,7 +68,13 @@
 
 	@Autowired
 	private TopicRepository topicRepository;
+	
+	@Autowired
+	private TopicService topicService;
 
+	@Autowired
+	private DbService dbService;
+	
 	//list all topics in DMaaP
 	@GetMapping("/dmaap/")
 	@ResponseBody
@@ -77,33 +91,45 @@
 	}
 
 	//Read a topic
-	@GetMapping("/{name}")
+	@GetMapping("/{topicname}")
 	@ResponseBody
-	public Topic getTopic(@PathVariable("name") String topicName) throws IOException {
-		//Topic topic = topicRepository.findFirstById(topicName);   	
-		Optional<Topic> topic = topicRepository.findById(topicName);
-		if (topic.isPresent()) {
-			return topic.get();
-		} else {
-			return null;
-		}
+	public Topic getTopic(@PathVariable("topicname") String topicName) throws IOException {
+		Topic topic = topicService.getTopic(topicName);
+		return topic;
+	}
+
+	//Read DBs in a topic 
+	@GetMapping("/{topicname}/dbs")
+	@ResponseBody
+	public Set<Db> getTopicDbs(@PathVariable("topicname") String topicName) throws IOException {
+		Topic topic = topicService.getTopic(topicName);
+		Set<Db> dbs = topic.getDbs();
+		return dbs;
 	}
 
 	//Update Topic
+	//This is not a partial update: old topic is wiped out, and new topic is created base on the input json. 
+	//One exception is that old DBs are kept
 	@PutMapping("/")
 	@ResponseBody
-	public Topic updateTopic(Topic topic, BindingResult result) throws IOException {
+	public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
 
 		if (result.hasErrors()) {
-			log.error(result.toString());
-			
-			return null;//TODO return binding error
+			sendError(response, 400, "Error parsing Topic: "+result.toString());
+			return null; 
 		}
 
-		Topic oldTopic = getTopic(topic.getId());
+		Topic oldTopic = getTopic(topic.getName());
 		if (oldTopic == null) {
-			return null;//TODO return not found error
+			sendError(response, 404, "Topic not found "+topic.getName());
+			return null; 
 		} else {
+			if(!topic.isDefault()) {
+				Topic defaultTopic = topicService.getDefaultTopic();
+				topic.setDefaultTopic(defaultTopic);
+			}
+			
+			topic.setDbs(oldTopic.getDbs());
 			topicRepository.save(topic);
 			return topic;
 		}
@@ -112,20 +138,56 @@
 	//create a new Topic  
 	@PostMapping("/")
 	@ResponseBody
-	public Topic createTopic(Topic topic, BindingResult result) throws IOException {
-
+	public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
+		
 		if (result.hasErrors()) {
-			log.error(result.toString());
+			sendError(response, 400, "Error parsing Topic: "+result.toString());
 			return null;
 		}
 
-		Topic oldTopic = getTopic(topic.getId());
+		Topic oldTopic = getTopic(topic.getName());
 		if (oldTopic != null) {
-			return null;//TODO return 'already exists' error
+			sendError(response, 400, "Topic already exists "+topic.getName());
+			return null;
 		} else {
+			if(!topic.isDefault()) {
+				Topic defaultTopic = topicService.getDefaultTopic();
+				topic.setDefaultTopic(defaultTopic);
+			}
+			
 			topicRepository.save(topic);
 			return topic;
 		}
 	}
 
+	//delete a db from the topic
+	@DeleteMapping("/{topicname}/db/{dbname}")
+	@ResponseBody
+	public Set<Db> deleteDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException {
+		Topic topic = topicService.getTopic(topicName);
+		Set<Db> dbs = topic.getDbs();
+		dbs.remove(new Db(dbName));
+		 
+		topicRepository.save(topic);
+		return topic.getDbs();		 
+	}
+
+	//add a db to the topic
+	@PutMapping("/{topicname}/db/{dbname}")
+	@ResponseBody
+	public Set<Db> addDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException {
+		Topic topic = topicService.getTopic(topicName);
+		Set<Db> dbs = topic.getDbs();		
+
+		Db db = dbService.getDb(dbName);		
+		dbs.add(db);
+		 
+		topicRepository.save(topic);
+		return topic.getDbs();		 
+	}
+	
+	private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+		log.info(msg);
+		response.sendError(sc, msg);		
+	}
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
new file mode 100644
index 0000000..bbaedad
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
@@ -0,0 +1,83 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import java.util.Set;
+
+import javax.persistence.CascadeType;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.ManyToMany;
+import javax.persistence.Table;
+
+import com.fasterxml.jackson.annotation.JsonBackReference;
+
+import lombok.Getter;
+import lombok.Setter;
+ 
+/**
+ * Domain class representing bid data storage
+ * 
+ * @author Guobiao Mo
+ *
+ */
+@Setter
+@Getter
+@Entity
+@Table(name = "db")
+public class Db {
+	@Id
+	private String name;
+
+	private String host;
+	private String login;
+	private String pass;
+
+	private String property1;
+	private String property2;
+	private String property3;	 
+
+	@JsonBackReference
+	@ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL)
+	/*
+    @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER) 
+    @JoinTable(	name 				= "map_db_topic",
+    			joinColumns 		= {  @JoinColumn(name="db_name")  },
+    			inverseJoinColumns 	= {  @JoinColumn(name="topic_name")  }
+    ) */
+    protected Set<Topic> topics;	
+	
+	public Db() {
+	}
+
+	public Db(String name) {
+		this.name = name;
+	}
+ 
+	@Override
+	public boolean equals(Object obj) {		
+		return name.equals(((Db)obj).getName());		
+	}
+
+	@Override
+	public int hashCode() {
+		return name.hashCode();		
+	}
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index ace33dc..e1da4d4 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -19,39 +19,61 @@
 */
 package org.onap.datalake.feeder.domain;
 
+import java.util.Set;
 import java.util.function.Predicate;
 
-import javax.validation.constraints.NotNull;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.JoinTable;
+import javax.persistence.ManyToMany;
+import javax.persistence.ManyToOne;
+import javax.persistence.Table;
 
 import org.apache.commons.lang3.StringUtils;
 import org.json.JSONObject;
 import org.onap.datalake.feeder.enumeration.DataFormat;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.annotation.Transient;
-import org.springframework.data.couchbase.core.mapping.Document;
 
+import com.fasterxml.jackson.annotation.JsonBackReference;
+
+import lombok.Getter;
 import lombok.Setter;
  
 /**
- * Domain class representing topic table in Couchbase
+ * Domain class representing topic 
  * 
  * @author Guobiao Mo
  *
  */
-@Document
 @Setter
+@Getter
+@Entity
+@Table(name = "topic")
 public class Topic {
-	@NotNull
 	@Id
-	private String id;//topic name 
+	private String name;//topic name 
 
-	@Transient
+	@ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) 
+	@JoinColumn(name = "default_topic", nullable = true)
 	private Topic defaultTopic;
 
 	//for protected Kafka topics
 	private String login;
 	private String pass;
 
+	//@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
+	@JsonBackReference
+	//@JsonManagedReference
+    @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER) 
+    @JoinTable(	name 				= "map_db_topic",
+    			joinColumns 		= {  @JoinColumn(name="topic_name")  },
+    			inverseJoinColumns 	= {  @JoinColumn(name="db_name")  }
+    )
+	protected Set<Db> dbs;
+	
 	/**
 	 *  indicate if we should monitor this topic
 	 */
@@ -60,20 +82,14 @@
 	/**
 	 * save raw message text
 	 */
+	@Column(name = "save_raw")
 	private Boolean saveRaw;
 
 	/**
-	 * true: save it to Elasticsearch false: don't save null: use default
-	 */
-	private Boolean supportElasticsearch;
-	private Boolean supportCouchbase;
-	private Boolean supportDruid;
-
-	/**
-	 * need to explicitly tell feeder the data format of the message
+	 * need to explicitly tell feeder the data format of the message.
 	 * support JSON, XML, YAML, TEXT
 	 */
-	private DataFormat dataFormat;
+	private String dataFormat;
 
 	/**
 	 * TTL in day
@@ -81,26 +97,24 @@
 	private Integer ttl; 
 	
 	//if this flag is true, need to correlate alarm cleared message to previous alarm 
+	@Column(name = "correlate_cleared_message")
 	private Boolean correlateClearedMessage;
 	
 	//the value in the JSON with this path will be used as DB id
+	@Column(name = "message_id_path")
 	private String messageIdPath;
 
 	public Topic() {
 	}
 
-	public Topic(String id) {
-		this.id = id;
+	public Topic(String name) {
+		this.name = name;
 	}
 
-	public String getId() {
-		return id;
+	public boolean isDefault() {
+		return "_DL_DEFAULT_".equals(name);
 	}
 	
-	public void setDefaultTopic(Topic defaultTopic) {
-		this.defaultTopic = defaultTopic;
-	}
-
 	public boolean isEnabled() {
 		return is(enabled, Topic::isEnabled);	
 	}
@@ -121,7 +135,7 @@
 	
 	public DataFormat getDataFormat() {
 		if (dataFormat != null) {
-			return dataFormat;
+			return DataFormat.fromString(dataFormat);
 		} else if (defaultTopic != null) {
 			return defaultTopic.getDataFormat();
 		} else {
@@ -148,24 +162,51 @@
 		return is(saveRaw, Topic::isSaveRaw);
 	}
 
-	public boolean isSupportElasticsearch() {
-		return is(supportElasticsearch, Topic::isSupportElasticsearch);
+	public boolean supportElasticsearch() {
+		return containDb("Elasticsearch");//TODO string hard codes
 	}
 
-	public boolean isSupportCouchbase() {
-		return is(supportCouchbase, Topic::isSupportCouchbase);
+	public boolean supportCouchbase() {
+		return containDb("Couchbase");
 	}
 
-	public boolean isSupportDruid() {
-		return is(supportDruid, Topic::isSupportDruid);
+	public boolean supportDruid() {
+		return containDb("Druid");
 	}
 
-	//extract DB id from a JSON attribute, TODO support multiple attributes
+	public boolean supportMongoDB() {
+		return containDb("MongoDB");
+	}
+
+	private boolean containDb(String dbName) {		
+		Db db = new Db(dbName);
+		
+		if(dbs!=null && dbs.contains(db)) {
+			return true;
+		}
+		
+		if (defaultTopic != null) {
+			return defaultTopic.containDb(dbName);
+		} else {
+			return false;
+		}
+	}
+	
+	//extract DB id from JSON attributes, support multiple attributes
 	public String getMessageId(JSONObject json) {
 		String id = null;
 		
 		if(StringUtils.isNotBlank(messageIdPath)) {
-			id = json.query(messageIdPath).toString();
+			String[] paths=messageIdPath.split(",");
+			
+			StringBuilder sb= new StringBuilder();
+			for(int i=0; i<paths.length; i++) {
+				if(i>0) {
+					sb.append('^');					
+				}
+				sb.append(json.query(paths[i]).toString());				
+			}
+			id = sb.toString();
 		}
 		
 		return id;
@@ -173,20 +214,17 @@
 	
 	@Override
 	public String toString() {
-		return id;
+		return name;
 	}
 
-	/**
-	 * @return the messageIdPath
-	 */
-	public String getMessageIdPath() {
-		return messageIdPath;
+	@Override
+	public boolean equals(Object obj) {		
+		return name.equals(((Topic)obj).getName());		
 	}
 
-	/**
-	 * @param messageIdPath the messageIdPath to set
-	 */
-	public void setMessageIdPath(String messageIdPath) {
-		this.messageIdPath = messageIdPath;
+	@Override
+	public int hashCode() {
+		return name.hashCode();		
 	}
+	
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java
similarity index 82%
rename from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java
rename to components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java
index 220a8f7..ae03f46 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java
@@ -19,10 +19,18 @@
 */

 package org.onap.datalake.feeder.repository;

 

+import org.onap.datalake.feeder.domain.Db;

+

+import org.springframework.data.repository.CrudRepository;

+

 /**

+ * 

+ * Db Repository 

+ * 

  * @author Guobiao Mo

  *

- */

-public interface TopicRepositoryCustom  {

-	long updateTopic(String topic, Boolean state);

+ */ 

+

+public interface DbRepository extends CrudRepository<Db, String> {

+

 }

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
index 37d1a66..2d9adef 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
@@ -20,48 +20,17 @@
 package org.onap.datalake.feeder.repository;

 

 import org.onap.datalake.feeder.domain.Topic;

-import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;

-import org.springframework.data.couchbase.core.query.Query;

-import org.springframework.data.couchbase.core.query.ViewIndexed;

-import org.springframework.data.couchbase.repository.CouchbasePagingAndSortingRepository;

-import org.springframework.data.domain.Page;

-import org.springframework.data.domain.Pageable;

- 

 

-import java.util.List;

+import org.springframework.data.repository.CrudRepository;

 

 /**

  * 

- * Topic Repository interface, implementation is taken care by Spring framework.

- * Customization is done through TopicRepositoryCustom and its implementation TopicRepositoryImpl. 

+ * Topic Repository 

  * 

  * @author Guobiao Mo

  *

- */

-@ViewIndexed(designDoc = "topic", viewName = "all")

-public interface TopicRepository extends CouchbasePagingAndSortingRepository<Topic, String>, TopicRepositoryCustom {

-/*

-	Topic findFirstById(String topic);

+ */ 

 

-	Topic findByIdAndState(String topic, boolean state);

+public interface TopicRepository extends CrudRepository<Topic, String> {

 

-    //Supports native JSON query string

-    @Query("{topic:'?0'}")

-    Topic findTopicById(String topic);

-

-    @Query("{topic: { $regex: ?0 } })")

-    List<Topic> findTopicByRegExId(String topic);

-

-

-    //Page<Topic> findByCompanyIdAndNameLikeOrderByName(String companyId, String name, Pageable pageable);

-

-    @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} and companyId = $1 and $2 within #{#n1ql.bucket}")

-    Topic findByCompanyAndAreaId(String companyId, String areaId);

-

-    @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} AND ANY phone IN phoneNumbers SATISFIES phone = $1 END")

-    List<Topic> findByPhoneNumber(String telephoneNumber);

-

-    @Query("SELECT COUNT(*) AS count FROM #{#n1ql.bucket} WHERE #{#n1ql.filter} and companyId = $1")

-    Long countBuildings(String companyId);

-    */

 }

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java
deleted file mode 100644
index 018d5b9..0000000
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*

-* ============LICENSE_START=======================================================

-* ONAP : DataLake

-* ================================================================================

-* Copyright 2019 China Mobile

-*=================================================================================

-* Licensed under the Apache License, Version 2.0 (the "License");

-* you may not use this file except in compliance with the License.

-* You may obtain a copy of the License at

-*

-*     http://www.apache.org/licenses/LICENSE-2.0

-*

-* Unless required by applicable law or agreed to in writing, software

-* distributed under the License is distributed on an "AS IS" BASIS,

-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

-* See the License for the specific language governing permissions and

-* limitations under the License.

-* ============LICENSE_END=========================================================

-*/

-package org.onap.datalake.feeder.repository;

-

-import org.onap.datalake.feeder.domain.Topic;

-import org.springframework.beans.factory.annotation.Autowired;

-import org.springframework.data.couchbase.core.CouchbaseTemplate;

-/*

-import org.springframework.data.mongodb.MongoDbFactory;

-import org.springframework.data.mongodb.core.MongoTemplate;

-import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;

-import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;

-import org.springframework.data.mongodb.core.convert.MappingMongoConverter;

-import org.springframework.data.mongodb.core.mapping.MongoMappingContext;

-import org.springframework.data.mongodb.core.query.Criteria;

-import org.springframework.data.mongodb.core.query.Query;

-import org.springframework.data.mongodb.core.query.Update;  

-

-import com.mongodb.WriteResult;

-import com.mongodb.client.result.UpdateResult;

-*/

-import java.util.List;

-

-/**

- * @author Guobiao Mo

- *

- */

-public class TopicRepositoryImpl implements TopicRepositoryCustom {

-

-    @Autowired

-    CouchbaseTemplate template;

-    

-    @Override

-    public long updateTopic(String topic, Boolean state) {

-/*

-        Query query = new Query(Criteria.where("id").is(topic));

-        Update update = new Update();

-        update.set("state", state);

-

-        UpdateResult result = mongoTemplate.updateFirst(query, update, Topic.class);

-

-        if(result!=null)

-            return result.getModifiedCount();

-        else

-  */          return 0L;

-    	

-    	

-    	

-    }

-}

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index 3543258..f74829e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -27,7 +27,7 @@
 import javax.annotation.PreDestroy;
  
 import org.json.JSONObject;
-import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.domain.Topic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,18 +57,20 @@
 	private final Logger log = LoggerFactory.getLogger(this.getClass());
 
 	@Autowired
-	private ApplicationConfiguration config;
-
+	private DbService dbService;
+	
 	Bucket bucket;		
 
 	@PostConstruct
 	private void init() {
         // Initialize Couchbase Connection
-        Cluster cluster = CouchbaseCluster.create(config.getCouchbaseHost());
-        cluster.authenticate(config.getCouchbaseUser(), config.getCouchbasePass());
-        bucket = cluster.openBucket(config.getCouchbaseBucket());
+		
+		Db couchbase = dbService.getCouchbase();
+        Cluster cluster = CouchbaseCluster.create(couchbase.getHost());
+        cluster.authenticate(couchbase.getLogin(), couchbase.getPass());
+        bucket = cluster.openBucket(couchbase.getProperty1());
 
-		log.info("Connect to Couchbase " + config.getCouchbaseHost());
+		log.info("Connect to Couchbase " + couchbase.getHost());
 		
         // Create a N1QL Primary Index (but ignore if it exists)
         bucket.bucketManager().createN1qlPrimaryIndex(true, false);                 
@@ -90,15 +92,21 @@
 			//setup TTL
 			int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
 			
-			String id = getId(topic.getId());
+			String id = getId(topic, json);
 			JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
 			documents.add(doc);
 		}
 		saveDocuments(documents);		
 	}
 
-
-	private String getId(String topicStr) {
+	public String getId(Topic topic, JSONObject json) {
+		//if this topic requires extract id from JSON
+		String id = topic.getMessageId(json);
+		if(id != null) {
+			return id;
+		}
+		
+		String topicStr= topic.getName();		
 		//String id = topicStr+":"+timestamp+":"+UUID.randomUUID();
 
 		//https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2
@@ -106,7 +114,7 @@
 		// increment by 1, initialize at 0 if counter doc not found
 		//TODO how slow is this compared with above UUID approach?
 		JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 
-		String id = topicStr +":"+ nextIdNumber.content();
+		id = topicStr +":"+ nextIdNumber.content();
 		
 		return id;
 	}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
new file mode 100644
index 0000000..f0d943d
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
@@ -0,0 +1,67 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.util.Optional;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.repository.DbRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service for Dbs 
+ * 
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class DbService {
+
+	private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+	@Autowired
+	private DbRepository dbRepository;
+	
+	public Db getDb(String name) {
+		Optional<Db> ret = dbRepository.findById(name);
+		return ret.isPresent() ? ret.get() : null;
+	}	
+
+	public Db getCouchbase() {
+		return getDb("Couchbase");
+	}
+
+	public Db getElasticsearch() {
+		return getDb("Elasticsearch");
+	}
+
+	public Db getMongoDB() {
+		return getDb("MongoDB");
+	}
+
+	public Db getDruid() {
+		return getDb("Druid");
+	}	
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index cbcc5f8..1f637e1 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -31,6 +31,7 @@
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
@@ -40,6 +41,7 @@
 import org.elasticsearch.common.xcontent.XContentType;
 import org.json.JSONObject;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.domain.Topic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,14 +62,18 @@
 
 	@Autowired
 	private ApplicationConfiguration config;
+	
+	@Autowired
+	private DbService dbService;
 
 	private RestHighLevelClient client;
 	ActionListener<BulkResponse> listener;
 
 	@PostConstruct
 	private void init() {
-		String elasticsearchHost = config.getElasticsearchHost();
-
+		Db elasticsearch = dbService.getElasticsearch();
+		String elasticsearchHost = elasticsearch.getHost();
+		
 		// Initialize the Connection
 		client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
 
@@ -93,14 +99,16 @@
 	
 	public void ensureTableExist(String topic) throws IOException {
 		String topicLower = topic.toLowerCase();
-
-		CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
-		try {
+		
+		GetIndexRequest request = new GetIndexRequest();
+		request.indices(topicLower);
+		
+		boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
+		if(!exists){
+			CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); 
 			CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);		
 			log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
-		}catch(ElasticsearchStatusException e) {
-			log.info("{} create ES topic status: {}", topic, e.getDetailedMessage());			
-		}
+		} 
 	}
 	
 	//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
@@ -114,7 +122,10 @@
 					continue;
 				}
 			}
-			request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON));
+			
+			String id = topic.getMessageId(json); //id can be null
+			
+			request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
 		}
 		if(config.isAsync()) {
 			client.bulkAsync(request, RequestOptions.DEFAULT, listener);			
@@ -122,19 +133,23 @@
 			try {
 				client.bulk(request, RequestOptions.DEFAULT);
 			} catch (IOException e) { 
-				log.error( topic.getId() , e);
+				log.error( topic.getName() , e);
 			}
 		}
 	}
 	
 	private boolean correlateClearedMessage(JSONObject json) {
-		boolean found = false;
-		
+		boolean found = true;
+				
 		/*TODO
 		 * 1. check if this is a alarm cleared message
 		 * 2. search previous alarm message
 		 * 3. update previous message, if success, set found=true
 		 */
+		//for Sonar test, remove the following
+		if(json.isNull("kkkkk")) {
+			found = false;
+		}
 		
 		return found; 
 	}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
index 3dcbd8e..4433c8c 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
@@ -27,8 +27,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import javax.annotation.PostConstruct;
-
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,10 +56,13 @@
 	@Autowired
 	private ApplicationConfiguration config;
 
-	@PostConstruct
-	private void init() {
+	/**
+	 * @return the isRunning
+	 */
+	public boolean isRunning() {
+		return isRunning;
 	}
-
+ 
 	/**
 	 * start pulling.
 	 * 
@@ -109,6 +110,7 @@
 			executorService.awaitTermination(10L, TimeUnit.SECONDS);
 		} catch (InterruptedException e) {
 			logger.error("executor.awaitTermination", e);
+			Thread.currentThread().interrupt();
 		}
 		
 		isRunning = false;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 1cd3a8a..84e4fb7 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -152,11 +152,11 @@
 	}
 
 	private void saveJsons(Topic topic, List<JSONObject> jsons) {
-		if (topic.isSupportCouchbase()) {
+		if (topic.supportCouchbase()) {
 			couchbaseService.saveJsons(topic, jsons);
 		}
 
-		if (topic.isSupportElasticsearch()) {
+		if (topic.supportElasticsearch()) {
 			elasticsearchService.saveJsons(topic, jsons);
 		}
 	}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 9b8fabc..4e10a36 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -34,8 +34,7 @@
 import org.springframework.stereotype.Service;
 
 /**
- * Service for topics topic setting is stored in Couchbase, bucket 'dl', see
- * application.properties for Spring setup
+ * Service for topics 
  * 
  * @author Guobiao Mo
  *
@@ -68,15 +67,14 @@
 		return null;
 	}
 		
+	//TODO caller should not modify the returned topic, maybe return a clone
 	public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
 		Topic topic = getTopic(topicStr);
 		if (topic == null) {
-			topic = new Topic(topicStr);
+			topic = getDefaultTopic();
 		}
-
-		topic.setDefaultTopic(getDefaultTopic());
 		
-		if(ensureTableExist && topic.isEnabled() && topic.isSupportElasticsearch()) { 
+		if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) { 
 			elasticsearchService.ensureTableExist(topicStr); 
 		}
 		return topic;
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index a0ab90f..ea94d00 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -2,6 +2,16 @@
 server.port = 1680
 
 
+# Spring connection to MariaDB for ORM
+#spring.jpa.hibernate.ddl-auto=update
+spring.jpa.hibernate.ddl-auto=none
+spring.jpa.show-sql=false
+
+#spring.datasource.driver-class-name=com.mysql.jdbc.Driver
+spring.datasource.url=jdbc:mariadb://dl_mariadb:3306/datalake?autoReconnect=true&amp;useUnicode=true&amp;characterEncoding=UTF-8
+spring.datasource.username=nook
+spring.datasource.password=nook123
+
 
 #For Beijing lab
 #dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80
@@ -30,24 +40,3 @@
 logging.level.com.att.nsa.apiClient.http=ERROR
 logging.level.org.onap.datalake=DEBUG
  
-# Spring connection to Couchbase DB for ORM
-
-#not work when run in osboxes, but works in Eclipse
-spring.couchbase.bootstrap-hosts=dl_couchbase
-#spring.couchbase.bootstrap-hosts=172.30.1.74
-
-#a user with name as bucket.name must be created, with the pass as bucket.password
-# https://stackoverflow.com/questions/51496589/bucket-password-in-couchbase
-spring.couchbase.bucket.name=dl
-spring.couchbase.bucket.password=dl1234
-spring.data.couchbase.auto-index=true
-
-#DL Feeder DB: Couchbase
-couchbaseHost=dl_couchbase 
-#couchbaseHost=172.30.1.74
-couchbaseUser=dmaap
-couchbasePass=dmaap1234
-couchbaseBucket=dmaap
-
-#DL Feeder DB: Elasticsearch
-elasticsearchHost=dl_es
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json
index e536c7b..a20e5eb 100644
--- a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json
+++ b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json
@@ -227,7 +227,7 @@
 		"taskDuration": "PT1H",
 		"completionTimeout": "PT30M",
 		"consumerProperties": {
-			"bootstrap.servers": "dl_dmaap:9092"
+			"bootstrap.servers": "message-router-kafka:9092"
 		},
 		"useEarliestOffset": true
 	}
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json
new file mode 100644
index 0000000..cb3c98d
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json
@@ -0,0 +1,52 @@
+{

+	"_id": "5bb3dfae5bea3f1cb49d4f3f",

+	"cambria.partition": "AAI",

+	"event-header": {

+		"severity": "NORMAL",

+		"entity-type": "esr-thirdparty-sdnc",

+		"top-entity-type": "esr-thirdparty-sdnc",

+		"entity-link": "/aai/v11/external-system/esr-thirdparty-sdnc-list/esr-thirdparty-sdnc/SDWANController1",

+		"event-type": "AAI-EVENT",

+		"domain": "dev",

+		"action": "CREATE",

+		"sequence-number": "0",

+		"id": "69f2935f-c3c1-4a63-b3f1-519c3898328b",

+		"source-name": "ying",

+		"version": "v11",

+		"timestamp": "20180919-06:20:44:216"

+	},

+	"entity": {

+		"thirdparty-sdnc-id": "SDWANController1",

+		"resource-version": "1537338043473",

+		"location": "Core",

+		"product-name": "SD-WAN",

+		"esr-system-info-list": {

+			"esr-system-info": [

+				{

+					"esr-system-info-id": "SDWANController1-ESR-1",

+					"system-type": "example-system-type-val-12078",

+					"service-url": "https://172.19.48.77:18008",

+					"ssl-cacert": "example-ssl-cacert-val-20589",

+					"type": "WAN",

+					"ssl-insecure": true,

+					"system-status": "example-system-status-val-23435",

+					"version": "V3R1",

+					"passive": true,

+					"password": "Onap@12345",

+					"protocol": "RESTCONF",

+					"ip-address": "172.19.48.77",

+					"cloud-domain": "example-cloud-domain-val-76077",

+					"user-name": "northapi1@huawei.com",

+					"system-name": "SDWANController",

+					"port": "18008",

+					"vendor": "IP-WAN",

+					"resource-version": "1537338044166",

+					"remote-path": "example-remotepath-val-5833",

+					"default-tenant": "example-default-tenant-val-71148"

+				}

+			]

+		}

+	},

+	"_dl_type_": "JSON",

+	"_dl_text_": "{\"cambria.partition\":\"AAI\",\"event-header\":{\"severity\":\"NORMAL\",\"entity-type\":\"esr-thirdparty-sdnc\",\"top-entity-type\":\"esr-thirdparty-sdnc\",\"entity-link\":\"/aai/v11/external-system/esr-thirdparty-sdnc-list/esr-thirdparty-sdnc/SDWANController1\",\"event-type\":\"AAI-EVENT\",\"domain\":\"dev\",\"action\":\"CREATE\",\"sequence-number\":\"0\",\"id\":\"69f2935f-c3c1-4a63-b3f1-519c3898328b\",\"source-name\":\"ying\",\"version\":\"v11\",\"timestamp\":\"20180919-06:20:44:216\"},\"entity\":{\"thirdparty-sdnc-id\":\"SDWANController1\",\"resource-version\":\"1537338043473\",\"location\":\"Core\",\"product-name\":\"SD-WAN\",\"esr-system-info-list\":{\"esr-system-info\":[{\"esr-system-info-id\":\"SDWANController1-ESR-1\",\"system-type\":\"example-system-type-val-12078\",\"service-url\":\"https://172.19.48.77:18008\",\"ssl-cacert\":\"example-ssl-cacert-val-20589\",\"type\":\"WAN\",\"ssl-insecure\":true,\"system-status\":\"example-system-status-val-23435\",\"version\":\"V3R1\",\"passive\":true,\"password\":\"Onap@12345\",\"protocol\":\"RESTCONF\",\"ip-address\":\"172.19.48.77\",\"cloud-domain\":\"example-cloud-domain-val-76077\",\"user-name\":\"northapi1@huawei.com\",\"system-name\":\"SDWANController\",\"port\":\"18008\",\"vendor\":\"IP-WAN\",\"resource-version\":\"1537338044166\",\"remote-path\":\"example-remotepath-val-5833\",\"default-tenant\":\"example-default-tenant-val-71148\"}]}}}"

+}
\ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json
index 16eb163..19bf6ed 100644
--- a/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json
+++ b/components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json
@@ -142,7 +142,7 @@
 		"taskDuration": "PT1H",
 		"completionTimeout": "PT30M",
 		"consumerProperties": {
-			"bootstrap.servers": "dl_dmaap:9092"
+			"bootstrap.servers": "message-router-kafka:9092"
 		},
 		"useEarliestOffset": true
 	}
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json
index 3c871a8..6797b19 100644
--- a/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json
+++ b/components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json
@@ -200,7 +200,7 @@
 		"taskDuration": "PT1H",
 		"completionTimeout": "PT30M",
 		"consumerProperties": {
-			"bootstrap.servers": "dl_dmaap:9092"
+			"bootstrap.servers": "message-router-kafka:9092"
 		},
 		"useEarliestOffset": true
 	}
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json
index c3f6037..f910ace 100644
--- a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json
+++ b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json
@@ -302,7 +302,7 @@
 		"taskDuration": "PT1H",
 		"completionTimeout": "PT30M",
 		"consumerProperties": {
-			"bootstrap.servers": "dl_dmaap:9092"
+			"bootstrap.servers": "message-router-kafka:9092"
 		},
 		"useEarliestOffset": true
 	}
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json
new file mode 100644
index 0000000..957060f
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json
@@ -0,0 +1,75 @@
+{

+	"_id": {

+		"$oid": "5bb3d3aa5bea3f41300957c6"

+	},

+	"sendEpsShort": {

+		"summary": "0.0 send eps (10 mins)",

+		"raw": 0

+	},

+	"recvEpsInstant": {

+		"summary": "0.03333333333333333 recv eps (1 min)",

+		"raw": 0.03333333333333333

+	},

+	"fanOut": {

+		"summary": "0.7051873815491838 sends per recv",

+		"raw": 0.7051873815491838

+	},

+	"sendEpsLong": {

+		"summary": "0.0 send eps (1 hr)",

+		"raw": 0

+	},

+	"kafkaConsumerTimeouts": {

+		"summary": "164 Kafka Consumers Timedout",

+		"raw": 164

+	},

+	"recvEpsLong": {

+		"summary": "0.03333333333333333 recv eps (1 hr)",

+		"raw": 0.03333333333333333

+	},

+	"sendEpsInstant": {

+		"summary": "0.0 send eps (1 min)",

+		"raw": 0

+	},

+	"recvEpsShort": {

+		"summary": "0.03333333333333333 recv eps (10 mins)",

+		"raw": 0.03333333333333333

+	},

+	"kafkaConsumerClaims": {

+		"summary": "1 Kafka Consumers Claimed",

+		"raw": 1

+	},

+	"version": {

+		"summary": "Version 1.1.3",

+		"raw": 0

+	},

+	"upTime": {

+		"summary": "604800 seconds since start",

+		"raw": 604800

+	},

+	"sendTotalEvents": {

+		"summary": "46139 Total events sent since start",

+		"raw": 46139

+	},

+	"hostname": "3d5704fccbc5",

+	"kafkaConsumerCacheMiss": {

+		"summary": "179 Kafka Consumer Cache Misses",

+		"raw": 179

+	},

+	"metricsSendTime": "1537639709 Metrics Send Time (epoch); 2018-09-22T18:08:29UTC",

+	"kafkaConsumerCacheHit": {

+		"summary": "317143 Kafka Consumer Cache Hits",

+		"raw": 317143

+	},

+	"now": 1537639709380,

+	"transactionEnabled": false,

+	"startTime": {

+		"summary": "1537034908 Start Time (epoch); 2018-09-15T18:08:28UTC",

+		"raw": 1537034908

+	},

+	"recvTotalEvents": {

+		"summary": "65428 Total events received since start",

+		"raw": 65428

+	},

+	"_dl_type_": "JSON",

+	"_dl_text_": "{\"sendEpsShort\":{\"summary\":\"0.0 send eps (10 mins)\",\"raw\":0},\"recvEpsInstant\":{\"summary\":\"0.03333333333333333 recv eps (1 min)\",\"raw\":0.03333333333333333},\"fanOut\":{\"summary\":\"0.7051873815491838 sends per recv\",\"raw\":0.7051873815491838},\"sendEpsLong\":{\"summary\":\"0.0 send eps (1 hr)\",\"raw\":0},\"kafkaConsumerTimeouts\":{\"summary\":\"164 Kafka Consumers Timedout\",\"raw\":164},\"recvEpsLong\":{\"summary\":\"0.03333333333333333 recv eps (1 hr)\",\"raw\":0.03333333333333333},\"sendEpsInstant\":{\"summary\":\"0.0 send eps (1 min)\",\"raw\":0},\"recvEpsShort\":{\"summary\":\"0.03333333333333333 recv eps (10 mins)\",\"raw\":0.03333333333333333},\"kafkaConsumerClaims\":{\"summary\":\"1 Kafka Consumers Claimed\",\"raw\":1},\"version\":{\"summary\":\"Version 1.1.3\",\"raw\":0},\"upTime\":{\"summary\":\"604800 seconds since start\",\"raw\":604800},\"sendTotalEvents\":{\"summary\":\"46139 Total events sent since start\",\"raw\":46139},\"hostname\":\"3d5704fccbc5\",\"kafkaConsumerCacheMiss\":{\"summary\":\"179 Kafka Consumer Cache Misses\",\"raw\":179},\"metricsSendTime\":\"1537639709 Metrics Send Time (epoch); 2018-09-22T18:08:29UTC\",\"kafkaConsumerCacheHit\":{\"summary\":\"317143 Kafka Consumer Cache Hits\",\"raw\":317143},\"now\":1537639709380,\"transactionEnabled\":false,\"startTime\":{\"summary\":\"1537034908 Start Time (epoch); 2018-09-15T18:08:28UTC\",\"raw\":1537034908},\"recvTotalEvents\":{\"summary\":\"65428 Total events received since start\",\"raw\":65428}}"

+}

diff --git a/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json
new file mode 100644
index 0000000..8de08f7
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json
@@ -0,0 +1,25 @@
+{

+	"_id": "5bb3d3ad5bea3f41300959ba",

+	"closedLoopEventClient": "DCAE.HolmesInstance",

+	"policyVersion": "1.0.0.5",

+	"policyName": "CCVPN",

+	"policyScope": "service=SOTNService,type=SampleType,closedLoopControlName=CL-CCVPN-d925ed73-8231-4d02-9545-db4e101f88f8",

+	"target_type": "VM",

+	"AAI": {

+		"serviceType": "TestService",

+		"service-instance_service-instance-id": "200",

+		"globalSubscriberId": "Customer1",

+		"vserver_vserver-name": "TBD",

+		"network-information_network-id": "providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F200"

+	},

+	"closedLoopAlarmStart": "1532769303924000",

+	"closedLoopEventStatus": "ONSET",

+	"version": "1.0.2",

+	"closedLoopControlName": "ControlLoop-CCVPN-2179b738-fd36-4843-a71a-a8c24c70c66b",

+	"target": "vserver.vserver-name",

+	"closedLoopAlarmEnd": "1532769303924000",

+	"requestID": "6f455b14-efd9-450a-bf78-e47d55b6da87",

+	"from": "DCAE",

+	"_dl_type_": "JSON",

+	"_dl_text_": "{\"closedLoopEventClient\":\"DCAE.HolmesInstance\",\"policyVersion\":\"1.0.0.5\",\"policyName\":\"CCVPN\",\"policyScope\":\"service=SOTNService,type=SampleType,closedLoopControlName=CL-CCVPN-d925ed73-8231-4d02-9545-db4e101f88f8\",\"target_type\":\"VM\",\"AAI\":{\"serviceType\":\"TestService\",\"service-instance.service-instance-id\":\"200\",\"globalSubscriberId\":\"Customer1\",\"vserver.vserver-name\":\"TBD\",\"network-information.network-id\":\"providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F200\"},\"closedLoopAlarmStart\":1532769303924000,\"closedLoopEventStatus\":\"ONSET\",\"version\":\"1.0.2\",\"closedLoopControlName\":\"ControlLoop-CCVPN-2179b738-fd36-4843-a71a-a8c24c70c66b\",\"target\":\"vserver.vserver-name\",\"closedLoopAlarmEnd\":1532769303924000,\"requestID\":\"6f455b14-efd9-450a-bf78-e47d55b6da87\",\"from\":\"DCAE\"}"

+}
\ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json
new file mode 100644
index 0000000..bb506d5
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json
@@ -0,0 +1,57 @@
+{

+	"_id": "5bb3d3b45bea3f41300960f8",

+	"event": {

+		"commonEventHeader": {

+			"sourceId": "/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27",

+			"startEpochMicrosec": "1537438335829000",

+			"eventId": "2ef8b41b-b081-477b-9d0b-1aaaa3b69857",

+			"domain": "fault",

+			"lastEpochMicrosec": 1537438335829000,

+			"eventName": "Fault_Route_Status",

+			"sourceName": "/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27",

+			"priority": "High",

+			"version": 3,

+			"reportingEntityName": "Domain_Contorller"

+		},

+		"faultFields": {

+			"eventSeverity": "CRITICAL",

+			"alarmCondition": "Route_Status",

+			"faultFieldsVersion": 2,

+			"specificProblem": "Fault_SOTN_Service_Status",

+			"alarmAdditionalInformation": [

+				{

+					"name": "networkId",

+					"value": "providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100"

+				},

+				{

+					"name": "node",

+					"value": "11.11.11.11"

+				},

+				{

+					"name": "tp-id",

+					"value": "27"

+				},

+				{

+					"name": "oper-status",

+					"value": "down"

+				},

+				{

+					"name": "network-ref",

+					"value": "providerId/5555/clientId/6666/topologyId/33"

+				},

+				{

+					"name": "node-ref",

+					"value": "0.51.0.103"

+				},

+				{

+					"name": "tp-ref",

+					"value": "4"

+				}

+			],

+			"eventSourceType": "other",

+			"vfStatus": "Active"

+		}

+	},

+	"_dl_type_": "JSON",

+	"_dl_text_": "{\"event\":{\"commonEventHeader\":{\"sourceId\":\"/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27\",\"startEpochMicrosec\":1537438335829000,\"eventId\":\"2ef8b41b-b081-477b-9d0b-1aaaa3b69857\",\"domain\":\"fault\",\"lastEpochMicrosec\":1537438335829000,\"eventName\":\"Fault_Route_Status\",\"sourceName\":\"/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27\",\"priority\":\"High\",\"version\":3,\"reportingEntityName\":\"Domain_Contorller\"},\"faultFields\":{\"eventSeverity\":\"CRITICAL\",\"alarmCondition\":\"Route_Status\",\"faultFieldsVersion\":2,\"specificProblem\":\"Fault_SOTN_Service_Status\",\"alarmAdditionalInformation\":[{\"name\":\"networkId\",\"value\":\"providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100\"},{\"name\":\"node\",\"value\":\"11.11.11.11\"},{\"name\":\"tp-id\",\"value\":\"27\"},{\"name\":\"oper-status\",\"value\":\"down\"},{\"name\":\"network-ref\",\"value\":\"providerId/5555/clientId/6666/topologyId/33\"},{\"name\":\"node-ref\",\"value\":\"0.51.0.103\"},{\"name\":\"tp-ref\",\"value\":\"4\"}],\"eventSourceType\":\"other\",\"vfStatus\":\"Active\"}}}"

+}
\ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
index 934451f..02db5a2 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
@@ -56,10 +56,6 @@
 
 	@Test
 	public void readConfig() {
-		assertNotNull(config.getCouchbaseHost());
-		assertNotNull(config.getCouchbaseUser());
-		assertNotNull(config.getCouchbasePass());
-		assertNotNull(config.getCouchbaseBucket());
 
 		assertNotNull(config.getDmaapZookeeperHostPort());
 		assertNotNull(config.getDmaapKafkaHostPort());
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java
similarity index 66%
copy from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java
copy to components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java
index 220a8f7..ea1d689 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java
@@ -1,28 +1,44 @@
-/*

-* ============LICENSE_START=======================================================

-* ONAP : DataLake

-* ================================================================================

-* Copyright 2019 China Mobile

-*=================================================================================

-* Licensed under the Apache License, Version 2.0 (the "License");

-* you may not use this file except in compliance with the License.

-* You may obtain a copy of the License at

-*

-*     http://www.apache.org/licenses/LICENSE-2.0

-*

-* Unless required by applicable law or agreed to in writing, software

-* distributed under the License is distributed on an "AS IS" BASIS,

-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

-* See the License for the specific language governing permissions and

-* limitations under the License.

-* ============LICENSE_END=========================================================

-*/

-package org.onap.datalake.feeder.repository;

-

-/**

- * @author Guobiao Mo

- *

- */

-public interface TopicRepositoryCustom  {

-	long updateTopic(String topic, Boolean state);

-}

+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import org.junit.Test;
+ 
+/**
+ * Test Db
+ * 
+ * @author Guobiao Mo
+ *
+ */
+ 
+public class DbTest {	
+
+    @Test
+	public void testIs() {
+		Db couchbase=new Db("Couchbase");
+		Db mongoDB=new Db("MongoDB");
+		Db mongoDB2=new Db("MongoDB");
+		assertNotEquals(couchbase.hashCode(), mongoDB.hashCode());
+		assertNotEquals(couchbase, mongoDB);
+		assertEquals(mongoDB, mongoDB2);		
+	}
+}
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
index 23ec3b1..1e40252 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
@@ -23,8 +23,11 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.HashSet;
+
 import org.json.JSONObject;
-import org.junit.Test; 
+import org.junit.Test;
+import org.onap.datalake.feeder.enumeration.DataFormat; 
  
 /**
  * Test Topic
@@ -50,17 +53,49 @@
     }
 
     @Test
+    public void getMessageIdFromMultipleAttributes() {
+    	String text = "{ data: { data2 : { value : 'hello'}, data3 : 'world'}}";
+    	
+    	JSONObject json = new JSONObject(text);
+    	
+    	Topic topic = new Topic("test getMessageId");
+    	topic.setMessageIdPath("/data/data2/value,/data/data3");
+    	
+    	String value = topic.getMessageId(json);
+
+        assertEquals(value, "hello^world");    	
+    }
+
+    @Test
 	public void testIs() {
-		Topic defaultTopic=new Topic("default");
+		Topic defaultTopic=new Topic("_DL_DEFAULT_");
 		Topic testTopic = new Topic("test");
 		testTopic.setDefaultTopic(defaultTopic);
+
+		assertTrue(defaultTopic.isDefault());
+		assertFalse(testTopic.isDefault());		
+
+		assertTrue(testTopic.equals(new Topic("test")));
+		assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode());
 		
-		defaultTopic.setSupportElasticsearch(true);		
-		boolean b = testTopic.isSupportElasticsearch();
-		assertTrue(b);
+		defaultTopic.setDbs(new HashSet<>());
+		defaultTopic.getDbs().add(new Db("Elasticsearch"));		
+		assertTrue(testTopic.supportElasticsearch());
+		assertFalse(testTopic.supportCouchbase());
+		assertFalse(testTopic.supportDruid());
+		assertFalse(testTopic.supportMongoDB());		
 		
-		defaultTopic.setSupportElasticsearch(false);		
-		b = testTopic.isSupportElasticsearch();
-		assertFalse(b);
+		defaultTopic.getDbs().remove(new Db("Elasticsearch"));	
+		assertFalse(testTopic.supportElasticsearch());
+		
+		defaultTopic.setCorrelateClearedMessage(true);
+		defaultTopic.setDataFormat("XML");
+		defaultTopic.setEnabled(true);
+		defaultTopic.setSaveRaw(true);		
+		assertTrue(testTopic.isCorrelateClearedMessage());
+		assertTrue(testTopic.isEnabled());
+		assertTrue(testTopic.isSaveRaw()); 
+		
+		assertEquals(defaultTopic.getDataFormat(), DataFormat.XML);
 	}
 }
diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties
index ede5999..d6d98e6 100644
--- a/components/datalake-handler/feeder/src/test/resources/application.properties
+++ b/components/datalake-handler/feeder/src/test/resources/application.properties
@@ -2,14 +2,6 @@
 server.port = 1680
 
 
-
-#For Beijing lab
-#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80
-#dmaapKafkaHostPort=kafka.mr01.onap.vip:80
-#spring.couchbase.bootstrap-hosts=172.30.1.74
-#couchbaseHost=172.30.1.74
- 
-
 #DMaaP
 #dmaapZookeeperHostPort=127.0.0.1:2181
 #dmaapKafkaHostPort=127.0.0.1:9092
@@ -30,13 +22,4 @@
 logging.level.com.att.nsa.apiClient.http=ERROR
 logging.level.org.onap.datalake=DEBUG
  
-
-#DL Feeder DB: Couchbase
-couchbaseHost=dl_couchbase 
-#couchbaseHost=172.30.1.74
-couchbaseUser=dmaap
-couchbasePass=dmaap1234
-couchbaseBucket=dmaap
-
-#DL Feeder DB: Elasticsearch
-elasticsearchHost=dl_es
+ 
diff --git a/components/datalake-handler/pom.xml b/components/datalake-handler/pom.xml
index ede2a27..a526cb5 100644
--- a/components/datalake-handler/pom.xml
+++ b/components/datalake-handler/pom.xml
@@ -31,7 +31,7 @@
 		<springcouchbase.version>3.1.2.RELEASE</springcouchbase.version>
 		<jackson.version>2.9.6</jackson.version>
 		<kafka.version>2.0.0</kafka.version>
-		<elasticsearchjava.version>6.6.0</elasticsearchjava.version>
+		<elasticsearchjava.version>6.7.0</elasticsearchjava.version>
 
 	</properties>
 
@@ -39,6 +39,12 @@
 		<dependencies>
 
 			<dependency>
+    			<groupId>org.mariadb.jdbc</groupId>
+    			<artifactId>mariadb-java-client</artifactId>
+    			<version>2.4.1</version>
+			</dependency>
+
+			<dependency>
 				<groupId>commons-io</groupId>
 				<artifactId>commons-io</artifactId>
 				<version>2.6</version>
@@ -142,6 +148,13 @@
 				<version>${springboot.version}</version>
 			</dependency>
 			<!-- end::actuator[] -->
+			
+        	<dependency>
+            	<groupId>org.springframework.boot</groupId>
+         	   <artifactId>spring-boot-starter-data-jpa</artifactId>
+				<version>${springboot.version}</version>
+       		</dependency>
+        
 			<dependency>
 				<groupId>org.springframework.boot</groupId>
 				<artifactId>spring-boot-starter-data-couchbase</artifactId>