Merge "Revert the change to avoid break PNF PnP"
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/PnfNotificationEvent.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/PnfNotificationEvent.java
deleted file mode 100644
index 267ddbf..0000000
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/PnfNotificationEvent.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- *  Copyright (C) 2019 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.so.bpmn.infrastructure.pnf;
-
-import org.springframework.context.ApplicationEvent;
-
-public class PnfNotificationEvent extends ApplicationEvent {
-
-    private String pnfCorrelationId;
-
-    /**
-     * Create a new ApplicationEvent.
-     *
-     * @param source the object on which the event initially occurred (never {@code null})
-     */
-    public PnfNotificationEvent(Object source, String pnfCorrelationId) {
-        super(source);
-        this.pnfCorrelationId = pnfCorrelationId;
-    }
-
-    public String getPnfCorrelationId() {
-        return pnfCorrelationId;
-    }
-}
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
index e2dc375..94cedda 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
@@ -23,40 +23,30 @@
 import org.camunda.bpm.engine.RuntimeService;
 import org.camunda.bpm.engine.delegate.DelegateExecution;
 import org.camunda.bpm.engine.delegate.JavaDelegate;
-import org.onap.so.bpmn.infrastructure.pnf.PnfNotificationEvent;
+import org.camunda.bpm.engine.runtime.Execution;
 import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Component;
 
 @Component
-public class InformDmaapClient implements JavaDelegate, ApplicationListener<PnfNotificationEvent> {
+public class InformDmaapClient implements JavaDelegate {
 
     private Logger logger = LoggerFactory.getLogger(getClass());
     private DmaapClient dmaapClient;
-    private DelegateExecution execution;
 
     @Override
     public void execute(DelegateExecution execution) {
         String pnfCorrelationId = (String) execution.getVariable(ExecutionVariableNames.PNF_CORRELATION_ID);
         RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService();
+        String processBusinessKey = execution.getProcessBusinessKey();
         dmaapClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage")
-                .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult());
-        this.execution = execution;
+                .processInstanceBusinessKey(processBusinessKey).correlateWithResult());
     }
 
     @Autowired
     public void setDmaapClient(DmaapClient dmaapClient) {
         this.dmaapClient = dmaapClient;
     }
-
-    @Override
-    public void onApplicationEvent(PnfNotificationEvent event) {
-        logger.info("Received application event for pnfCorrelationId: {}", event.getPnfCorrelationId());
-        RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService();
-        runtimeService.createMessageCorrelation("WorkflowMessage")
-                .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult();
-    }
 }
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
index 2869111..96562fe 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
@@ -35,11 +35,9 @@
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
-import org.onap.so.bpmn.infrastructure.pnf.PnfNotificationEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 
@@ -55,11 +53,8 @@
     private volatile ScheduledThreadPoolExecutor executor;
     private volatile boolean dmaapThreadListenerIsRunning;
 
-    private ApplicationEventPublisher applicationEventPublisher;
-
     @Autowired
-    public PnfEventReadyDmaapClient(Environment env, ApplicationEventPublisher applicationEventPublisher) {
-        this.applicationEventPublisher = applicationEventPublisher;
+    public PnfEventReadyDmaapClient(Environment env) {
         httpClient = HttpClientBuilder.create().build();
         pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
         topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
@@ -135,9 +130,11 @@
         }
 
         private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) {
-            unregister(pnfCorrelationId);
-            PnfNotificationEvent pnfNotificationEvent = new PnfNotificationEvent(this, pnfCorrelationId);
-            applicationEventPublisher.publishEvent(pnfNotificationEvent);
+            Runnable runnable = unregister(pnfCorrelationId);
+            if (runnable != null) {
+                logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId);
+                runnable.run();
+            }
         }
     }
 }
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java
index c67b44e..e755214 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java
@@ -20,15 +20,17 @@
 
 package org.onap.so.bpmn.infrastructure.pnf.delegate;
 
