Model distribution fails with model-loader 1.13.5

- move sdc-distribution-client instantiation out of the ModelController into a separate class
- add integration test with embedded kafka (not fully implemented)

Issue-ID: AAI-3818
Change-Id: I0b5dd118d9145372ddf123319b58829d0ef9275a
Signed-off-by: Fiete Ostkamp <Fiete.Ostkamp@telekom.de>
diff --git a/pom.xml b/pom.xml
index 61a4ec2..644e408 100644
--- a/pom.xml
+++ b/pom.xml
@@ -240,9 +240,25 @@
 			</exclusions>
 		</dependency>
 		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-aop</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka-clients</artifactId>
-			<version>3.3.1</version>
+			<!-- <version>3.3.1</version> -->
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.kafka</groupId>
+			<artifactId>spring-kafka</artifactId>
+			<!-- <version>3.1.2</version> -->
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.kafka</groupId>
+			<artifactId>spring-kafka-test</artifactId>
+			<!-- <version>3.1.1</version> -->
+			<scope>test</scope>
 		</dependency>
 		<dependency>
 			<groupId>org.onap.aai</groupId>
diff --git a/src/main/java/org/onap/aai/modelloader/config/DistributionClientStartupConfig.java b/src/main/java/org/onap/aai/modelloader/config/DistributionClientStartupConfig.java
new file mode 100644
index 0000000..84c79f2
--- /dev/null
+++ b/src/main/java/org/onap/aai/modelloader/config/DistributionClientStartupConfig.java
@@ -0,0 +1,87 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom AG Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.modelloader.config;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.modelloader.notification.EventCallback;
+import org.onap.aai.modelloader.service.ModelLoaderMsgs;
+import org.onap.aai.modelloader.service.SdcConnectionJob;
+import org.onap.sdc.api.IDistributionClient;
+import org.onap.sdc.api.results.IDistributionClientResult;
+import org.onap.sdc.utils.DistributionActionResultEnum;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@ConditionalOnProperty(value = "ml.distribution.connection.enabled", havingValue = "true", matchIfMissing = true)
+public class DistributionClientStartupConfig {
+
+    private static final Logger logger = LoggerFactory.getInstance().getLogger(DistributionClientStartupConfig.class);
+
+    private final IDistributionClient client;
+    private final ModelLoaderConfig config;
+    private final EventCallback eventCallback;
+
+    public DistributionClientStartupConfig(IDistributionClient client, ModelLoaderConfig config,
+            EventCallback eventCallback) {
+        this.client = client;
+        this.config = config;
+        this.eventCallback = eventCallback;
+    }
+
+    @EventListener(ApplicationReadyEvent.class)
+    protected void initSdcClient() {
+        // Initialize distribution client
+        logger.debug(ModelLoaderMsgs.INITIALIZING, "Initializing distribution client...");
+        IDistributionClientResult initResult = client.init(config, eventCallback);
+
+        if (initResult.getDistributionActionResult() == DistributionActionResultEnum.SUCCESS) {
+            // Start distribution client
+            logger.debug(ModelLoaderMsgs.INITIALIZING, "Starting distribution client...");
+            IDistributionClientResult startResult = client.start();
+            if (startResult.getDistributionActionResult() == DistributionActionResultEnum.SUCCESS) {
+                logger.info(ModelLoaderMsgs.INITIALIZING, "Connection to SDC established");
+            } else {
+                String errorMsg = "Failed to start distribution client: " + startResult.getDistributionMessageResult();
+                logger.error(ModelLoaderMsgs.ASDC_CONNECTION_ERROR, errorMsg);
+
+                // Kick off a timer to retry the SDC connection
+                Timer timer = new Timer();
+                TimerTask task = new SdcConnectionJob(client, config, eventCallback, timer);
+                timer.schedule(task, new Date(), 60000);
+            }
+        } else {
+            String errorMsg = "Failed to initialize distribution client: " + initResult.getDistributionMessageResult();
+            logger.error(ModelLoaderMsgs.ASDC_CONNECTION_ERROR, errorMsg);
+
+            // Kick off a timer to retry the SDC connection
+            Timer timer = new Timer();
+            TimerTask task = new SdcConnectionJob(client, config, eventCallback, timer);
+            timer.schedule(task, new Date(), 60000);
+        }
+    }
+}
diff --git a/src/main/java/org/onap/aai/modelloader/service/ModelController.java b/src/main/java/org/onap/aai/modelloader/service/ModelController.java
index 0921982..5c784b2 100644
--- a/src/main/java/org/onap/aai/modelloader/service/ModelController.java
+++ b/src/main/java/org/onap/aai/modelloader/service/ModelController.java
@@ -28,6 +28,7 @@
 import java.util.Timer;
 import java.util.TimerTask;
 
