Fix NPE issue in msgrt

Fix NullPointerException related sonar issue

Issue-ID: DMAAP-687
Change-Id: I6a2f32398fdef79d0ea5c1abbbe8b3b803873594
Signed-off-by: Parshad Patel <pars.patel@samsung.com>
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index e4e09c8..f60fd53 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -185,9 +185,11 @@
 
 					log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
 							+ "], on topic [" + topic + "].");
-
-					fCache.signalOwnership(topic, consumerGroupName, consumerId);
-
+					
+					if (fCache != null) {
+						fCache.signalOwnership(topic, consumerGroupName, consumerId);
+					}
+					
 					final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
 					long fCreateTimeMs = System.currentTimeMillis();
 					KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
diff --git a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
index 4c38d57..ddd834c 100644
--- a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
+++ b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
@@ -218,7 +218,7 @@
 	/**
 	 * interface used to define write method for outputStream
 	 */
-	public static abstract interface StreamWriter {
+	public abstract static interface StreamWriter {
 		/**
 		 * abstract method used to write the response
 		 * 
@@ -252,27 +252,20 @@
 		
 
 		boolean fResponseEntityAllowed = (!(ctx.getRequest().getMethod().equalsIgnoreCase("HEAD")));
-
-		OutputStream os = null;
-		try{
+		
 		if (fResponseEntityAllowed) {
-			os = ctx.getResponse().getOutputStream();
-			return os;
+			try(OutputStream os = ctx.getResponse().getOutputStream()){
+				return os;
+			}catch (Exception e){
+				log.error("Exception in getStreamForBinaryResponse",e);
+				throw new IOException();
+			}
 		} else {
-			os = new NullStream();
-			return os;
-		}
-		}catch (Exception e){
-			throw new IOException();
-			
-		}
-		finally{
-			if(null != os){
-				try{
-					os.close();
-				}catch(Exception e) {
-					 
-				}
+			try(OutputStream os = new NullStream()){
+				return os;
+			}catch (Exception e){
+				log.error("Exception in getStreamForBinaryResponse",e);
+				throw new IOException();
 			}
 		}
 	}