PNF PnP now always waits

Issue-ID: SO-797
Change-Id: I717b987e78680157c4a2c0bf8d497855d1890077
Signed-off-by: biniek <lukasz.biniek@nokia.com>
diff --git a/bpmn/so-bpmn-infrastructure-common/pom.xml b/bpmn/so-bpmn-infrastructure-common/pom.xml
index 9bc9cc5..ad51b3f 100644
--- a/bpmn/so-bpmn-infrastructure-common/pom.xml
+++ b/bpmn/so-bpmn-infrastructure-common/pom.xml
@@ -317,5 +317,11 @@
 			<classifier>tests</classifier>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.camunda.bpm.springboot</groupId>
+			<artifactId>camunda-bpm-spring-boot-starter-test</artifactId>
+			<version>2.3.0</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CheckAaiForCorrelationIdDelegate.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CheckAaiForCorrelationIdDelegate.java
index 63db293..12ddf84 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CheckAaiForCorrelationIdDelegate.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CheckAaiForCorrelationIdDelegate.java
@@ -67,7 +67,6 @@
             AaiResponse aaiResponse = implementation.check(correlationId, aaiConnection);
 
             execution.setVariableLocal(AAI_CONTAINS_INFO_ABOUT_PNF, aaiResponse.getContainsInfoAboutPnf());