-import org.camunda.bpm.engine.delegate.DelegateExecution;
-import org.junit.Test;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import org.camunda.bpm.engine.delegate.DelegateExecution;
+import org.junit.Test;
 
 public class CancelDmaapSubscriptionTest {
 
+    private static final String TEST_PNF_CORRELATION_ID = "testPnfCorrelationId";
+
     @Test
     public void shouldCancelSubscription() throws Exception {
         // given
@@ -37,7 +39,7 @@
         delegate.setDmaapClient(dmaapClientTest);
         DelegateExecution delegateExecution = mock(DelegateExecution.class);
         when(delegateExecution.getVariable(eq(ExecutionVariableNames.PNF_CORRELATION_ID)))
-                .thenReturn("testPnfCorrelationId");
+                .thenReturn(TEST_PNF_CORRELATION_ID);
         when(delegateExecution.getProcessBusinessKey()).thenReturn("testBusinessKey");
         dmaapClientTest.registerForUpdate("testPnfCorrelationId", () -> {
         });
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java
index 96c3db0..df060ed 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java
@@ -20,6 +20,13 @@
 
 package org.onap.so.bpmn.infrastructure.pnf.delegate;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
 import org.camunda.bpm.engine.ProcessEngineServices;
 import org.camunda.bpm.engine.RuntimeService;
 import org.camunda.bpm.engine.delegate.DelegateExecution;
@@ -27,14 +34,8 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
-import org.onap.so.bpmn.infrastructure.pnf.PnfNotificationEvent;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.*;
 
 public class InformDmaapClientTest {
-
     @Before
     public void setUp() throws Exception {
         informDmaapClient = new InformDmaapClient();
@@ -73,19 +74,6 @@
         inOrder.verify(messageCorrelationBuilder).correlateWithResult();
     }
 
-    @Test
-    public void onApplicationEvent_validPnfNotificationEvent_expectedOutput() {
-
-        PnfNotificationEvent pnfNotificationEvent = new PnfNotificationEvent(this, "testPnfCorrelationId");
-
-        informDmaapClient.execute(delegateExecution);
-        informDmaapClient.onApplicationEvent(pnfNotificationEvent);
-
-        InOrder inOrder = inOrder(messageCorrelationBuilder);
-        inOrder.verify(messageCorrelationBuilder).processInstanceBusinessKey("testBusinessKey");
-        inOrder.verify(messageCorrelationBuilder).correlateWithResult();
-    }
-
     private DelegateExecution mockDelegateExecution() {
         DelegateExecution delegateExecution = mock(DelegateExecution.class);
         when(delegateExecution.getVariable(eq(ExecutionVariableNames.PNF_CORRELATION_ID)))
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
index 9f31e2a..19e08d9 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
@@ -23,12 +23,10 @@
 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
 
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
@@ -49,12 +47,9 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
-import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.onap.so.bpmn.infrastructure.pnf.PnfNotificationEvent;
 import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
-import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.core.env.Environment;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -85,9 +80,6 @@
     private Runnable threadMockToNotifyCamundaFlow;
     private ScheduledThreadPoolExecutor executorMock;
 
-    @Mock
-    private ApplicationEventPublisher applicationEventPublisher;
-
     @Before
     public void init() throws NoSuchFieldException, IllegalAccessException {
         when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT);
@@ -99,7 +91,7 @@
         when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP);
         when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
                 .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
-        testedObject = new PnfEventReadyDmaapClient(env, applicationEventPublisher);
+        testedObject = new PnfEventReadyDmaapClient(env);
         testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
         httpClientMock = mock(HttpClient.class);
         threadMockToNotifyCamundaFlow = mock(Runnable.class);
@@ -131,10 +123,7 @@
         assertEquals(captor1.getValue().getURI().getPath(),
                 "/" + URI_PATH_PREFIX + "/" + EVENT_TOPIC_TEST + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
 
-        /**
-         * Two PNF returned from HTTP request.
-         */
-        verify(applicationEventPublisher, times(2)).publishEvent(any(PnfNotificationEvent.class));
+        verify(threadMockToNotifyCamundaFlow).run();
         verify(executorMock).shutdown();
     }
 
diff --git a/bpmn/so-bpmn-infrastructure-flows/src/test/java/org/onap/so/bpmn/infrastructure/process/CreateVcpeResCustServiceSimplifiedTest.java b/bpmn/so-bpmn-infrastructure-flows/src/test/java/org/onap/so/bpmn/infrastructure/process/CreateVcpeResCustServiceSimplifiedTest.java
index b0517ac..f51ea00 100644
--- a/bpmn/so-bpmn-infrastructure-flows/src/test/java/org/onap/so/bpmn/infrastructure/process/CreateVcpeResCustServiceSimplifiedTest.java
+++ b/bpmn/so-bpmn-infrastructure-flows/src/test/java/org/onap/so/bpmn/infrastructure/process/CreateVcpeResCustServiceSimplifiedTest.java
@@ -36,6 +36,7 @@
 import org.camunda.bpm.engine.runtime.Execution;
 import org.camunda.bpm.engine.runtime.ProcessInstance;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
@@ -51,8 +52,10 @@
 /**
  * Basic Integration test for createVcpeResCustService_Simplified.bpmn workflow.
  */
+@Ignore
 public class CreateVcpeResCustServiceSimplifiedTest extends BaseBPMNTest {
 
+    private static final long WORKFLOW_WAIT_TIME = 1000L;
     private Logger logger = LoggerFactory.getLogger(getClass());
 
     private static final String TEST_PROCESSINSTANCE_KEY = "CreateVcpeResCustService_simplified";
@@ -101,7 +104,7 @@
     }
 
     @Test
-    public void workflow_validInput_expectedOuput() {
+    public void workflow_validInput_expectedOuput() throws InterruptedException {
 
         mockCatalogDb();
         mockRequestDb();
@@ -112,26 +115,25 @@
                 runtimeService.startProcessInstanceByKey(TEST_PROCESSINSTANCE_KEY, testBusinessKey, variables);
         assertThat(pi).isNotNull();
 
-        Execution execution = runtimeService.createExecutionQuery().processDefinitionKey("CreateAndActivatePnfResource")
-                .activityId("WaitForDmaapPnfReadyNotification").singleResult();
+        Thread.sleep(WORKFLOW_WAIT_TIME);
 
-        if (!execution.isSuspended() && !execution.isEnded()) {
-            try {
+        Execution execution = runtimeService.createExecutionQuery().processInstanceBusinessKey(testBusinessKey)
+                .messageEventSubscriptionName("WorkflowMessage").singleResult();
 
-                runtimeService.signal(execution.getId());
-            } catch (Exception e) {
-                logger.info(e.getMessage(), e);
-            }
+        assertThat(execution).isNotNull();
+
+        int waitCount = 10;
+        while (!pi.isEnded() && waitCount >= 0) {
+            Thread.sleep(WORKFLOW_WAIT_TIME);
+            waitCount--;
         }
 
-        assertThat(pi).isStarted().hasPassedInOrder("createVCPE_startEvent", "preProcessRequest_ScriptTask",
+        assertThat(pi).isEnded().hasPassedInOrder("createVCPE_startEvent", "preProcessRequest_ScriptTask",
                 "sendSyncAckResponse_ScriptTask", "ScriptTask_0cdtchu", "DecomposeService", "ScriptTask_0lpv2da",
                 "ScriptTask_1y241p8", "CallActivity_1vc4jeh", "ScriptTask_1y5lvl7", "GeneratePnfUuid", "Task_14l19kv",
                 "Pnf_Con", "setPONR_ScriptTask", "postProcessAndCompletionRequest_ScriptTask",
                 "callCompleteMsoProcess_CallActivity", "ScriptTask_2", "CreateVCPE_EndEvent");
 
-        assertThat(pi).isEnded();
-
         List<ExecutionServiceInput> detailedMessages = grpcNettyServer.getDetailedMessages();
         assertThat(detailedMessages).hasSize(2);
         try {
@@ -144,6 +146,8 @@
     }
 
     private void checkConfigAssign(ExecutionServiceInput executionServiceInput) {
+
+        logger.info("Checking the configAssign request");
         ActionIdentifiers actionIdentifiers = executionServiceInput.getActionIdentifiers();
 
         /**
@@ -173,6 +177,8 @@
     }
 
     private void checkConfigDeploy(ExecutionServiceInput executionServiceInput) {
+
+        logger.info("Checking the configDeploy request");
         ActionIdentifiers actionIdentifiers = executionServiceInput.getActionIdentifiers();
 
         /**