Merge "Add function to framework of frond-end"
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 9106185..2a72a76 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -67,4 +67,7 @@
 	private int hdfsBufferSize;	
 	private long hdfsFlushInterval;
 	private int hdfsBatchSize;
+
+	//Version
+	private String DatalakeVersion;
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
index 3d296d5..4fc9b7b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
@@ -21,14 +21,13 @@
 
 import java.io.IOException;
 
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.service.PullService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
 
 import io.swagger.annotations.ApiOperation;
 
@@ -40,35 +39,45 @@
  */
 
 @RestController
-@RequestMapping(value = "/feeder", produces = { MediaType.TEXT_PLAIN_VALUE })
+@RequestMapping(value = "/feeder", produces = { MediaType.APPLICATION_JSON_VALUE })
 public class FeederController {
 
 	private final Logger log = LoggerFactory.getLogger(this.getClass());
 	
     @Autowired
     private PullService pullService;
+
+    @Autowired
+    ApplicationConfiguration config;
     
     /**
      * @return message that application is started
      * @throws IOException 
      */
-    @GetMapping("/start")
+    @PostMapping("/start")
+    @ResponseBody
 	@ApiOperation(value="Start pulling data.")
     public String start() throws IOException {
     	log.info("DataLake feeder starting to pull data from DMaaP...");
-    	pullService.start();
-        return "DataLake feeder is running.";
+    	if(pullService.isRunning() == false) {
+            pullService.start();
+        }
+        return "{\"running\": true}";
     }
 
     /**
      * @return message that application stop process is triggered
      */
-    @GetMapping("/stop")
+    @PostMapping("/stop")
+    @ResponseBody
 	@ApiOperation(value="Stop pulling data.")
-    public String stop() {    	
-    	pullService.shutdown();
+    public String stop() {
+        if(pullService.isRunning() == true)
+        {
+            pullService.shutdown();
+        }
     	log.info("DataLake feeder is stopped.");
-    	return "DataLake feeder is stopped.";
+    	return "{\"running\": false}";
     }
     /**
      * @return feeder status
@@ -77,7 +86,8 @@
 	@ApiOperation(value="Retrieve feeder status.")
     public String status() {    	
     	String status = "Feeder is running: "+pullService.isRunning();
-    	log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc. 
-    	return status;
-    }    
+    	log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
+
+        return "{\"version\": \""+config.getDatalakeVersion()+"\", \"running\": "+pullService.isRunning()+"}";
+    }
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
index 1154b3a..3a07e2f 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
@@ -155,7 +155,6 @@
 	public void shutdown() {
 		active.set(false);
 		consumer.wakeup();
-		consumer.unsubscribe();
 	}
 
 	private class DummyRebalanceListener implements ConsumerRebalanceListener {
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index b9d6b9e..10ad9f8 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -54,4 +54,7 @@
 logging.level.org.springframework.web=ERROR
 logging.level.com.att.nsa.apiClient.http=ERROR
 logging.level.org.onap.datalake=DEBUG
+
+#####################Verison
+DatalakeVersion=0.0.1
  
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java
index 713d8b1..7d0b4ee 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/FeederControllerTest.java
@@ -94,7 +94,7 @@
         ConsumerRecords<String, String> records = ConsumerRecords.empty();
         when(kafkaConsumer.poll(2)).thenReturn(records);
         String start = feederController.start();
-        assertEquals("DataLake feeder is running.", start);
+        assertEquals("{\"running\": true}", start);
     }
 
     @Test
@@ -102,14 +102,17 @@
         FeederController feederController = new FeederController();
         setAccessPrivateFields(feederController);
         String stop = feederController.stop();
-        assertEquals("DataLake feeder is stopped.", stop);
+        assertEquals("{\"running\": false}", stop);
     }
 
     @Test
     public void testStatus() throws NoSuchFieldException, IllegalAccessException {
+        ApplicationConfiguration conf = new ApplicationConfiguration();
+        conf.setDatalakeVersion("0.0.1");
         FeederController feederController = new FeederController();
+        feederController.config = conf;
         setAccessPrivateFields(feederController);
         String status = feederController.status();
-        assertEquals("Feeder is running: false", status);
+        assertEquals("{\"version\": \"0.0.1\", \"running\": false}", status);
     }
 }