+import javax.annotation.PostConstruct;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
@@ -71,6 +72,13 @@
         this.artifactDownloadManager = artifactDownloadManager;
     }
 
+    @PostConstruct
+    protected void start() {
+        if (!config.getASDCConnectionDisabled()) {
+            initSdcClient();
+        }
+    }
+
     /**
      * Responsible for stopping the connection to the distribution client before the resource is destroyed.
      */
diff --git a/src/test/java/org/onap/aai/modelloader/distribution/EventCallbackAspect.java b/src/test/java/org/onap/aai/modelloader/distribution/EventCallbackAspect.java
new file mode 100644
index 0000000..16f311b
--- /dev/null
+++ b/src/test/java/org/onap/aai/modelloader/distribution/EventCallbackAspect.java
@@ -0,0 +1,49 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom AG Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.modelloader.distribution;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.aspectj.lang.annotation.After;
+import org.aspectj.lang.annotation.Aspect;
+import org.springframework.stereotype.Component;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Aspect
+@Getter
+@Setter
+@Component
+/**
+ * Aspect to asynchronously wait for the EventCallback being called after a distribution event from sdc was published
+ */
+public class EventCallbackAspect {
+
+  private CountDownLatch countDownLatch;
+
+  @After(
+      "execution(* org.onap.aai.modelloader.notification.EventCallback.activateCallback(..))")
+  private void afterEventCallbackCalled() {
+    if (countDownLatch != null) {
+      countDownLatch.countDown();
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/org/onap/aai/modelloader/distribution/NotificationIntegrationTest.java b/src/test/java/org/onap/aai/modelloader/distribution/NotificationIntegrationTest.java
new file mode 100644
index 0000000..2645871
--- /dev/null
+++ b/src/test/java/org/onap/aai/modelloader/distribution/NotificationIntegrationTest.java
@@ -0,0 +1,66 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom AG Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.modelloader.distribution;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.annotation.DirtiesContext;
+
+@DirtiesContext
+@SpringBootTest(properties = { "model-loader.sdc.connection.enabled=true"})
+@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
+public class NotificationIntegrationTest {
+
+    @Autowired EventCallbackAspect eventCallbackAspect;
+
+    @Autowired
+    private KafkaTemplate<String, String> kafkaTemplate;
+
+    @Value("${test.topic}")
+    private String topic;
+
+    @Test
+    @Disabled("This test is not yet implemented")
+    public void thatActivateCallbackIsCalled()
+      throws Exception {
+        String data = "Smth";
+
+        // TODO: send distribution event here
+        kafkaTemplate.send(topic, data);
+
+        // TODO: mock distribution client requests to /sdc/v1/artifactTypes
+        // TODO: mock distribution client requests to /sdc/v1/distributionKafkaData
+
+        // aspect will wait for EventCallback.activateCallback to be called
+        eventCallbackAspect.setCountDownLatch(new CountDownLatch(1));
+        boolean callbackCalled = eventCallbackAspect.getCountDownLatch().await(10, TimeUnit.SECONDS);
+        assertTrue(callbackCalled);
+    }
+}
diff --git a/src/test/java/org/onap/aai/modelloader/entity/model/TestModelArtifactHandler.java b/src/test/java/org/onap/aai/modelloader/entity/model/TestModelArtifactHandler.java
index fac50cb..177a8d2 100644
--- a/src/test/java/org/onap/aai/modelloader/entity/model/TestModelArtifactHandler.java
+++ b/src/test/java/org/onap/aai/modelloader/entity/model/TestModelArtifactHandler.java
@@ -57,7 +57,7 @@
 
     @BeforeEach
     public void setupMocks() {
-        MockitoAnnotations.initMocks(this);
+        MockitoAnnotations.openMocks(this);
     }
 
     @Test
diff --git a/src/test/java/org/onap/aai/modelloader/notification/TestArtifactDownloadManager.java b/src/test/java/org/onap/aai/modelloader/notification/TestArtifactDownloadManager.java
index c09eff5..48b9a66 100644
--- a/src/test/java/org/onap/aai/modelloader/notification/TestArtifactDownloadManager.java
+++ b/src/test/java/org/onap/aai/modelloader/notification/TestArtifactDownloadManager.java
@@ -27,7 +27,6 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.onap.aai.modelloader.fixture.NotificationDataFixtureBuilder.getNotificationDataWithInvalidType;
 import static org.onap.aai.modelloader.fixture.NotificationDataFixtureBuilder.getNotificationDataWithModelQuerySpec;
diff --git a/src/test/java/org/onap/aai/modelloader/restclient/TestBabelServiceClient.java b/src/test/java/org/onap/aai/modelloader/restclient/TestBabelServiceClient.java
index 19e35ca..77e1594 100644
--- a/src/test/java/org/onap/aai/modelloader/restclient/TestBabelServiceClient.java
+++ b/src/test/java/org/onap/aai/modelloader/restclient/TestBabelServiceClient.java
@@ -90,6 +90,8 @@
         configProperties.put("ml.babel.TRUSTSTORE_FILE", "src/test/resources/auth/aai-client-dummy.p12");
         configProperties.put("ml.babel.BASE_URL", url);
         configProperties.put("ml.babel.GENERATE_ARTIFACTS_URL", "generate");
+        configProperties.put("ml.aai.RESTCLIENT_CONNECT_TIMEOUT", "12000");
+        configProperties.put("ml.aai.RESTCLIENT_READ_TIMEOUT", "12000");
         BabelServiceClient client =
                 new HttpsBabelServiceClientFactory().create(new ModelLoaderConfig(configProperties, "."));
         List<BabelArtifact> result =
diff --git a/src/test/java/org/onap/aai/modelloader/service/TestModelController.java b/src/test/java/org/onap/aai/modelloader/service/TestModelController.java
index 9106342..82f7203 100644
--- a/src/test/java/org/onap/aai/modelloader/service/TestModelController.java
+++ b/src/test/java/org/onap/aai/modelloader/service/TestModelController.java
@@ -23,7 +23,6 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties
new file mode 100644
index 0000000..96816ce
--- /dev/null
+++ b/src/test/resources/application.properties
@@ -0,0 +1,10 @@
+CONFIG_HOME=src/test/resources
+spring.kafka.consumer.auto-offset-reset=earliest
+spring.kafka.consumer.group-id=aai
+spring.kafka.consumer.client-id=aai-model-loader
+      
+test.topic=SDC-DISTR-NOTIF-TOPIC
+
+spring.sleuth.enabled=false
+
+ml.distribution.connection.enabled=false # avoid having the distribution client running in the background (requires active kafka)
\ No newline at end of file
diff --git a/src/test/resources/model-loader.properties b/src/test/resources/model-loader.properties
index 51a38c5..4c24679 100644
--- a/src/test/resources/model-loader.properties
+++ b/src/test/resources/model-loader.properties
@@ -7,8 +7,8 @@
 ml.distribution.KEYSTORE_PASSWORD=
 ml.distribution.KEYSTORE_FILE=
 ml.distribution.PASSWORD=Aa123456
-ml.distribution.POLLING_INTERVAL=30
-ml.distribution.POLLING_TIMEOUT=20
+ml.distribution.POLLING_INTERVAL=5
+ml.distribution.POLLING_TIMEOUT=3
 ml.distribution.USER=ci
 ml.distribution.ARTIFACT_TYPES=MODEL_QUERY_SPEC,TOSCA_CSAR