Merge "Re-implement Kafka tests that periodically fail"
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index dfb1261..be1b943 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -164,7 +164,7 @@
         while (consumerThread.isAlive() && !stopOrderedFlag) {
             try {
                 final ConsumerRecords<String, String> records =
-                        kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollTime());
+                        kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
                 for (final ConsumerRecord<String, String> record : records) {
                     if (LOGGER.isTraceEnabled()) {
                         LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 9d7cc77..7c24ce1 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -20,6 +20,7 @@
 
 package org.onap.policy.apex.plugins.event.carrier.kafka;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Properties;
@@ -263,6 +264,14 @@
     }
 
     /**
+     * Gets the consumer poll duration.
+     * @return The poll duration
+     */
+    public Duration getConsumerPollDuration() {
+        return Duration.ofMillis(consumerPollTime);
+    }
+
+    /**
      * Gets the consumer topic list.
      *
      * @return the consumer topic list
diff --git a/pom.xml b/pom.xml
index 18fce71..cbe0044 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,7 +41,7 @@
         <file.encoding>UTF-8</file.encoding>
         <version.derby>10.13.1.1</version.derby>
         <version.commons-cli>1.4</version.commons-cli>
-        <version.kafka>1.1.1</version.kafka>
+        <version.kafka>2.0.0</version.kafka>
         <version.jersey>2.26</version.jersey>
         <version.eclipselink>2.6.5</version.eclipselink>
         <version.hibernate>5.3.6.Final</version.hibernate>
diff --git a/testsuites/integration/integration-uservice-test/pom.xml b/testsuites/integration/integration-uservice-test/pom.xml
index 52a54c2..6466895 100644
--- a/testsuites/integration/integration-uservice-test/pom.xml
+++ b/testsuites/integration/integration-uservice-test/pom.xml
@@ -142,41 +142,15 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
+            <artifactId>kafka_2.12</artifactId>
             <version>${version.kafka}</version>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
-            <version>${version.kafka}</version>
-            <classifier>test</classifier>
+            <groupId>com.salesforce.kafka.test</groupId>
+            <artifactId>kafka-junit4</artifactId>
+            <version>3.0.1</version>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${version.kafka}</version>
-            <classifier>test</classifier>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.activemq.tooling</groupId>
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
index 072d678..63da4e6 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
@@ -20,12 +20,13 @@
 
 package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
 
-import java.util.Properties;
+import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
+
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
 import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGenerator;
 
@@ -36,7 +37,7 @@
  */
 public class KafkaEventProducer implements Runnable {
     private final String topic;
-    private final String kafkaServerAddress;
+    private final SharedKafkaTestResource sharedKafkaTestResource;
     private final int eventCount;
     private final boolean xmlEvents;
     private final long eventInterval;
@@ -50,15 +51,15 @@
      * Instantiates a new kafka event producer.
      *
      * @param topic the topic
-     * @param kafkaServerAddress the kafka server address
+     * @param sharedKafkaTestResource the kafka server address
      * @param eventCount the event count
      * @param xmlEvents the xml events
      * @param eventInterval the event interval
      */
-    public KafkaEventProducer(final String topic, final String kafkaServerAddress, final int eventCount,
-                    final boolean xmlEvents, final long eventInterval) {
+    public KafkaEventProducer(final String topic, final SharedKafkaTestResource sharedKafkaTestResource,
+                    final int eventCount, final boolean xmlEvents, final long eventInterval) {
         this.topic = topic;
-        this.kafkaServerAddress = kafkaServerAddress;
+        this.sharedKafkaTestResource = sharedKafkaTestResource;
         this.eventCount = eventCount;
         this.xmlEvents = xmlEvents;
         this.eventInterval = eventInterval;
@@ -67,22 +68,15 @@
         producerThread.start();
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see java.lang.Runnable#run()
      */
     @Override
     public void run() {
-        final Properties kafkaProducerProperties = new Properties();
-        kafkaProducerProperties.put("bootstrap.servers", kafkaServerAddress);
-        kafkaProducerProperties.put("acks", "all");
-        kafkaProducerProperties.put("retries", 0);
-        kafkaProducerProperties.put("batch.size", 16384);
-        kafkaProducerProperties.put("linger.ms", 1);
-        kafkaProducerProperties.put("buffer.memory", 33554432);
-        kafkaProducerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        kafkaProducerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-        final Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProducerProperties);
+        final Producer<String, String> producer = sharedKafkaTestResource.getKafkaTestUtils()
+                        .getKafkaProducer(StringSerializer.class, StringSerializer.class);
 
         while (producerThread.isAlive() && !stopFlag) {
             ThreadUtilities.sleep(50);
@@ -109,8 +103,8 @@
      * @param producer the producer
      */
     private void sendEventsToTopic(final Producer<String, String> producer) {
-        System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sending events to Kafka server at "
-                        + kafkaServerAddress + ", event count " + eventCount + ", xmlEvents " + xmlEvents);
+        System.out.println(KafkaEventProducer.class.getCanonicalName()
+                        + ": sending events to Kafka server , event count " + eventCount + ", xmlEvents " + xmlEvents);
 
         for (int i = 0; i < eventCount; i++) {
             System.out.println(KafkaEventProducer.class.getCanonicalName() + ": waiting " + eventInterval
@@ -126,7 +120,7 @@
             producer.send(new ProducerRecord<String, String>(topic, "Event" + i + "Of" + eventCount, eventString));
             producer.flush();
             eventsSentCount++;
-            System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sent event " + eventString);
+            System.out.println("****** Sent event No. " + eventsSentCount + " ******");
         }
         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": completed");
     }
@@ -154,48 +148,4 @@
 
         System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopped");
     }
-
-    /**
-     * The main method.
-     *
-     * @param args the arguments
-     */
-    public static void main(final String[] args) {
-        if (args.length != 5) {
-            System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
-            return;
-        }
-
-        int eventCount = 0;
-        try {
-            eventCount = Integer.parseInt(args[2]);
-        } catch (final Exception e) {
-            System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
-            e.printStackTrace();
-            return;
-        }
-
-        long eventInterval = 0;
-        try {
-            eventInterval = Long.parseLong(args[4]);
-        } catch (final Exception e) {
-            System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
-            e.printStackTrace();
-            return;
-        }
-
-        boolean xmlEvents = false;
-        if (args[3].equalsIgnoreCase("XML")) {
-            xmlEvents = true;
-        } else if (!args[3].equalsIgnoreCase("JSON")) {
-            System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
-            return;
-        }
-
-        final KafkaEventProducer producer = new KafkaEventProducer(args[0], args[1], eventCount, xmlEvents,
-                        eventInterval);
-
-        producer.sendEvents();
-        producer.shutdown();
-    }
 }
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
index 4b6a62e..1c33289 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
@@ -20,12 +20,16 @@
 
 package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
 
+import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
+
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Properties;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
 
@@ -35,8 +39,9 @@
  * @author Liam Fallon (liam.fallon@ericsson.com)
  */
 public class KafkaEventSubscriber implements Runnable {
+    private static final Duration POLL_DURATION = Duration.ofMillis(100);
+
     private final String topic;
-    private final String kafkaServerAddress;
     private long eventsReceivedCount = 0;
 
     KafkaConsumer<String, String> consumer;
@@ -47,46 +52,44 @@
      * Instantiates a new kafka event subscriber.
      *
      * @param topic the topic
-     * @param kafkaServerAddress the kafka server address
+     * @param sharedKafkaTestResource the kafka server address
      * @throws MessagingException the messaging exception
      */
-    public KafkaEventSubscriber(final String topic, final String kafkaServerAddress) throws MessagingException {
+    public KafkaEventSubscriber(final String topic, final SharedKafkaTestResource sharedKafkaTestResource)
+                    throws MessagingException {
         this.topic = topic;
-        this.kafkaServerAddress = kafkaServerAddress;
 
-        final Properties props = new Properties();
-        props.put("bootstrap.servers", kafkaServerAddress);
-        props.put("group.id", "test");
-        props.put("enable.auto.commit", "true");
-        props.put("auto.commit.interval.ms", "1000");
-        props.put("session.timeout.ms", "30000");
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
-        consumer = new KafkaConsumer<String, String>(props);
+        final Properties consumerProperties = new Properties();
+        consumerProperties.put("group.id", "test");
+
+
+        consumer = sharedKafkaTestResource.getKafkaTestUtils().getKafkaConsumer(StringDeserializer.class,
+                        StringDeserializer.class, consumerProperties);
         consumer.subscribe(Arrays.asList(topic));
 
         subscriberThread = new Thread(this);
         subscriberThread.start();
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see java.lang.Runnable#run()
      */
     @Override
     public void run() {
-        System.out.println(KafkaEventSubscriber.class.getCanonicalName() + ": receiving events from Kafka server at "
-                + kafkaServerAddress + " on topic " + topic);
+        System.out.println(KafkaEventSubscriber.class.getCanonicalName()
+                        + ": receiving events from Kafka server  on topic " + topic);
 
         while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) {
             try {
-                final ConsumerRecords<String, String> records = consumer.poll(100);
+                final ConsumerRecords<String, String> records = consumer.poll(POLL_DURATION);
                 for (final ConsumerRecord<String, String> record : records) {
-                    System.out.println("******");
+                    eventsReceivedCount++;
+                    System.out.println("****** Received event No. " + eventsReceivedCount + " ******");
                     System.out.println("offset=" + record.offset());
                     System.out.println("key=" + record.key());
-                    System.out.println("name=" + record.value());
-                    eventsReceivedCount++;
                 }
             } catch (final Exception e) {
                 // Thread interrupted
@@ -119,19 +122,4 @@
         consumer.close();
         System.out.println(KafkaEventSubscriber.class.getCanonicalName() + ": stopped");
     }
-
-
-    /**
-     * The main method.
-     *
-     * @param args the arguments
-     * @throws MessagingException the messaging exception
-     */
-    public static void main(final String[] args) throws MessagingException {
-        if (args.length != 2) {
-            System.err.println("usage KafkaEventSubscriber topic kafkaServerAddress");
-            return;
-        }
-        new KafkaEventSubscriber(args[0], args[1]);
-    }
 }
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
index e70a597..6ab9105 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
@@ -21,104 +21,36 @@
 package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
+import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
+
+import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.util.Properties;
 
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.TestUtils;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+import org.onap.policy.apex.model.utilities.TextFileUtils;
 import org.onap.policy.apex.service.engine.main.ApexMain;
 
-
 /**
- * The Class TestKafka2Kafka.
+ * The Class TestKafka2Kafka tests Kafka event sending and reception.
  */
 public class TestKafka2Kafka {
-    // The method of starting an embedded Kafka server used in this example is based on the method
-    // on stack overflow at
-    // https://github.com/asmaier/mini-kafka
+    private static final long MAX_TEST_LENGTH = 60000;
 
-    private static final long MAX_TEST_LENGTH = 20000;
-
-    private static final int EVENT_COUNT = 10;
+    private static final int EVENT_COUNT = 100;
     private static final int EVENT_INTERVAL = 20;
 
-    private static final String ZKHOST = "127.0.0.1";
-    private static final String BROKERHOST = "127.0.0.1";
-    private static final String BROKERPORT = "39902";
-
-    private static EmbeddedZookeeper zkServer;
-    private static ZkClient zkClient;
-    private static KafkaServer kafkaServer;
-
-    /**
-     * Setup dummy kafka server.
-     *
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    @BeforeClass
-    public static void setupDummyKafkaServer() throws IOException {
-        // setup Zookeeper
-        zkServer = new EmbeddedZookeeper();
-        final String zkConnect = ZKHOST + ":" + zkServer.port();
-        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
-        final ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
-
-        // setup Broker
-        final Properties brokerProps = new Properties();
-        brokerProps.setProperty("zookeeper.connect", zkConnect);
-        brokerProps.setProperty("broker.id", "0");
-        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
-        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
-        brokerProps.setProperty("offsets.topic.replication.factor", "1");
-        brokerProps.setProperty("transaction.state.log.replication.factor", "1");
-        brokerProps.setProperty("transaction.state.log.min.isr", "1");
-        final KafkaConfig config = new KafkaConfig(brokerProps);
-        final Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);
-        kafkaServer.startup();
-        
-        // create topics
-        AdminUtils.createTopic(zkUtils, "apex-in-0", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
-        AdminUtils.createTopic(zkUtils, "apex-in-1", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
-        AdminUtils.createTopic(zkUtils, "apex-out", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
-        
-    }
-
-    /**
-     * Shutdown dummy kafka server.
-     *
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    @AfterClass
-    public static void shutdownDummyKafkaServer() throws IOException {
-        if (kafkaServer != null) {
-            kafkaServer.shutdown();
-        }
-        if (zkClient != null) {
-            zkClient.close();
-        }
-        if (zkServer != null) {
-            zkServer.shutdown();
-        }
-    }
+    @ClassRule
+    public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource()
+                    // Start a cluster with 1 brokers.
+                    .withBrokers(1)
+                    // Disable topic auto-creation.
+                    .withBrokerProperty("auto.create.topics.enable", "false");
 
     /**
      * Test json kafka events.
@@ -128,7 +60,8 @@
      */
     @Test
     public void testJsonKafkaEvents() throws MessagingException, ApexException {
-        final String[] args = {"src/test/resources/prodcons/Kafka2KafkaJsonEvent.json"};
+        final String[] args =
+            { "src/test/resources/prodcons/Kafka2KafkaJsonEvent.json" };
         testKafkaEvents(args, false, "json");
     }
 
@@ -140,7 +73,8 @@
      */
     @Test
     public void testXmlKafkaEvents() throws MessagingException, ApexException {
-        final String[] args = {"src/test/resources/prodcons/Kafka2KafkaXmlEvent.json"};
+        final String[] args =
+            { "src/test/resources/prodcons/Kafka2KafkaXmlEvent.json" };
         testKafkaEvents(args, true, "xml");
     }
 
@@ -153,23 +87,45 @@
      * @throws MessagingException the messaging exception
      * @throws ApexException the apex exception
      */
-    private void testKafkaEvents(final String[] args, final Boolean xmlEvents, final String topicSuffix)
-            throws MessagingException, ApexException {
-        final KafkaEventSubscriber subscriber =
-                new KafkaEventSubscriber("apex-out-" + topicSuffix, "localhost:" + BROKERPORT);
+    private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix)
+                    throws MessagingException, ApexException {
+
+        try {
+            File tempConfigFile = File.createTempFile("Kafka_", ".json");
+            tempConfigFile.deleteOnExit();
+            String configAsString = TextFileUtils.getTextFileAsString(args[0]).replaceAll("localhost:39902",
+                            sharedKafkaTestResource.getKafkaConnectString());
+            TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile());
+            args[0] = tempConfigFile.getCanonicalPath();
+
+        } catch (IOException e) {
+            fail("test should not throw an exception");
+        }
+
+        sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1);
+        sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1);
+
+        final KafkaEventSubscriber subscriber = new KafkaEventSubscriber("apex-out-" + topicSuffix,
+                        sharedKafkaTestResource);
 
         final ApexMain apexMain = new ApexMain(args);
         ThreadUtilities.sleep(3000);
 
-        final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, "localhost:" + BROKERPORT,
+        final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
                         EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
 
         producer.sendEvents();
-
+        
         final long testStartTime = System.currentTimeMillis();
 
+        // Wait for the producer to send all tis events
         while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
-                && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
+                        && producer.getEventsSentCount() < EVENT_COUNT) {
+            ThreadUtilities.sleep(EVENT_INTERVAL);
+        }
+
+        while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
+                        && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
             ThreadUtilities.sleep(EVENT_INTERVAL);
         }
 
diff --git a/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaJsonEvent.json b/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaJsonEvent.json
index cd758b1..f861c27 100644
--- a/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaJsonEvent.json
+++ b/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaJsonEvent.json
@@ -43,7 +43,7 @@
                 "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters",
                 "parameters": {
                     "bootstrapServers": "localhost:39902",
-                    "groupId": "apex-group-id",
+                    "groupId": "apex-group",
                     "enableAutoCommit": true,
                     "autoCommitTime": 1000,
                     "sessionTimeout": 30000,
diff --git a/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaXmlEvent.json b/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaXmlEvent.json
index d4468a5..f18ecc2 100644
--- a/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaXmlEvent.json
+++ b/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaXmlEvent.json
@@ -44,7 +44,7 @@
                 "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters",
                 "parameters": {
                     "bootstrapServers": "localhost:39902",
-                    "groupId": "apex-group-id",
+                    "groupId": "apex-group",
                     "enableAutoCommit": true,
                     "autoCommitTime": 1000,
                     "sessionTimeout": 30000,