Sync Integ to Master

Change-Id: I71e3acc26fa612127756ac04073a522b9cc6cd74
Issue-ID: SDC-977
Signed-off-by: Gitelman, Tal (tg851x) <tg851x@intl.att.com>
diff --git a/utils/DmaapPublisher/pom.xml b/utils/DmaapPublisher/pom.xml
new file mode 100644
index 0000000..6152fdf
--- /dev/null
+++ b/utils/DmaapPublisher/pom.xml
@@ -0,0 +1,131 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<groupId>org.openecomp.sdc</groupId>
+	<artifactId>dmaap-publisher</artifactId>
+	<version>1.0.0</version>
+
+
+	<properties>
+		<fasterxml.jackson.version>2.8.6</fasterxml.jackson.version>
+	</properties>
+
+	<dependencies>
+		<!--spock testing-->
+		<dependency>
+			<groupId>org.spockframework</groupId>
+			<artifactId>spock-core</artifactId>
+			<version>1.1-groovy-2.4</version>
+			<scope>test</scope>
+		</dependency>
+		<!--groovy-->
+		<dependency>
+			<groupId>org.codehaus.groovy</groupId>
+			<artifactId>groovy</artifactId>
+			<version>2.4.11</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-lang3</artifactId>
+			<version>3.7</version>
+		</dependency>
+		<dependency>
+			<groupId>com.att.nsa</groupId>
+			<artifactId>dmaapClient</artifactId>
+			<version>0.2.16</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>1.7.25</version>
+		</dependency>
+		<dependency>
+			<groupId>args4j</groupId>
+			<artifactId>args4j</artifactId>
+			<version>2.33</version>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>22.0</version>
+		</dependency>
+		<!-- https://mvnrepository.com/artifact/org.yaml/snakeyaml -->
+		<dependency>
+			<groupId>org.yaml</groupId>
+			<artifactId>snakeyaml</artifactId>
+			<version>1.18</version>
+		</dependency>
+
+
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.12</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-core</artifactId>
+			<version>2.8.47</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.assertj</groupId>
+			<artifactId>assertj-core</artifactId>
+			<version>3.8.0</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.6.1</version>
+				<configuration>
+					<source>1.8</source>
+					<target>1.8</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<configuration>
+					<createDependencyReducedPom>true</createDependencyReducedPom>
+					<filters>
+						<filter>
+							<artifact>*:*</artifact>
+							<excludes>
+								<exclude>META-INF/*.SF</exclude>
+								<exclude>META-INF/*.DSA</exclude>
+								<exclude>META-INF/*.RSA</exclude>
+							</excludes>
+						</filter>
+					</filters>
+				</configuration>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<transformers>
+								<transformer
+									implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+								<transformer
+									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.openecomp.sdc.dmaap.DmaapPublisher</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+		</plugins>
+	</build>
+</project>
\ No newline at end of file
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java
new file mode 100644
index 0000000..38e53c8
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java
@@ -0,0 +1,69 @@
+package org.openecomp.sdc.dmaap;
+
+import org.kohsuke.args4j.Option;
+import com.google.common.base.MoreObjects;
+
+/**
+ * Command-line arguments of the DMaaP publisher tool. The fields are populated by args4j
+ * through their {@link Option} annotations, or programmatically through the setters.
+ */
+public class CliArgs {
+
+    @Option(name="yml",aliases = {"-YML","YML","-yml","-YAML","YAML","-yaml"}, usage="mandatory arg. YAML filename", required=true)
+    private String yamlFilename;
+
+    @Option(name="path",aliases = {"-path","PATH","-PATH"}, usage="mandatory arg. path to the yaml file which contains topic config (publisher data + messages)", required=true)
+    private String yamlPath;
+
+    @Option(name="cr",aliases = {"CR","-cr","-CR"}, usage="optional arg. concurrent requests", required=false)
+    private String concurrentRequests;
+
+    @Option(name="notification",aliases = {"NOTIFICATION","-NOTIFICATION","-notification"}, usage="optional load dynamic messages", required=false)
+    private String notificationData;
+
+    /** @return the path part of the YAML file location */
+    public String getYamlPath() {
+        return yamlPath;
+    }
+
+    public void setYamlPath(String yamlPath) {
+        this.yamlPath = yamlPath;
+    }
+
+    /** @return the YAML file name */
+    public String getYamlFilename() {
+        return yamlFilename;
+    }
+
+    public void setYamlFilename(String yamlFilename) {
+        this.yamlFilename = yamlFilename;
+    }
+
+    /** @return the raw (unparsed) concurrent-request count, or {@code null} when not given */
+    public String getConcurrentRequests() {
+        return concurrentRequests;
+    }
+
+    public void setConcurrentRequests(String concurrentRequests) {
+        this.concurrentRequests = concurrentRequests;
+    }
+
+    /** @return the optional notification message, or {@code null} when not given */
+    public String getNotificationData() {
+        return notificationData;
+    }
+
+    /**
+     * @return a description listing every argument, including the YAML file name and the
+     *         notification data
+     */
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("yamlPath", yamlPath)
+                .add("concurrentRequests", concurrentRequests)
+                .add("yamlFilename", yamlFilename)
+                .add("notificationData", notificationData)
+                .toString();
+    }
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java
new file mode 100644
index 0000000..61e48fa
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java
@@ -0,0 +1,155 @@
+package org.openecomp.sdc.dmaap;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRPublisher.message;
+
+/**
+ * Loads a {@link TopicConfig} from a YAML file and publishes its incoming messages through
+ * a batching publisher created from the configured properties file.
+ */
+public class DmaapPublishTool {
+
+    private static final Logger logger = LoggerFactory.getLogger(DmaapPublishTool.class);
+    private final TopicConfig topicConfig;
+
+    /**
+     * Loads the topic configuration without any extra notification.
+     *
+     * @param yamlPath full path of the YAML configuration file
+     * @throws FileNotFoundException if the file cannot be opened
+     */
+    public DmaapPublishTool(String yamlPath) throws FileNotFoundException {
+        this(yamlPath, null);
+    }
+
+    /**
+     * Loads the topic configuration and optionally queues one notification.
+     *
+     * @param yamlPath      full path of the YAML configuration file
+     * @param notifications message to queue for publishing; ignored when blank
+     * @throws FileNotFoundException if the file cannot be opened
+     */
+    public DmaapPublishTool(String yamlPath , String notifications) throws FileNotFoundException {
+        topicConfig = loadTopicConfig(yamlPath);
+        if (StringUtils.isNotBlank(notifications)) {
+            topicConfig.add(notifications);
+        }
+        System.out.println("yaml file loaded.");
+    }
+
+    /**
+     * Queues additional messages for publishing.
+     *
+     * @param notification messages to add to the incoming messages of the topic configuration
+     */
+    public void addNotifications(Collection<String> notification){
+        topicConfig.addAll( notification );
+    }
+
+    /**
+     * Null-safe view of a collection.
+     *
+     * @return {@code obj}, or an empty set when {@code obj} is {@code null}
+     */
+    public <T> Collection<T> safe(Collection<T> obj){
+        return Optional.ofNullable(obj).orElse(Collections.emptySet());
+    }
+
+    /**
+     * Null-safe view of a list.
+     *
+     * @return {@code obj}, or an empty list when {@code obj} is {@code null}
+     */
+    public <T> List<T> safe(List<T> obj){
+        return Optional.ofNullable(obj).orElse(Collections.emptyList());
+    }
+
+    /**
+     * Creates a publisher, sends every incoming message of the topic configuration and closes
+     * the publisher. The publisher is closed even when sending fails.
+     *
+     * @param path directory that contains the publisher properties file
+     * @throws IOException          if creating the publisher, sending or closing fails
+     * @throws InterruptedException if the thread is interrupted while sending or closing
+     */
+    public void publish(String path) throws IOException, InterruptedException {
+        MRBatchingPublisher pub = createPublisher( topicConfig, path );
+        try {
+            System.out.println( "pending message count -> "+pub.getPendingMessageCount() );
+            // NOTE(review): only the incoming messages are sent; TopicConfig#getTopicMessages()
+            // (the messages listed in the YAML file) is not read here - confirm this is intended.
+            for (String msg : safe(topicConfig.getIncomingTopicMessages())) {
+                publishOne( pub , msg );
+            }
+        } finally {
+            closePublisher(pub);
+        }
+    }
+
+    private MRBatchingPublisher createPublisher(TopicConfig topicConfig,String path) throws IOException {
+        String propertiesFile = Util.toPath(path, topicConfig.getPublisherPropertiesFilePath());
+        MRBatchingPublisher publisher = MRClientFactory.createBatchingPublisher(Objects.requireNonNull(propertiesFile));
+        System.out.println("publisher created.");
+        return publisher;
+    }
+
+    /**
+     * Reads the YAML file into a {@link TopicConfig}; the file stream is always closed.
+     *
+     * @throws FileNotFoundException if the file cannot be opened
+     * @throws NullPointerException  if {@code yamlPath} is {@code null} or the loaded
+     *                               configuration is {@code null}
+     */
+    private TopicConfig loadTopicConfig(String yamlPath) throws FileNotFoundException {
+        File yamlFile = new File(Objects.requireNonNull(yamlPath));
+        try (InputStream input = new FileInputStream(yamlFile)) {
+            TopicConfig loaded = new Yaml().loadAs(input, TopicConfig.class);
+            return Objects.requireNonNull(loaded, "no topic configuration found in " + yamlPath);
+        } catch (FileNotFoundException e) {
+            throw e;
+        } catch (IOException e) {
+            // Raised by the implicit close(); kept unchecked so the declared exceptions of
+            // the constructors stay unchanged.
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void publishOne(MRBatchingPublisher pub, String msg) throws IOException, InterruptedException {
+        System.out.println("sending:    " + msg);
+        pub.send(msg);
+        System.out.println("message sent.");
+    }
+
+    private void closePublisher(MRBatchingPublisher pub) throws IOException, InterruptedException {
+        System.out.println("closing publisher...");
+        // close the publisher to make sure everything's sent before exiting. The batching
+        // publisher interface allows the app to get the set of unsent messages. It could
+        // write them to disk, for example, to try to send them later.
+        final List<message> stuck = pub.close(20, TimeUnit.SECONDS);
+        if (!stuck.isEmpty()) {
+            final String errMsg = stuck.size() + " messages unsent";
+            logger.error(errMsg);
+            System.err.println(errMsg);
+        } else {
+            final String successMsg = "Clean exit; all messages sent.";
+            logger.info(successMsg);
+            System.out.println(successMsg);
+        }
+    }
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java
new file mode 100644
index 0000000..fd55835
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java
@@ -0,0 +1,193 @@
+package org.openecomp.sdc.dmaap;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.OptionHandlerFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+
+import static org.openecomp.sdc.dmaap.Util.*;
+
+/**
+ * Command-line entry point and programmatic facade of the DMaaP publisher tool.
+ */
+public class DmaapPublisher {
+    private static final Logger logger = LoggerFactory.getLogger(DmaapPublisher.class);
+    private static RequestManager requestManager ;
+    /** Notifications queued through {@link #add} / {@link #addAll} until the next publish. */
+    private static final ConcurrentLinkedDeque<String> notificationBuffer = new ConcurrentLinkedDeque<>();
+
+    /** Tickets of the publish tasks that were submitted and have not reported back yet. */
+    private static final List<Long> registeredTasks = new CopyOnWriteArrayList<>();
+
+    private DmaapPublisher() {}
+
+    /** Queues one notification for the next publish. */
+    public static void add(String notification){
+        notificationBuffer.add( notification );
+    }
+
+    /** Queues several notifications for the next publish. */
+    public static void addAll(List<String> notifications){
+        notificationBuffer.addAll( notifications );
+    }
+
+    public static void main(String[] args) {
+        doPublish(args);
+    }
+
+    /** Parses the raw arguments and publishes; prints the usage when parsing fails. */
+    private static void doPublish( String[] args ) {
+        CliArgs cliArgs = new CliArgs();
+        CmdLineParser parser = new CmdLineParser(cliArgs);
+
+        try {
+            // parse the arguments.
+            parser.parseArgument( args );
+            doPublish( cliArgs );
+        }
+        catch(CmdLineException e) {
+            logger.error("#doPublish - failed to parse arguments.", e);
+            printUsage(parser, e);
+        }
+    }
+
+    /**
+     * Loads the topic configuration, hands it the buffered notifications and submits one
+     * publish task per requested concurrent request. Failures are logged, not thrown.
+     *
+     * @param cliArgs YAML location, optional notification and optional concurrent-request count
+     */
+    public static void doPublish( CliArgs cliArgs ){
+        try {
+            DmaapPublishTool tool = new DmaapPublishTool( toPath(cliArgs.getYamlPath() , cliArgs.getYamlFilename()) , cliArgs.getNotificationData()  );
+            tool.addNotifications( drainNotificationBuffer() );
+            int concurrentRequestCount = 1;
+            if ( StringUtils.isNotBlank( cliArgs.getConcurrentRequests() ) )
+                concurrentRequestCount = Integer.parseInt( cliArgs.getConcurrentRequests() );
+            if ( concurrentRequestCount < 1 )
+                logger.warn("#doPublish - concurrent request count is {}, no publish task will be started.", concurrentRequestCount);
+            requestManager = new RequestManager( concurrentRequestCount );
+
+            final Executor executor = requestManager.getExecutor();
+            try {
+                for ( int i = 0; i < concurrentRequestCount; i++ ) {
+                    //region -  report upon finish mechanism
+                    final Long ticket = System.nanoTime();
+                    registeredTasks.add( ticket );
+                    Consumer<Long> callback = ( uniqueTicket ) -> {
+                        synchronized ( registeredTasks ){
+                            // Long argument: selects List.remove(Object), never remove(int index)
+                            registeredTasks.remove( uniqueTicket );
+                            registeredTasks.notifyAll();
+                        }};
+
+                    executor.execute( new RunnableReporter( ticket , tool , cliArgs , callback ) );
+                }
+            }
+            finally {
+                // A thread pool with non-daemon workers keeps the JVM alive after the tasks are
+                // done unless it is shut down. Tasks already submitted still run to completion.
+                if ( executor instanceof ExecutorService )
+                    ((ExecutorService) executor).shutdown();
+            }
+        }
+        catch(NumberFormatException e) {
+            logger.error("#doPublish - failed to parse argument CR.", e);
+        }
+        catch(Exception e) {
+            logger.error("#doPublish - failed to publish.", e);
+        }
+    }
+
+    /**
+     * Removes and returns the notifications currently buffered. Elements are polled one by
+     * one, so a notification added concurrently is either returned here or left in the buffer.
+     */
+    private static Collection<String> drainNotificationBuffer() {
+        Collection<String> drained = new ArrayList<>();
+        String notification;
+        while ( (notification = notificationBuffer.poll()) != null )
+            drained.add( notification );
+        return drained;
+    }
+
+    /** Publish task that reports its ticket to a callback once it has finished. */
+    public static class RunnableReporter implements Runnable{
+
+            private final long ticket ;
+            private final DmaapPublishTool tool;
+            private final CliArgs cliArgs;
+            final Consumer<? super Long> reporter;
+
+            public RunnableReporter(final long ticket , final DmaapPublishTool tool , final CliArgs args ,  Consumer<? super Long> reporter){
+                this.ticket = ticket ;
+                this.tool = tool ;
+                this.cliArgs = args ;
+                this.reporter = reporter;
+            }
+
+            @Override
+            public void run() {
+                try {
+                    tool.publish( cliArgs.getYamlPath() );
+                }catch(IOException e){
+                    logger.error("#doPublish - failed to publish.", e);
+                }catch(InterruptedException e){
+                    logger.error("#doPublish - cannot complete publish, thread interuppted.", e);
+                    Thread.currentThread().interrupt();
+                }finally{
+                    // report on failure as well, otherwise the ticket stays registered forever
+                    reporter.accept(ticket);
+                }
+            }
+    }
+
+
+    /** @return the live list of tickets of tasks that have not reported back yet */
+    public static List<Long> getRegisteredTasks() {
+        return registeredTasks;
+    }
+
+    /**
+     * Builds the arguments programmatically and publishes. Blank values and a
+     * concurrent-request value that is not a number are left unset.
+     */
+    public static void preparePublish( String path,  String filename , String concurrentRequests ){
+
+            CliArgs cliArgs = new CliArgs();
+            if ( StringUtils.isNotBlank( filename ) )
+                cliArgs.setYamlFilename( filename );
+            if ( StringUtils.isNotBlank( path ) )
+                cliArgs.setYamlPath( path );
+            if ( NumberUtils.isCreatable( concurrentRequests ) )
+                cliArgs.setConcurrentRequests(  concurrentRequests );
+
+            doPublish( cliArgs );
+
+    }
+
+
+    private static void printUsage(CmdLineParser parser, CmdLineException e) {
+        System.err.println( e.getMessage() );
+        System.err.println("java DmaapPublisher [options...] arguments...");
+        // print the list of available options
+        parser.printUsage(System.err);
+        System.err.println();
+        // print option sample. This is useful some time
+        System.err.println("  Example: java DmaapPublisher " + parser.printExample(OptionHandlerFilter.ALL));
+    }
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java
new file mode 100644
index 0000000..597baac
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java
@@ -0,0 +1,33 @@
+package org.openecomp.sdc.dmaap;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Owns the thread pool on which publish requests are executed.
+ */
+public class RequestManager {
+
+    private Executor executor;
+
+    /**
+     * Creates a fixed-size thread pool.
+     *
+     * @param poolSize requested number of worker threads; values below 1 are raised to 1 and
+     *                 values above twice the number of available processors are lowered to
+     *                 that limit
+     */
+    public RequestManager(int poolSize ){
+        final int upperBound = 2 * Runtime.getRuntime().availableProcessors();
+        final int requested = poolSize < 1 ? 1 : poolSize;
+        final int threads = requested > upperBound ? upperBound : requested;
+        this.executor = Executors.newFixedThreadPool(threads);
+    }
+
+    /**
+     * @return the executor backed by the pool created in the constructor
+     */
+    public Executor getExecutor() {
+        return this.executor;
+    }
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java
new file mode 100644
index 0000000..a5b43ad
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java
@@ -0,0 +1,79 @@
+package org.openecomp.sdc.dmaap;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Topic configuration bean: the publisher properties file path, the messages listed in
+ * the configuration file, and the messages added at runtime.
+ */
+public class TopicConfig {
+
+    /** Path of the publisher properties file, as set through the setter. */
+    private String publisherPropertiesFilePath;
+    /** Messages from file. */
+    private String[] topicMessages;
+    /** Incoming messages from network stream|Main, kept in a synchronized list. */
+    private final List<String> incomingTopicMessages = Collections.synchronizedList( new ArrayList<String>() );
+
+    public String getPublisherPropertiesFilePath() {
+        return this.publisherPropertiesFilePath;
+    }
+
+    public void setPublisherPropertiesFilePath(String publisherPropertiesFilePath) {
+        this.publisherPropertiesFilePath = publisherPropertiesFilePath;
+    }
+
+    public String[] getTopicMessages() {
+        return this.topicMessages;
+    }
+
+    public void setTopicMessages(String[] topicMessages) {
+        this.topicMessages = topicMessages;
+    }
+
+    /**
+     * @return the live (not copied) list of messages added through {@link #add} and
+     *         {@link #addAll}
+     */
+    public List<String> getIncomingTopicMessages() {
+        return this.incomingTopicMessages;
+    }
+
+    /**
+     * Appends one incoming message.
+     *
+     * @param notifications the message to append
+     * @return this instance, to allow chaining
+     */
+    public TopicConfig add( String notifications ){
+        this.incomingTopicMessages.add(notifications);
+        return this;
+    }
+
+    /**
+     * Appends every message of the given collection.
+     *
+     * @param notifications the messages to append
+     * @return this instance, to allow chaining
+     */
+    public TopicConfig addAll( Collection<String> notifications ){
+        this.incomingTopicMessages.addAll(notifications);
+        return this;
+    }
+
+    /**
+     * @return a description containing the properties file path and the file messages
+     */
+    @Override
+    public String toString() {
+        final MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(this);
+        description.add("publisherPropertiesFilePath", publisherPropertiesFilePath);
+        description.add("topicMessages", topicMessages);
+        return description.toString();
+    }
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java
new file mode 100644
index 0000000..491b07a
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java
@@ -0,0 +1,38 @@
+package org.openecomp.sdc.dmaap;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.file.InvalidPathException;
+
+/**
+ * Small path helpers shared by the DMaaP publisher command-line tool.
+ */
+public class Util {
+
+    /**
+     * Joins a directory path and a file name into a single path string.
+     *
+     * <p>A {@code "/"} separator is inserted unless the trimmed path already ends with a
+     * forward slash or a backslash. A {@code null} file name is treated as an empty string,
+     * so the directory (with a trailing separator) is returned.
+     *
+     * @param path     directory part; must not be {@code null} or blank
+     * @param filename file part; may be {@code null}
+     * @return {@code path} followed by a separator (if needed) and {@code filename}
+     * @throws InvalidPathException if {@code path} is {@code null} or blank
+     */
+    public static String toPath(String path , String filename) throws InvalidPathException{
+        if (StringUtils.isBlank(path)) {
+            // InvalidPathException(input, reason) rejects null arguments with a
+            // NullPointerException, so the (possibly null) path is stringified first.
+            throw new InvalidPathException(String.valueOf(path), "wrong path configuration, cannot find path");
+        }
+        final String name = filename != null ? filename : "";
+        final String trimmed = path.trim();
+        // Accept both Unix and Windows style trailing separators.
+        if (trimmed.endsWith("/") || trimmed.endsWith("\\")) {
+            return path + name;
+        }
+        return path + "/" + name;
+    }
+}
diff --git a/utils/DmaapPublisher/src/main/resources/catalogMgmt.properties b/utils/DmaapPublisher/src/main/resources/catalogMgmt.properties
new file mode 100644
index 0000000..ff739f1
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/resources/catalogMgmt.properties
@@ -0,0 +1,35 @@
+TransportType=DME2
+Latitude =32.109333
+Longitude =34.855499
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.att.com/events
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =https
+MethodType =POST
+username = m09875@sdc.att.com
+password =Aa123456
+contenttype = application/json
+Authorization = Basic bTEzMzMxQGNjZC5hdHQuY29tOkFhMTIzNDU2
+authKey=
+authDate=
+# DMaaP server URL: port 3904 is HTTP, port 3905 is HTTPS
+host=olsd004.wnsnet.attws.com:3905
+###topic=com.att.ccd.CCD-CatalogManagement-go539p or  com.att.sdc.SDCforTestDev | com.att.sdc.23911-SDCforTestDev-v001
+#com.att.sdc.23911-scdc001dev001test-v1
+topic=com.att.sdc.23911-SDCforTestDev-v001
+partition=1
+maxBatchSize=100
+maxAgeMs=250
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=src/main/resources/preferredRouter.txt
+MessageSentThreadOccurance=50
\ No newline at end of file
diff --git a/utils/DmaapPublisher/src/main/resources/catalogMgmt.yaml b/utils/DmaapPublisher/src/main/resources/catalogMgmt.yaml
new file mode 100644
index 0000000..04ac9fb
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/resources/catalogMgmt.yaml
@@ -0,0 +1,7 @@
+publisherPropertiesFilePath: "catalogMgmt.properties"
+topicMessages:
+  - "{\"operationalEnvironmentId\": \"1234\",\"operationalEnvironmentName\":\"Op Env Name\",\"operationalEnvironmentType\":\"ECOMP\",\"tenantContext\":\"Test\",\"workloadContext\":\"VNF_E2E-IST\",\"action\":\"CREATE\"}"
+  #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"product\"],\"republish\":\"No\"}"
+  #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"logo\"],\"republish\":\"No\"}"
+  #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"availabilitymatrix\"],\"republish\":\"No\"}"
+  #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"product\",\"availabilitymatrix\"],\"republish\":\"No\"}"
\ No newline at end of file
diff --git a/utils/DmaapPublisher/src/main/resources/catalogWfMgmt.properties b/utils/DmaapPublisher/src/main/resources/catalogWfMgmt.properties
new file mode 100644
index 0000000..119c94e
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/resources/catalogWfMgmt.properties
@@ -0,0 +1,32 @@
+TransportType=DME2
+Latitude =32.109333
+Longitude =34.855499
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.att.com/events
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =https
+MethodType =POST
+username =m13331@ccd.att.com 
+password =Aa123456
+contenttype = application/json
+authKey=
+authDate=
+host=olsd004.wnsnet.attws.com:3904
+###topic=com.att.ccd.CCD-CatalogWorkflowManagement-go539p-v1
+topic=com.att.ccd.CCD-CatalogWorkflowManagement-v1
+partition=1
+maxBatchSize=100
+maxAgeMs=250
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=resources/preferredRouter.txt
+MessageSentThreadOccurance=50
\ No newline at end of file
diff --git a/utils/DmaapPublisher/src/main/resources/catalogWfMgmt.yaml b/utils/DmaapPublisher/src/main/resources/catalogWfMgmt.yaml
new file mode 100644
index 0000000..da2ebd6
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/resources/catalogWfMgmt.yaml
@@ -0,0 +1,6 @@
+publisherPropertiesFilePath: "resources/catalogWfMgmt.properties"
+topicMessages:
+  - "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"action\":\"Activate\"}"
+  #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"action\":\"Rollback\"}"
+  #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"action\":\"Activate\"}"
+  #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"action\":\"Activate\"}"
\ No newline at end of file
diff --git a/utils/DmaapPublisher/src/main/resources/preferredRouter.txt b/utils/DmaapPublisher/src/main/resources/preferredRouter.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/resources/preferredRouter.txt
diff --git a/utils/DmaapPublisher/src/test/java/org/openecomp/sdc/dmaap/DmaapPublisherTest.java b/utils/DmaapPublisher/src/test/java/org/openecomp/sdc/dmaap/DmaapPublisherTest.java
new file mode 100644
index 0000000..19dbdea
--- /dev/null
+++ b/utils/DmaapPublisher/src/test/java/org/openecomp/sdc/dmaap/DmaapPublisherTest.java
@@ -0,0 +1,43 @@
+package org.openecomp.sdc.dmaap;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * Smoke test that runs the publisher end to end against the test resources.
+ */
+public class DmaapPublisherTest {
+
+    /** Upper bound for waiting on the asynchronous publish tasks. */
+    private static final long MAX_WAIT_SECONDS = 10;
+
+    @Test
+    public void main() throws Exception {
+        String absPath = new File("src/test/resources").getAbsolutePath();
+
+        String msg = "{\"operationalEnvironmentId\":\"12345\",\"operationalEnvironmentName\":\"Op_Env_Name\",\"operationalEnvironmentType\":\"ECOMP\",\"tenantContext\":\"Test\",\"workloadContext\":\"VNF_E2E-IST\",\"action\":\"CREATE\"}";
+        // Built as an array instead of splitting one string on spaces, so a resource path
+        // that contains spaces stays a single argument.
+        String[] args = { "-cr", "5", "-notification=" + msg, "-path", absPath, "-yaml", "catalogMgmtTest.yaml" };
+        DmaapPublisher.main( args );
+
+        // Publishing is asynchronous: wait until every task has reported back, but never
+        // longer than the fixed sleep this replaces.
+        final List<Long> pending = DmaapPublisher.getRegisteredTasks();
+        final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(MAX_WAIT_SECONDS);
+        synchronized ( pending ) {
+            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
+            while ( !pending.isEmpty() && remainingMs > 0 ) {
+                pending.wait( remainingMs );
+                remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/utils/DmaapPublisher/src/test/resources/catalogMgmtTest.properties b/utils/DmaapPublisher/src/test/resources/catalogMgmtTest.properties
new file mode 100644
index 0000000..7f92221
--- /dev/null
+++ b/utils/DmaapPublisher/src/test/resources/catalogMgmtTest.properties
@@ -0,0 +1,34 @@
+TransportType=DME2
+Latitude =32.109333
+Longitude =34.855499
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.att.com/events
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =https
+MethodType =POST
+username = m09875@sdc.att.com
+password =Aa123456
+contenttype = application/json
+Authorization = Basic bTEzMzMxQGNjZC5hdHQuY29tOkFhMTIzNDU2
+authKey=
+authDate=
+# DMaaP server URL: port 3904 is HTTP, port 3905 is HTTPS
+host=olsd004.wnsnet.attws.com:3905
+###topic=com.att.ccd.CCD-CatalogManagement-go539p or  com.att.sdc.SDCforTestDev | com.att.sdc.23911-SDCforTestDev-v001
+topic=com.att.sdc.23911-SDCforTestDev-v001
+partition=1
+maxBatchSize=100
+maxAgeMs=250
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=src/test/resources/preferredRouter.txt
+MessageSentThreadOccurance=50
\ No newline at end of file
diff --git a/utils/DmaapPublisher/src/test/resources/catalogMgmtTest.yaml b/utils/DmaapPublisher/src/test/resources/catalogMgmtTest.yaml
new file mode 100644
index 0000000..f55641d
--- /dev/null
+++ b/utils/DmaapPublisher/src/test/resources/catalogMgmtTest.yaml
@@ -0,0 +1,7 @@
+publisherPropertiesFilePath: "catalogMgmtTest.properties"
+topicMessages:
+
+  - "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"product\"],\"republish\":\"No\"}"
+  #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"logo\"],\"republish\":\"No\"}"
+  #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"availabilitymatrix\"],\"republish\":\"No\"}"
+  #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"product\",\"availabilitymatrix\"],\"republish\":\"No\"}"
\ No newline at end of file
diff --git a/utils/DmaapPublisher/src/test/resources/preferredRouter.txt b/utils/DmaapPublisher/src/test/resources/preferredRouter.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/utils/DmaapPublisher/src/test/resources/preferredRouter.txt