-            execution.setVariableLocal(AAI_CONTAINS_INFO_ABOUT_IP, aaiResponse.getContainsInfoAboutIp());
         } catch (IOException e) {
         	LOGGER.error("IOException",e);
             new ExceptionUtil().buildAndThrowWorkflowException(execution, 9999, e.getMessage());
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/PnfCheckInputs.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/PnfCheckInputs.java
new file mode 100644
index 0000000..e4866f5
--- /dev/null
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/delegate/PnfCheckInputs.java
@@ -0,0 +1,57 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP - SO
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.so.bpmn.infrastructure.pnf.delegate;
+
+import static org.onap.so.bpmn.infrastructure.pnf.delegate.ExecutionVariableNames.CORRELATION_ID;
+import static org.onap.so.bpmn.infrastructure.pnf.delegate.ExecutionVariableNames.TIMEOUT_FOR_NOTIFICATION;
+
+import org.camunda.bpm.engine.delegate.DelegateExecution;
+import org.camunda.bpm.engine.delegate.JavaDelegate;
+import org.onap.so.bpmn.common.scripts.ExceptionUtil;
+import org.onap.so.logger.MsoLogger;
+
+/** Camunda delegate that requires a correlation id and supplies a default notification timeout if none is set. */
+public class PnfCheckInputs implements JavaDelegate {
+    private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.GENERAL, PnfCheckInputs.class);
+    private String defaultTimeout; // copied into TIMEOUT_FOR_NOTIFICATION when the execution lacks it
+
+    /** Reports a workflow exception (code 9999) when the correlation id, or the timeout and its default, are missing. */
+    @Override
+    public void execute(DelegateExecution execution) throws Exception {
+        String correlationId = (String) execution.getVariable(CORRELATION_ID);
+        if (correlationId == null) {
+            new ExceptionUtil().buildAndThrowWorkflowException(execution, 9999, "correlationId variable not defined");
+        }
+        String timeout = (String) execution.getVariable(TIMEOUT_FOR_NOTIFICATION);
+        if (timeout == null) {
+            LOGGER.debug("timeoutForPnfEntryNotification variable not found, setting default");
+            if (defaultTimeout == null) {
+                new ExceptionUtil().buildAndThrowWorkflowException(execution, 9999,
+                        "default timeoutForPnfEntryNotification value not defined");
+            }
+            execution.setVariable(TIMEOUT_FOR_NOTIFICATION, defaultTimeout);
+        }
+    }
+
+    public void setDefaultTimeout(String defaultTimeout) {
+        this.defaultTimeout = defaultTimeout;
+    }
+}
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/AaiConnectionTestImpl.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/AaiConnectionTestImpl.java
new file mode 100644
index 0000000..201e791
--- /dev/null
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/AaiConnectionTestImpl.java
@@ -0,0 +1,60 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP - SO
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.so.bpmn.infrastructure.pnf.delegate;
+
+import org.onap.aai.domain.yang.Pnf;
+import org.onap.so.bpmn.infrastructure.pnf.implementation.AaiConnection;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** In-memory AaiConnection stub for tests: knows one fixed correlation id and records created entries. */
+public class AaiConnectionTestImpl implements AaiConnection {
+
+    public static final String ID_WITHOUT_ENTRY = "IdWithoutEntry";
+    public static final String ID_WITH_ENTRY = "idWithEntryNoIp";
+
+    // Entries handed to createEntry, keyed by correlation id.
+    private final Map<String, Pnf> created = new HashMap<>();
+
+    /** Returns a new, unpopulated Pnf for ID_WITH_ENTRY and an empty Optional for any other id. */
+    @Override
+    public Optional<Pnf> getEntryFor(String correlationId) throws IOException {
+        return Objects.equals(correlationId, ID_WITH_ENTRY) ? Optional.of(new Pnf()) : Optional.empty();
+    }
+
+    /** Stores the entry under its correlation id so that it can be read back through getCreated(). */
+    @Override
+    public void createEntry(String correlationId, Pnf entry) throws IOException {
+        created.put(correlationId, entry);
+    }
+
+    public Map<String, Pnf> getCreated() {
+        return created;
+    }
+
+    public void reset() {
+        created.clear();
+    }
+}
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/AaiConnectionThrowingException.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/AaiConnectionThrowingException.java
new file mode 100644
index 0000000..7df6757
--- /dev/null
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/AaiConnectionThrowingException.java
@@ -0,0 +1,39 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP - SO
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.so.bpmn.infrastructure.pnf.delegate;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.onap.aai.domain.yang.Pnf;
+import org.onap.so.bpmn.infrastructure.pnf.implementation.AaiConnection;
+
+/** AaiConnection stub for tests: both operations always throw an IOException. */
+public class AaiConnectionThrowingException implements AaiConnection {
+    @Override
+    public Optional<Pnf> getEntryFor(String correlationId) throws IOException {
+        throw new IOException();
+    }
+
+    @Override
+    public void createEntry(String correlationId, Pnf entry) throws IOException {
+        throw new IOException();
+    }
+}
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java
new file mode 100644
index 0000000..4282b0f
--- /dev/null
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CancelDmaapSubscriptionTest.java
@@ -0,0 +1,49 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP - SO
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.so.bpmn.infrastructure.pnf.delegate;
+
+import org.camunda.bpm.engine.delegate.DelegateExecution;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Checks that executing CancelDmaapSubscription leaves the DMaaP client stub without a registered consumer. */
+public class CancelDmaapSubscriptionTest {
+
+    @Test
+    public void shouldCancelSubscription() throws Exception {
+        // given: a DMaaP client stub that already holds a consumer for the correlation id
+        DmaapClientTestImpl dmaapStub = new DmaapClientTestImpl();
+        dmaapStub.registerForUpdate("testCorrelationId", () -> { });
+        DelegateExecution execution = mock(DelegateExecution.class);
+        when(execution.getVariable(eq(ExecutionVariableNames.CORRELATION_ID))).thenReturn("testCorrelationId");
+        when(execution.getProcessBusinessKey()).thenReturn("testBusinessKey");
+        CancelDmaapSubscription cancelDelegate = new CancelDmaapSubscription();
+        cancelDelegate.setDmaapClient(dmaapStub);
+        // when
+        cancelDelegate.execute(execution);
+        // then: the stub no longer reports a registered consumer
+        assertThat(dmaapStub.haveRegisteredConsumer()).isFalse();
+    }
+}
\ No newline at end of file
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CheckAaiForCorrelationIdDelegateTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CheckAaiForCorrelationIdDelegateTest.java
new file mode 100644
index 0000000..3eb3bd8
--- /dev/null
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CheckAaiForCorrelationIdDelegateTest.java
@@ -0,0 +1,110 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP - SO
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.so.bpmn.infrastructure.pnf.delegate;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.onap.so.bpmn.infrastructure.pnf.delegate.AaiConnectionTestImpl.ID_WITHOUT_ENTRY;
+import static org.onap.so.bpmn.infrastructure.pnf.delegate.AaiConnectionTestImpl.ID_WITH_ENTRY;
+import static org.onap.so.bpmn.infrastructure.pnf.delegate.ExecutionVariableNames.AAI_CONTAINS_INFO_ABOUT_PNF;
+import static org.onap.so.bpmn.infrastructure.pnf.delegate.ExecutionVariableNames.CORRELATION_ID;
+
+import org.camunda.bpm.engine.delegate.BpmnError;
+import org.camunda.bpm.engine.delegate.DelegateExecution;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.onap.so.bpmn.core.WorkflowException;
+
+/** Tests CheckAaiForCorrelationIdDelegate against a working and a failing A&AI connection stub. */
+@RunWith(Enclosed.class)
+public class CheckAaiForCorrelationIdDelegateTest {
+
+    // Builds a mocked execution whose CORRELATION_ID variable resolves to the given value.
+    private static DelegateExecution executionWithCorrelationId(String correlationId) {
+        DelegateExecution mockedExecution = mock(DelegateExecution.class);
+        when(mockedExecution.getVariable(CORRELATION_ID)).thenReturn(correlationId);
+        return mockedExecution;
+    }
+
+    /** Cases in which the delegate talks to AaiConnectionTestImpl, which answers without throwing. */
+    public static class ConnectionOkTests {
+        private CheckAaiForCorrelationIdDelegate delegate;
+
+        @Before
+        public void setUp() {
+            delegate = new CheckAaiForCorrelationIdDelegate();
+            delegate.setAaiConnection(new AaiConnectionTestImpl());
+        }
+
+        @Test
+        public void shouldThrowExceptionWhenCorrelationIdIsNotSet() {
+            DelegateExecution execution = executionWithCorrelationId(null);
+            when(execution.getVariable("testProcessKey")).thenReturn("testProcessKeyValue");
+            // when, then
+            assertThatThrownBy(() -> delegate.execute(execution)).isInstanceOf(BpmnError.class);
+            verify(execution).setVariable(eq("WorkflowException"), any(WorkflowException.class));
+        }
+
+        @Test
+        public void shouldSetCorrectVariablesWhenAaiDoesNotContainInfoAboutPnf() throws Exception {
+            DelegateExecution execution = executionWithCorrelationId(ID_WITHOUT_ENTRY);
+            // when
+            delegate.execute(execution);
+            // then
+            verify(execution).setVariableLocal(AAI_CONTAINS_INFO_ABOUT_PNF, false);
+        }
+
+        @Test
+        public void shouldSetCorrectVariablesWhenAaiContainsInfoAboutPnfWithoutIp() throws Exception {
+            DelegateExecution execution = executionWithCorrelationId(ID_WITH_ENTRY);
+            // when
+            delegate.execute(execution);
+            // then
+            verify(execution).setVariableLocal(AAI_CONTAINS_INFO_ABOUT_PNF, true);
+        }
+    }
+
+    /** Cases in which the delegate talks to AaiConnectionThrowingException, so every A&AI call fails. */
+    public static class NoConnectionTests {
+        private CheckAaiForCorrelationIdDelegate delegate;
+
+        @Before
+        public void setUp() {
+            delegate = new CheckAaiForCorrelationIdDelegate();
+            delegate.setAaiConnection(new AaiConnectionThrowingException());
+        }
+
+        @Test
+        public void shouldThrowExceptionWhenIoExceptionOnConnectionToAai() {
+            DelegateExecution execution = executionWithCorrelationId(ID_WITH_ENTRY);
+            when(execution.getVariable("testProcessKey")).thenReturn("testProcessKey");
+            // when, then
+            assertThatThrownBy(() -> delegate.execute(execution)).isInstanceOf(BpmnError.class);
+            verify(execution).setVariable(eq("WorkflowException"), any(WorkflowException.class));
+        }
+    }
+}
\ No newline at end of file
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAaiEntryWithPnfIdDelegateTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAaiEntryWithPnfIdDelegateTest.java
new file mode 100644
index 0000000..465dc08
--- /dev/null
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/CreateAaiEntryWithPnfIdDelegateTest.java
@@ -0,0 +1,50 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP - SO
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.so.bpmn.infrastructure.pnf.delegate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.onap.so.bpmn.infrastructure.pnf.delegate.ExecutionVariableNames.CORRELATION_ID;
+
+import org.camunda.bpm.engine.delegate.DelegateExecution;
+import org.junit.Test;
+import org.onap.aai.domain.yang.Pnf;
+
+/** Checks the Pnf entry that CreateAaiEntryWithPnfIdDelegate passes to the A&AI connection stub. */
+public class CreateAaiEntryWithPnfIdDelegateTest {
+    @Test
+    public void shouldSetPnfIdAndPnfName() throws Exception {
+        // given
+        AaiConnectionTestImpl recordingConnection = new AaiConnectionTestImpl();
+        CreateAaiEntryWithPnfIdDelegate entryDelegate = new CreateAaiEntryWithPnfIdDelegate();
+        entryDelegate.setAaiConnection(recordingConnection);
+        DelegateExecution mockedExecution = mock(DelegateExecution.class);
+        when(mockedExecution.getVariable(eq(CORRELATION_ID))).thenReturn("testCorrelationId");
+        // when
+        entryDelegate.execute(mockedExecution);
+        // then: the entry recorded by the stub carries the correlation id as both id and name
+        Pnf stored = recordingConnection.getCreated().get("testCorrelationId");
+        assertThat(stored.getPnfId()).isEqualTo("testCorrelationId");
+        assertThat(stored.getPnfName()).isEqualTo("testCorrelationId");
+        assertThat(stored.isInMaint()).isTrue();
+    }
+}
\ No newline at end of file
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java
new file mode 100644
index 0000000..f2a4205
--- /dev/null
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java
@@ -0,0 +1,63 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP - SO
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.so.bpmn.infrastructure.pnf.delegate;
+
+import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
+import java.util.Objects;
+
+/** Test double for DmaapClient that tracks a single registration (correlation id plus callback). */
+public class DmaapClientTestImpl implements DmaapClient {
+    private String correlationId;
+    private Runnable informConsumer;
+
+    @Override
+    public void registerForUpdate(String correlationId, Runnable informConsumer) {
+        this.informConsumer = informConsumer;
+        this.correlationId = correlationId;
+    }
+
+    @Override
+    public Runnable unregister(String correlationId) {
+        if (!Objects.equals(this.correlationId, correlationId)) {
+            return null; // the stored registration is for a different id
+        }
+        Runnable removedConsumer = this.informConsumer;
+        this.correlationId = null;
+        this.informConsumer = null;
+        return removedConsumer;
+    }
+
+    public String getCorrelationId() {
+        return correlationId;
+    }
+
+    public Runnable getInformConsumer() {
+        return informConsumer;
+    }
+
+    public void sendMessage() {
+        informConsumer.run(); // fails with a NullPointerException when no consumer is registered
+    }
+
+    public boolean haveRegisteredConsumer() {
+        return correlationId != null;
+    }
+}
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java
new file mode 100644
index 0000000..ddf33a1
--- /dev/null
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/InformDmaapClientTest.java
@@ -0,0 +1,88 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP - SO
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.so.bpmn.infrastructure.pnf.delegate;
+
+import org.camunda.bpm.engine.ProcessEngineServices;
+import org.camunda.bpm.engine.RuntimeService;
+import org.camunda.bpm.engine.delegate.DelegateExecution;
+import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+/** Tests InformDmaapClient using a DMaaP client stub and a mocked Camunda message-correlation chain. */
+public class InformDmaapClientTest {
+    private InformDmaapClient informDmaapClient;
+    private DmaapClientTestImpl dmaapClientTest;
+    private DelegateExecution delegateExecution;
+    // Assigned inside mockDelegateExecution().
+    private MessageCorrelationBuilder messageCorrelationBuilder;
+
+    @Before
+    public void setUp() throws Exception {
+        dmaapClientTest = new DmaapClientTestImpl();
+        informDmaapClient = new InformDmaapClient();
+        informDmaapClient.setDmaapClient(dmaapClientTest);
+        delegateExecution = mockDelegateExecution();
+    }
+
+    @Test
+    public void shouldSendListenerToDmaapClient() throws Exception {
+        // when
+        informDmaapClient.execute(delegateExecution);
+        // then
+        assertThat(dmaapClientTest.getCorrelationId()).isEqualTo("testCorrelationId");
+        assertThat(dmaapClientTest.getInformConsumer()).isNotNull();
+        verifyZeroInteractions(messageCorrelationBuilder);
+    }
+
+    @Test
+    public void shouldSendListenerToDmaapClientAndSendMessageToCamunda() throws Exception {
+        // when
+        informDmaapClient.execute(delegateExecution);
+        dmaapClientTest.sendMessage();
+        // then
+        assertThat(dmaapClientTest.getCorrelationId()).isEqualTo("testCorrelationId");
+        InOrder callOrder = inOrder(messageCorrelationBuilder);
+        callOrder.verify(messageCorrelationBuilder).processInstanceBusinessKey("testBusinessKey");
+        callOrder.verify(messageCorrelationBuilder).correlateWithResult();
+    }
+
+    // Mocks the chain execution -> engine services -> runtime service -> message correlation builder.
+    private DelegateExecution mockDelegateExecution() {
+        messageCorrelationBuilder = mock(MessageCorrelationBuilder.class);
+        when(messageCorrelationBuilder.processInstanceBusinessKey(any())).thenReturn(messageCorrelationBuilder);
+        RuntimeService runtimeService = mock(RuntimeService.class);
+        when(runtimeService.createMessageCorrelation(any())).thenReturn(messageCorrelationBuilder);
+        ProcessEngineServices engineServices = mock(ProcessEngineServices.class);
+        when(engineServices.getRuntimeService()).thenReturn(runtimeService);
+        DelegateExecution execution = mock(DelegateExecution.class);
+        when(execution.getProcessEngineServices()).thenReturn(engineServices);
+        when(execution.getVariable(eq(ExecutionVariableNames.CORRELATION_ID))).thenReturn("testCorrelationId");
+        when(execution.getProcessBusinessKey()).thenReturn("testBusinessKey");
+        return execution;
+    }
+}
\ No newline at end of file
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/PnfCheckInputsTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/PnfCheckInputsTest.java
new file mode 100644
index 0000000..aab289f
--- /dev/null
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/delegate/PnfCheckInputsTest.java
@@ -0,0 +1,84 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP - SO
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.so.bpmn.infrastructure.pnf.delegate;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.onap.so.bpmn.infrastructure.pnf.delegate.ExecutionVariableNames.CORRELATION_ID;
+import static org.onap.so.bpmn.infrastructure.pnf.delegate.ExecutionVariableNames.TIMEOUT_FOR_NOTIFICATION;
+
+import org.camunda.bpm.engine.delegate.BpmnError;
+import org.camunda.bpm.engine.delegate.DelegateExecution;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Unit tests for the correlation id check and the timeout defaulting done by PnfCheckInputs. */
+public class PnfCheckInputsTest {
+    private PnfCheckInputs delegate;
+
+    @Before
+    public void setUp() throws Exception {
+        delegate = new PnfCheckInputs();
+    }
+
+    @Test
+    public void shouldThrowException_whenPnfIdNotSet() throws Exception {
+        // given: CORRELATION_ID is not stubbed on the mocked execution
+        DelegateExecution executionWithoutInputs = mockDelegateExecution();
+        // when, then
+        assertThatThrownBy(() -> delegate.execute(executionWithoutInputs)).isInstanceOf(BpmnError.class);
+    }
+
+    @Test
+    public void shouldThrowException_whenTimeoutIsNotSetAndDefaultIsNotDefined() throws Exception {
+        // given: a correlation id, but neither a timeout variable nor a default timeout
+        DelegateExecution execution = mockDelegateExecutionWithCorrelationId();
+        // when, then
+        assertThatThrownBy(() -> delegate.execute(execution)).isInstanceOf(BpmnError.class);
+    }
+
+    @Test
+    public void shouldSetDefaultTimeout_whenTimeoutIsNotSet() throws Exception {
+        // given
+        String fallbackTimeout = "T1D";
+        delegate.setDefaultTimeout(fallbackTimeout);
+        DelegateExecution execution = mockDelegateExecutionWithCorrelationId();
+        // when
+        delegate.execute(execution);
+        // then
+        verify(execution).setVariable(eq(TIMEOUT_FOR_NOTIFICATION), eq(fallbackTimeout));
+    }
+
+    private DelegateExecution mockDelegateExecution() {
+        DelegateExecution execution = mock(DelegateExecution.class);
+        when(execution.getVariable("testProcessKey")).thenReturn("testProcessKeyValue");
+        return execution;
+    }
+
+    private DelegateExecution mockDelegateExecutionWithCorrelationId() {
+        DelegateExecution execution = mockDelegateExecution();
+        when(execution.getVariable(CORRELATION_ID)).thenReturn("testCorrelationId");
+        return execution;
+    }
+}
\ No newline at end of file
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
new file mode 100644
index 0000000..08ac9b6
--- /dev/null
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
@@ -0,0 +1,197 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP - SO
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.so.bpmn.infrastructure.pnf.dmaap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.ProtocolVersion;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.message.BasicHttpResponse;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
+import org.springframework.core.env.Environment;
+/**
+ * Unit tests for the {@link DmaapTopicListenerThread} inner class of {@link PnfEventReadyDmaapClient}.
+ * <p>Before each test the HTTP client, the executor, the correlation id map and the running flag of the
+ * tested client are overwritten through reflection (see {@link #init()}).
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class PnfEventReadyDmaapClientTest {
+
+    private static final String CORRELATION_ID = "corrTestId";
+    private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
+    private static final String JSON_EXAMPLE_WITH_CORRELATION_ID = "[\n"
+            + "    {\n"
+            + "        \"pnfRegistrationFields\" : {\n"
+            + "        \"correlationId\" : \"%s\",\n"
+            + "        \"value\" : \"value1\"\n"
+            + "        }\n"
+            + "    },\n"
+            + "    {\n"
+            + "        \"pnfRegistrationFields\" : {\n"
+            + "        \"correlationId\" : \"corr\",\n"
+            + "        \"value\" : \"value2\"\n"
+            + "        }\n"
+            + "    }\n"
+            + "]";
+    private static final String JSON_EXAMPLE_WITH_NO_CORRELATION_ID =
+            "{\"pnfRegistrationFields\":{\"field\":\"value\"}}";
+
+    private static final String HOST = "hostTest";
+    private static final int PORT = 1234;
+    private static final String PROTOCOL = "http";
+    private static final String URI_PATH_PREFIX = "eventsForTesting";
+    private static final String EVENT_TOPIC_TEST = "eventTopicTest";
+    private static final String CONSUMER_ID = "consumerTestId";
+    private static final String CONSUMER_GROUP = "consumerGroupTest";
+
+    @Mock
+    private Environment env;
+    @InjectMocks
+    private PnfEventReadyDmaapClient testedObject = new PnfEventReadyDmaapClient();
+
+    private DmaapTopicListenerThread testedObjectInnerClassThread;
+    private HttpClient httpClientMock;
+    private Runnable threadMockToNotifyCamundaFlow;
+    private ScheduledExecutorService executorMock;
+
+    /** Configures the tested client, creates the listener under test and swaps in the mocks. */
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT);
+        when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST);
+        testedObject.setDmaapProtocol(PROTOCOL);
+        testedObject.setDmaapUriPathPrefix(URI_PATH_PREFIX);
+        testedObject.setDmaapTopicName(EVENT_TOPIC_TEST);
+        testedObject.setConsumerId(CONSUMER_ID);
+        testedObject.setConsumerGroup(CONSUMER_GROUP);
+        testedObject.setDmaapClientDelayInSeconds(1);
+        testedObject.init();
+        testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
+        httpClientMock = mock(HttpClient.class);
+        threadMockToNotifyCamundaFlow = mock(Runnable.class);
+        executorMock = mock(ScheduledExecutorService.class);
+        setPrivateField();
+    }
+
+    /**
+     * Tests the run method under the following conditions:
+     * <p> - DmaapThreadListener is running, flag is set to true
+     * <p> - map is filled with one entry whose key is the correlation id returned in the response
+     * <p> The run method should query the configured DMaaP URI, invoke the thread from the map to notify the
+     * camunda process and shut down the executor (the map is expected to be empty afterwards).
+     */
+    @Test
+    public void correlationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException {
+        when(httpClientMock.execute(any(HttpGet.class)))
+                .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
+        testedObjectInnerClassThread.run();
+        ArgumentCaptor<HttpGet> requestCaptor = ArgumentCaptor.forClass(HttpGet.class);
+        verify(httpClientMock).execute(requestCaptor.capture());
+        assertThat(requestCaptor.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
+                .hasPath("/" + URI_PATH_PREFIX + "/" + EVENT_TOPIC_TEST + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID);
+        verify(threadMockToNotifyCamundaFlow).run();
+        verify(executorMock).shutdownNow();
+    }
+
+    /**
+     * Tests the run method under the following conditions:
+     * <p> - DmaapThreadListener is running, flag is set to true
+     * <p> - map is filled with one entry whose correlationId does not match the correlationId
+     * taken from the http response.
+     * <p> The run method should neither run the thread that notifies the camunda process nor touch the executor.
+     */
+    @Test
+    public void correlationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException {
+        when(httpClientMock.execute(any(HttpGet.class))).thenReturn(
+                createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
+        testedObjectInnerClassThread.run();
+        verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
+    }
+
+    /**
+     * Tests the run method under the following conditions:
+     * <p> - DmaapThreadListener is running, flag is set to true
+     * <p> - map is filled with one entry, but the HttpResponse carries no correlation id.
+     * <p> The run method should neither run the thread that notifies the camunda process nor touch the executor.
+     */
+    @Test
+    public void correlationIdIsNotFoundInHttpResponse() throws IOException {
+        when(httpClientMock.execute(any(HttpGet.class)))
+                .thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
+        testedObjectInnerClassThread.run();
+        verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
+    }
+
+    /** Overwrites the private fields of the tested client with the mocks, a one-entry map and a true flag. */
+    private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
+        Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
+        pnfCorrelationToThreadMap.put(CORRELATION_ID, threadMockToNotifyCamundaFlow);
+
+        injectField("httpClient", httpClientMock);
+        injectField("executor", executorMock);
+        injectField("pnfCorrelationIdToThreadMap", pnfCorrelationToThreadMap);
+        injectField("dmaapThreadListenerIsRunning", true);
+    }
+
+    /** Sets the named field declared on the tested client's class, then switches accessibility back off. */
+    private void injectField(String fieldName, Object value) throws NoSuchFieldException, IllegalAccessException {
+        Field field = testedObject.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(testedObject, value);
+        field.setAccessible(false);
+    }
+
+    /** Builds an HTTP response with status code 200 whose entity is the given JSON text. */
+    private HttpResponse createResponse(String json) throws UnsupportedEncodingException {
+        HttpEntity entity = new StringEntity(json);
+        ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
+        // the status code passed to the constructor is a placeholder; it is overwritten with 200 below
+        HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
+        response.setEntity(entity);
+        response.setStatusCode(200);
+        return response;
+    }
+
+}