Merge "Switch from cloudify helm plugin to cloudify/tosca."
diff --git a/components/bbs-event-processor/Dockerfile b/components/bbs-event-processor/Dockerfile
index e799bd9..a9e6a89 100644
--- a/components/bbs-event-processor/Dockerfile
+++ b/components/bbs-event-processor/Dockerfile
@@ -1,4 +1,4 @@
-FROM openjdk:8-jre-alpine
+FROM openjdk:11-jre-slim
ARG PROJECT_BUILD_DIR_NAME
ARG FINAL_JAR
@@ -6,11 +6,14 @@
ARG DOCKER_ARTIFACT_DIR
#Add a new user and group to allow container to be run as non-root
-RUN addgroup -S bbs-ep && adduser -S -G bbs-ep bbs-ep
+RUN addgroup --system bbs-ep && adduser --system --ingroup bbs-ep bbs-ep
#Copy dependencies and executable jar
WORKDIR ${DOCKER_ARTIFACT_DIR}
COPY ${PROJECT_BUILD_DIR_NAME}/${FINAL_JAR} .
+#Default key store and its password file, copied in a single layer.
+#NOTE(review): both files are committed to the repository and the password is
+#stored in plain text, so they are only suitable as development defaults.
+#Confirm that real deployments mount their own certificates over them.
+COPY KeyStore.jks KeyStorePass.txt ./
+
#Overcome Docker limitation to put ARG inside ENTRYPOINT
RUN ln -s ${FINAL_JAR} bbs-ep.jar
COPY ${PROJECT_BUILD_DIR_NAME}/${DEPENDENCIES_DIR} ./${DEPENDENCIES_DIR}
diff --git a/components/bbs-event-processor/KeyStore-update.jks b/components/bbs-event-processor/KeyStore-update.jks
new file mode 100644
index 0000000..366cc90
--- /dev/null
+++ b/components/bbs-event-processor/KeyStore-update.jks
Binary files differ
diff --git a/components/bbs-event-processor/KeyStore.jks b/components/bbs-event-processor/KeyStore.jks
new file mode 100644
index 0000000..366cc90
--- /dev/null
+++ b/components/bbs-event-processor/KeyStore.jks
Binary files differ
diff --git a/components/bbs-event-processor/KeyStorePass-update.txt b/components/bbs-event-processor/KeyStorePass-update.txt
new file mode 100644
index 0000000..0c759a4
--- /dev/null
+++ b/components/bbs-event-processor/KeyStorePass-update.txt
@@ -0,0 +1 @@
+test123
\ No newline at end of file
diff --git a/components/bbs-event-processor/KeyStorePass.txt b/components/bbs-event-processor/KeyStorePass.txt
new file mode 100644
index 0000000..0c759a4
--- /dev/null
+++ b/components/bbs-event-processor/KeyStorePass.txt
@@ -0,0 +1 @@
+test123
\ No newline at end of file
diff --git a/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml b/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml
deleted file mode 100644
index 54d3aae..0000000
--- a/components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-#============LICENSE_START========================================================
-#=================================================================================
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-
-
-tag_version: nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.components.bbs-event-processor:1.1.0
-pnf_reregistration_url: https:message-router:3905/events/unauthenticated.PNF_UPDATE
-cpe_authentication_url: https:message-router:3905/events/unauthenticated.CPE_AUTHENTICATION
-close_loop_url: https:message-router:3905/events/unauthenticated.DCAE_CL_OUTPUT
-application_rereg_policy_scope: policyScopeReReg
-application_rereg_cl_control_name: clControlNameReReg
-application_cpeAuth_policy_scope: policyScopeCpeAuth
-application_cpeAuth_cl_control_name: clControlNameCpeAuth
-application_cbs_polling_interval_sec: 120
-dmaap_consumer_id: c12
-dmaap_consumer_group: OpenDcae-c12
diff --git a/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template b/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template
deleted file mode 100644
index 2600ffe..0000000
--- a/components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template
+++ /dev/null
@@ -1,188 +0,0 @@
-# -*- indent-tabs-mode: nil -*- # vi: set expandtab:
-#
-# ============LICENSE_START====================================================
-# =============================================================================
-# Copyright (c) 2019 AT&T, NOKIA
-# =============================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END======================================================
-
-tosca_definitions_version: cloudify_dsl_1_3
-
-imports:
- - "http://www.getcloudify.org/spec/cloudify/3.4/types.yaml"
- - https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R4/k8splugin/1.4.13/k8splugin_types.yaml
-
-inputs:
- aai_enrichment_host:
- type: string
- default: "aai.onap"
- aai_enrichment_port:
- type: integer
- default: 8443
- aai_enrichment_protocol:
- type: string
- default: "https"
- aai_secure_enable_cert:
- type: boolean
- description: enable certificates-based connection with AAI
- default: true
- tag_version:
- type: string
- replicas:
- type: integer
- description: number of instances
- default: 1
- pnf_reregistration_url:
- type: string
- cpe_authentication_url:
- type: string
- close_loop_url:
- type: string
- application_policy_version:
- description: Policy version value for building CL events
- type: string
- default: "1.0.0.5"
- application_cl_target_type:
- description: Close Loop target type value for building CL events
- type: string
- default: "VM"
- application_cl_event_status:
- description: Close Loop event status value for building CL events
- type: string
- default: "ONSET"
- application_cl_version:
- description: Close Loop version value for building CL events
- type: string
- default: "1.0.2"
- application_cl_target:
- description: Close Loop target value for building CL events
- type: string
- default: "vserver.vserver-name"
- application_cl_originator:
- description: Close Loop originator value for building CL events
- type: string
- default: "DCAE-BBS-ep"
- application_rereg_policy_scope:
- description: Policy Scope value for building PNF relocation CL event
- type: string
- application_rereg_cl_control_name:
- description: Close Loop control name value for building PNF relocation CL event
- type: string
- application_cpeAuth_policy_scope:
- description: Policy Scope value for building CPE Authentication CL event
- type: string
- application_cpeAuth_cl_control_name:
- description: Close Loop control name value for building CPE Authentication CL event
- type: string
- application_cbs_polling_interval_sec:
- type: integer
- default: 300
- application_logging_level:
- type: string
- default: "INFO"
- dmaap_username:
- type: string
- default: admin
- dmaap_password:
- type: string
- default: admin
- dmaap_consumer_id:
- type: string
- dmaap_consumer_group:
- type: string
- dmaap_secure_enable_cert:
- type: boolean
- description: enable certificates-based connection with DMaaP
- default: true
-node_templates:
- bbs-event-processor:
- type: dcae.nodes.ContainerizedServiceComponent
- properties:
- application_config:
- streams_subscribes:
- pnf_reregistration:
- type: message_router
- aaf_username: { get_input: dmaap_username }
- aaf_password: { get_input: dmaap_password }
- dmaap_info:
- topic_url: { get_input: pnf_reregistration_url }
- cpe_authentication:
- type: message_router
- aaf_username: { get_input: dmaap_username }
- aaf_password: { get_input: dmaap_password }
- dmaap_info:
- topic_url: { get_input: cpe_authentication_url }
- streams_publishes:
- close_loop:
- type: message_router
- aaf_username: { get_input: dmaap_username }
- aaf_password: { get_input: dmaap_password }
- dmaap_info:
- topic_url: { get_input: close_loop_url }
- dmaap.protocol: "https"
- dmaap.contentType: "application/json"
- dmaap.consumer.consumerId: { get_input: dmaap_consumer_id }
- dmaap.consumer.consumerGroup: { get_input: dmaap_consumer_group }
- dmaap.messageLimit: -1
- dmaap.timeoutMs: -1
- aai.host: { get_input: aai_enrichment_host }
- aai.port: { get_input: aai_enrichment_port }
- aai.protocol: { get_input: aai_enrichment_protocol }
- aai.username: "AAI"
- aai.password: "AAI"
- aai.aaiIgnoreSslCertificateErrors: true
- application.pipelinesPollingIntervalSec: 30
- application.pipelinesTimeoutSec: 15
- application.cbsPollingIntervalSec: { get_input: application_cbs_polling_interval_sec }
- application.policyVersion: { get_input: application_policy_version }
- application.clTargetType: { get_input: application_cl_target_type }
- application.clEventStatus: { get_input: application_cl_event_status }
- application.clVersion: { get_input: application_cl_version }
- application.clTarget: { get_input: application_cl_target }
- application.clOriginator: { get_input: application_cl_originator }
- application.reregistration.policyScope: { get_input: application_rereg_policy_scope }
- application.reregistration.clControlName: { get_input: application_rereg_cl_control_name }
- application.cpe.authentication.policyScope: { get_input: application_cpeAuth_policy_scope }
- application.cpe.authentication.clControlName: { get_input: application_cpeAuth_cl_control_name }
- application.reregistration.configKey: "pnf_reregistration"
- application.cpeAuth.configKey: "cpe_authentication"
- application.closeLoop.configKey: "close_loop"
- application.loggingLevel: { get_input: application_logging_level }
- application.ssl.keyStorePath: "/opt/app/bbs-event-processor/etc/cert/cert.jks"
- application.ssl.keyStorePasswordPath: "/opt/app/bbs-event-processor/etc/cert/jks.pass"
- application.ssl.trustStorePath: "/opt/app/bbs-event-processor/etc/cert/trust.jks"
- application.ssl.trustStorePasswordPath: "/opt/app/bbs-event-processor/etc/cert/trust.pass"
- application.ssl.enableAaiCertAuth: { get_input: aai_secure_enable_cert }
- application.ssl.enableDmaapCertAuth: { get_input: dmaap_secure_enable_cert }
- docker_config:
- healthcheck:
- endpoint: /heartbeat
- interval: 180s
- timeout: 5s
- type: http
- image:
- { get_input: tag_version }
- replicas: {get_input: replicas}
- service_component_type: 'bbs-event-processor'
- log_info:
- log_directory: "/opt/app/bbs-event-processor/logs"
- tls_info:
- cert_directory: '/opt/app/bbs-event-processor/etc/cert'
- use_tls: true
- interfaces:
- cloudify.interfaces.lifecycle:
- start:
- inputs:
- ports:
- - concat: ["8100:", "30810"]
diff --git a/components/bbs-event-processor/pom.xml b/components/bbs-event-processor/pom.xml
index 85c8cf4..a2cc155 100644
--- a/components/bbs-event-processor/pom.xml
+++ b/components/bbs-event-processor/pom.xml
@@ -12,7 +12,7 @@
<groupId>org.onap.dcaegen2.services.components</groupId>
<artifactId>bbs-event-processor</artifactId>
- <version>1.1.0-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<name>dcaegen2-services-bbs-event-processor</name>
<description>BBS Re-Registration and CPE Authentication Handler</description>
@@ -26,25 +26,21 @@
</licenses>
<properties>
- <java.version>8</java.version>
- <immutables.version>2.7.5</immutables.version>
- <spring-boot.version>2.1.3.RELEASE</spring-boot.version>
- <tomcat.version>8.5.32</tomcat.version>
- <slf4j.version>1.7.25</slf4j.version>
+ <java.version>11</java.version>
+ <immutables.version>2.8.3</immutables.version>
+ <spring-boot.version>2.1.12.RELEASE</spring-boot.version>
<junit-platform.version>1.1.0</junit-platform.version>
<jacoco.version>0.8.2</jacoco.version>
- <dcae.sdk.version>1.1.4</dcae.sdk.version>
- <wiremock.version>2.21.0</wiremock.version>
- <springfox-swagger.version>2.8.0</springfox-swagger.version>
+ <dcae.sdk.version>1.3.4</dcae.sdk.version>
+ <wiremock.version>2.24.0</wiremock.version>
+ <jaxb.api.version>2.3.0</jaxb.api.version>
+ <springfox-swagger.version>2.9.2</springfox-swagger.version>
<maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format>
<bbs-event-processor.main.class>org.onap.bbs.event.processor.Application</bbs-event-processor.main.class>
<dependency.dir.name>libs</dependency.dir.name>
<dependency.dir.location>${project.build.directory}/${dependency.dir.name}</dependency.dir.location>
<docker.image.name>onap/${project.groupId}.${project.artifactId}</docker.image.name>
- <maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format>
- <sonar.coverage.jacoco.xmlReportPaths>
- ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml
- </sonar.coverage.jacoco.xmlReportPaths>
</properties>
<dependencyManagement>
@@ -61,7 +57,7 @@
</dependency>
<dependency>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
- <artifactId>common-dependency</artifactId>
+ <artifactId>http-client</artifactId>
<version>${dcae.sdk.version}</version>
</dependency>
<dependency>
@@ -96,6 +92,11 @@
<artifactId>wiremock-jre8</artifactId>
<version>${wiremock.version}</version>
</dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <version>${jaxb.api.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -110,7 +111,7 @@
</dependency>
<dependency>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
- <artifactId>common-dependency</artifactId>
+ <artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -170,6 +171,10 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<scope>test</scope>
@@ -337,14 +342,14 @@
<executions>
<execution>
<id>build-bbs-event-processor-image</id>
- <phase>deploy</phase>
+ <phase>package</phase>
<goals>
<goal>build</goal>
</goals>
</execution>
<execution>
<id>tag-and-push-image-latest</id>
- <phase>deploy</phase>
+ <phase>package</phase>
<goals>
<goal>tag</goal>
<goal>push</goal>
@@ -357,7 +362,7 @@
</execution>
<execution>
<id>tag-and-push-image-with-version</id>
- <phase>deploy</phase>
+ <phase>package</phase>
<goals>
<goal>tag</goal>
<goal>push</goal>
@@ -370,7 +375,7 @@
</execution>
<execution>
<id>tag-and-push-image-with-version-and-date</id>
- <phase>deploy</phase>
+ <phase>package</phase>
<goals>
<goal>tag</goal>
<goal>push</goal>
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java
index 66e1f86..94b955f 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/Application.java
@@ -71,7 +71,7 @@
@Bean
TaskScheduler threadPoolTaskScheduler() {
- ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+ var scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(THREADS_IN_POOL);
scheduler.setThreadNamePrefix("pipeline-thrd-");
return scheduler;
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
index 5022a69..33a935e 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
@@ -21,7 +21,10 @@
package org.onap.bbs.event.processor.config;
import static org.onap.bbs.event.processor.config.ApplicationConstants.STREAMS_TYPE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.keyStoreFromResource;
+import static org.onap.dcaegen2.services.sdk.security.ssl.Passwords.fromPath;
+import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -31,10 +34,12 @@
import org.onap.bbs.event.processor.exceptions.ConfigurationParsingException;
import org.onap.bbs.event.processor.model.GeneratedAppConfigObject;
import org.onap.bbs.event.processor.utilities.LoggingUtil;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@@ -48,9 +53,9 @@
private final SecurityProperties securityProperties;
private final GenericProperties genericProperties;
- private DmaapConsumerConfiguration dmaapReRegistrationConsumerConfiguration;
- private DmaapConsumerConfiguration dmaapCpeAuthenticationConsumerConfiguration;
- private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private MessageRouterSubscriberConfig dmaapReRegistrationConsumerConfiguration;
+ private MessageRouterSubscriberConfig dmaapCpeAuthenticationConsumerConfiguration;
+ private MessageRouterPublisherConfig dmaapPublisherConfiguration;
private AaiClientConfiguration aaiClientConfiguration;
private Set<ConfigurationChangeObserver> observers;
@@ -97,15 +102,15 @@
observers.forEach(ConfigurationChangeObserver::updateConfiguration);
}
- public synchronized DmaapConsumerConfiguration getDmaapReRegistrationConsumerConfiguration() {
+ public synchronized MessageRouterSubscriberConfig getDmaapReRegistrationConsumerConfiguration() {
return dmaapReRegistrationConsumerConfiguration;
}
- public synchronized DmaapConsumerConfiguration getDmaapCpeAuthenticationConsumerConfiguration() {
+ public synchronized MessageRouterSubscriberConfig getDmaapCpeAuthenticationConsumerConfiguration() {
return dmaapCpeAuthenticationConsumerConfiguration;
}
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ public synchronized MessageRouterPublisherConfig getDmaapPublisherConfiguration() {
return dmaapPublisherConfiguration;
}
@@ -117,6 +122,18 @@
return genericProperties.getPipelinesPollingIntervalSec();
}
+ public synchronized DmaapProducerProperties getDmaapProducerProperties() {
+ return dmaapProducerProperties;
+ }
+
+ public synchronized DmaapReRegistrationConsumerProperties getDmaapReRegistrationConsumerProperties() {
+ return dmaapReRegistrationConsumerProperties;
+ }
+
+ public synchronized DmaapCpeAuthenticationConsumerProperties getDmaapCpeAuthenticationConsumerProperties() {
+ return dmaapCpeAuthenticationConsumerProperties;
+ }
+
public synchronized int getPipelinesTimeoutInSeconds() {
return genericProperties.getPipelinesTimeoutSec();
}
@@ -180,11 +197,17 @@
securityProperties.setKeyStorePasswordPath(newConfiguration.keyStorePasswordPath());
securityProperties.setTrustStorePath(newConfiguration.trustStorePath());
securityProperties.setTrustStorePasswordPath(newConfiguration.trustStorePasswordPath());
+ final SecurityKeys securityKeys = ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath())))
+ .build();
- GeneratedAppConfigObject.StreamsObject reRegObject =
+ var reRegObject =
getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(),
"PNF Re-Registration");
- TopicUrlInfo topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl());
+ var topicUrlInfo = parseTopicUrl(reRegObject.dmaapInfo().topicUrl());
dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
@@ -196,9 +219,9 @@
dmaapReRegistrationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup());
dmaapReRegistrationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit());
dmaapReRegistrationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs());
- constructDmaapReRegistrationConfiguration();
+ constructDmaapReRegistrationConfiguration(securityKeys);
- GeneratedAppConfigObject.StreamsObject cpeAuthObject =
+ var cpeAuthObject =
getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.cpeAuthConfigKey(),
"CPE Authentication");
topicUrlInfo = parseTopicUrl(cpeAuthObject.dmaapInfo().topicUrl());
@@ -213,9 +236,9 @@
dmaapCpeAuthenticationConsumerProperties.setConsumerGroup(newConfiguration.dmaapConsumerConsumerGroup());
dmaapCpeAuthenticationConsumerProperties.setMessageLimit(newConfiguration.dmaapMessageLimit());
dmaapCpeAuthenticationConsumerProperties.setTimeoutMs(newConfiguration.dmaapTimeoutMs());
- constructDmaapCpeAuthenticationConfiguration();
+ constructDmaapCpeAuthenticationConfiguration(securityKeys);
- GeneratedAppConfigObject.StreamsObject closeLoopObject =
+ var closeLoopObject =
getStreamsObject(newConfiguration.streamPublishesMap(), newConfiguration.closeLoopConfigKey(),
"Close Loop");
topicUrlInfo = parseTopicUrl(closeLoopObject.dmaapInfo().topicUrl());
@@ -226,7 +249,7 @@
dmaapProducerProperties.setDmaapUserPassword(closeLoopObject.aafPassword());
dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
- constructDmaapProducerConfiguration();
+ constructDmaapProducerConfiguration(securityKeys);
aaiClientProperties.setAaiHost(newConfiguration.aaiHost());
aaiClientProperties.setAaiPort(newConfiguration.aaiPort());
@@ -258,7 +281,7 @@
@NotNull
private GeneratedAppConfigObject.StreamsObject getStreamsObject(
Map<String, GeneratedAppConfigObject.StreamsObject> map, String configKey, String messageName) {
- GeneratedAppConfigObject.StreamsObject streamsObject = map.get(configKey);
+ var streamsObject = map.get(configKey);
if (!STREAMS_TYPE.equals(streamsObject.type())) {
throw new ApplicationEnvironmentException(String.format("%s requires information about"
+ " message-router topic in ONAP", messageName));
@@ -267,80 +290,45 @@
}
private void constructConfigurationObjects() {
- constructDmaapReRegistrationConfiguration();
- constructDmaapCpeAuthenticationConfiguration();
- constructDmaapProducerConfiguration();
+ // Every DMaaP client configuration receives its own, freshly loaded SecurityKeys instance.
+ // NOTE(review): presumably instances must not be shared because the SDK password holders
+ // are consumed on use - confirm before collapsing these three calls into one.
+ constructDmaapReRegistrationConfiguration(loadSecurityKeys());
+ constructDmaapCpeAuthenticationConfiguration(loadSecurityKeys());
+ constructDmaapProducerConfiguration(loadSecurityKeys());
constructAaiConfiguration();
}
+
+ /**
+ * Builds a new {@link SecurityKeys} object from the key store, trust store and password
+ * file locations currently held by {@code securityProperties}.
+ *
+ * @return a newly built security keys instance; never shared between callers
+ */
+ private SecurityKeys loadSecurityKeys() {
+ return ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(securityProperties.getKeyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(securityProperties.getKeyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(securityProperties.getTrustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(securityProperties.getTrustStorePasswordPath())))
+ .build();
+ }
- private void constructDmaapReRegistrationConfiguration() {
- dmaapReRegistrationConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
- .dmaapHostName(dmaapReRegistrationConsumerProperties.getDmaapHostName())
- .dmaapPortNumber(dmaapReRegistrationConsumerProperties.getDmaapPortNumber())
- .dmaapProtocol(dmaapReRegistrationConsumerProperties.getDmaapProtocol())
- .dmaapTopicName(dmaapReRegistrationConsumerProperties.getDmaapTopicName())
- .dmaapUserName(
- dmaapReRegistrationConsumerProperties.getDmaapUserName() == null ? "" :
- dmaapReRegistrationConsumerProperties.getDmaapUserName())
- .dmaapUserPassword(
- dmaapReRegistrationConsumerProperties.getDmaapUserPassword() == null ? "" :
- dmaapReRegistrationConsumerProperties.getDmaapUserPassword())
- .dmaapContentType(dmaapReRegistrationConsumerProperties.getDmaapContentType())
- .consumerId(dmaapReRegistrationConsumerProperties.getConsumerId())
- .consumerGroup(dmaapReRegistrationConsumerProperties.getConsumerGroup())
- .timeoutMs(dmaapReRegistrationConsumerProperties.getTimeoutMs())
- .messageLimit(dmaapReRegistrationConsumerProperties.getMessageLimit())
- .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth())
- .keyStorePath(securityProperties.getKeyStorePath())
- .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath())
- .trustStorePath(securityProperties.getTrustStorePath())
- .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath())
+ private void constructDmaapReRegistrationConfiguration(final SecurityKeys securityKeys) {
+ dmaapReRegistrationConsumerConfiguration = ImmutableMessageRouterSubscriberConfig.builder()
+ .securityKeys(securityKeys)
.build();
}
- private void constructDmaapCpeAuthenticationConfiguration() {
- dmaapCpeAuthenticationConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
- .dmaapHostName(dmaapCpeAuthenticationConsumerProperties.getDmaapHostName())
- .dmaapPortNumber(dmaapCpeAuthenticationConsumerProperties.getDmaapPortNumber())
- .dmaapProtocol(dmaapCpeAuthenticationConsumerProperties.getDmaapProtocol())
- .dmaapTopicName(dmaapCpeAuthenticationConsumerProperties.getDmaapTopicName())
- .dmaapUserName(
- dmaapCpeAuthenticationConsumerProperties.getDmaapUserName() == null ? "" :
- dmaapCpeAuthenticationConsumerProperties.getDmaapUserName())
- .dmaapUserPassword(
- dmaapCpeAuthenticationConsumerProperties.getDmaapUserPassword() == null ? "" :
- dmaapCpeAuthenticationConsumerProperties.getDmaapUserPassword())
- .dmaapContentType(dmaapCpeAuthenticationConsumerProperties.getDmaapContentType())
- .consumerId(dmaapCpeAuthenticationConsumerProperties.getConsumerId())
- .consumerGroup(dmaapCpeAuthenticationConsumerProperties.getConsumerGroup())
- .timeoutMs(dmaapCpeAuthenticationConsumerProperties.getTimeoutMs())
- .messageLimit(dmaapCpeAuthenticationConsumerProperties.getMessageLimit())
- .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth())
- .keyStorePath(securityProperties.getKeyStorePath())
- .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath())
- .trustStorePath(securityProperties.getTrustStorePath())
- .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath())
+ private void constructDmaapCpeAuthenticationConfiguration(final SecurityKeys securityKeys) {
+ dmaapCpeAuthenticationConsumerConfiguration = ImmutableMessageRouterSubscriberConfig.builder()
+ .securityKeys(securityKeys)
.build();
}
- private void constructDmaapProducerConfiguration() {
- dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapHostName(dmaapProducerProperties.getDmaapHostName())
- .dmaapPortNumber(dmaapProducerProperties.getDmaapPortNumber())
- .dmaapProtocol(dmaapProducerProperties.getDmaapProtocol())
- .dmaapTopicName(dmaapProducerProperties.getDmaapTopicName())
- .dmaapUserName(
- dmaapProducerProperties.getDmaapUserName() == null ? "" :
- dmaapProducerProperties.getDmaapUserName())
- .dmaapUserPassword(
- dmaapProducerProperties.getDmaapUserPassword() == null ? "" :
- dmaapProducerProperties.getDmaapUserPassword())
- .dmaapContentType(dmaapProducerProperties.getDmaapContentType())
- .enableDmaapCertAuth(securityProperties.isEnableDmaapCertAuth())
- .keyStorePath(securityProperties.getKeyStorePath())
- .keyStorePasswordPath(securityProperties.getKeyStorePasswordPath())
- .trustStorePath(securityProperties.getTrustStorePath())
- .trustStorePasswordPath(securityProperties.getTrustStorePasswordPath())
+ private void constructDmaapProducerConfiguration(final SecurityKeys securityKeys) {
+ dmaapPublisherConfiguration = ImmutableMessageRouterPublisherConfig.builder()
+ .securityKeys(securityKeys)
.build();
}
@@ -362,19 +350,19 @@
}
private TopicUrlInfo parseTopicUrl(String topicUrl) {
+ // Splits "<protocol>:<host>:<port>/events/<topic>" into host, port and topic name.
+ // Every malformed input is reported as a ConfigurationParsingException.
- String[] urlTokens = topicUrl.split(":");
+ var urlTokens = topicUrl.split(":");
if (urlTokens.length != 3) {
throw new ConfigurationParsingException("Wrong topic URL format");
}
- TopicUrlInfo topicUrlInfo = new TopicUrlInfo();
+ var topicUrlInfo = new TopicUrlInfo();
topicUrlInfo.setHost(urlTokens[1].replace("/", ""));
- String[] tokensAfterHost = urlTokens[2].split("/events/");
+ var tokensAfterHost = urlTokens[2].split("/events/");
if (tokensAfterHost.length != 2) {
throw new ConfigurationParsingException("Wrong topic name structure");
}
- topicUrlInfo.setPort(Integer.valueOf(tokensAfterHost[0]));
+ try {
+ topicUrlInfo.setPort(Integer.valueOf(tokensAfterHost[0]));
+ } catch (NumberFormatException e) {
+ // A non-numeric port is a configuration error like the other malformed-URL cases
+ throw new ConfigurationParsingException("Wrong topic URL port format");
+ }
- topicUrlInfo.setTopicName("events/" + tokensAfterHost[1]);
+ topicUrlInfo.setTopicName(tokensAfterHost[1]);
return topicUrlInfo;
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java
index d2ac3c1..7be6bde 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConstants.java
@@ -35,5 +35,9 @@
// Close Loop Constants
public static final String DCAE_BBS_EVENT_PROCESSOR_MS_INSTANCE = "DCAE.BBS_event_processor_mSInstance";
+ // URL templates for the DMaaP subscribe and publish endpoints
+ public static final String SUBSCRIBE_URL_TEMPLATE = "%s://%s:%d/events/%s";
+ public static final String PUBLISH_URL_TEMPLATE = "%s://%s:%d/events/%s";
+
private ApplicationConstants() {}
}
\ No newline at end of file
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
index 607b3b3..4ac8549 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
@@ -40,8 +40,7 @@
import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,9 +84,9 @@
}
boolean environmentNotReady() {
- String consulHost = System.getenv().get(CONSUL_HOST);
- String cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
- String hostname = System.getenv().get(HOSTNAME);
+ var consulHost = System.getenv().get(CONSUL_HOST);
+ var cbs = System.getenv().get(CONFIG_BINDING_SERVICE);
+ var hostname = System.getenv().get(HOSTNAME);
return consulHost == null || cbs == null || hostname == null;
}
@@ -106,7 +105,7 @@
private void parseConsulRetrievedConfiguration(JsonObject jsonObject) {
- GeneratedAppConfigObject generatedAppConfigObject = generateAppConfigObject(jsonObject);
+ var generatedAppConfigObject = generateAppConfigObject(jsonObject);
LOGGER.trace("Consul-Retrieved Application Generated Object:\n{}", generatedAppConfigObject);
configuration.updateCurrentConfiguration(generatedAppConfigObject);
}
@@ -124,10 +123,10 @@
RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
// Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
- EnvProperties env = EnvProperties.fromEnvironment();
- CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
+ var cbsClientConfig = CbsClientConfiguration.fromEnvironment();
+ var cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
// Create the client and use it to get the configuration
- cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
+ cbsFetchPipeline = CbsClientFactory.createCbsClient(cbsClientConfig)
.doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
.retry(e -> true)
.flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))
@@ -138,57 +137,57 @@
GeneratedAppConfigObject generateAppConfigObject(JsonObject configObject) {
if (LOGGER.isInfoEnabled()) {
- String configAsString = gson.toJson(configObject);
+ var configAsString = gson.toJson(configObject);
LOGGER.info("Received App Config object\n{}", configAsString);
}
- final String dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
- final String dmaapContentType = configObject.get("dmaap.contentType").getAsString();
- final String dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString();
- final String dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
- final int dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt();
- final int dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt();
+ final var dmaapProtocol = configObject.get("dmaap.protocol").getAsString();
+ final var dmaapContentType = configObject.get("dmaap.contentType").getAsString();
+ final var dmaapConsumerId = configObject.get("dmaap.consumer.consumerId").getAsString();
+ final var dmaapConsumerGroup = configObject.get("dmaap.consumer.consumerGroup").getAsString();
+ final var dmaapMessageLimit = configObject.get("dmaap.messageLimit").getAsInt();
+ final var dmaapTimeoutMs = configObject.get("dmaap.timeoutMs").getAsInt();
- final String aaiHost = configObject.get("aai.host").getAsString();
- final int aaiPort = configObject.get("aai.port").getAsInt();
- final String aaiProtocol = configObject.get("aai.protocol").getAsString();
- final String aaiUsername = configObject.get("aai.username").getAsString();
- final String aaiPassword = configObject.get("aai.password").getAsString();
- final boolean aaiIgnoreSslCertificateErrors =
+ final var aaiHost = configObject.get("aai.host").getAsString();
+ final var aaiPort = configObject.get("aai.port").getAsInt();
+ final var aaiProtocol = configObject.get("aai.protocol").getAsString();
+ final var aaiUsername = configObject.get("aai.username").getAsString();
+ final var aaiPassword = configObject.get("aai.password").getAsString();
+ final var aaiIgnoreSslCertificateErrors =
configObject.get("aai.aaiIgnoreSslCertificateErrors").getAsBoolean();
- final int pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
- final int pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
- final int cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
+ final var pipelinesPollingIntervalSec = configObject.get("application.pipelinesPollingIntervalSec").getAsInt();
+ final var pipelinesTimeoutSec = configObject.get("application.pipelinesTimeoutSec").getAsInt();
+ final var cbsPollingIntervalSec = configObject.get("application.cbsPollingIntervalSec").getAsInt();
- final String reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
- final String reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
- final String cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
- final String cpeAuthClControlName =
+ final var reRegPolicyScope = configObject.get("application.reregistration.policyScope").getAsString();
+ final var reRegClControlName = configObject.get("application.reregistration.clControlName").getAsString();
+ final var cpeAuthPolicyScope = configObject.get("application.cpe.authentication.policyScope").getAsString();
+ final var cpeAuthClControlName =
configObject.get("application.cpe.authentication.clControlName").getAsString();
- final String policyVersion = configObject.get("application.policyVersion").getAsString();
- final String closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
- final String closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
- final String closeLoopVersion = configObject.get("application.clVersion").getAsString();
- final String closeLoopTarget = configObject.get("application.clTarget").getAsString();
- final String closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
+ final var policyVersion = configObject.get("application.policyVersion").getAsString();
+ final var closeLoopTargetType = configObject.get("application.clTargetType").getAsString();
+ final var closeLoopEventStatus = configObject.get("application.clEventStatus").getAsString();
+ final var closeLoopVersion = configObject.get("application.clVersion").getAsString();
+ final var closeLoopTarget = configObject.get("application.clTarget").getAsString();
+ final var closeLoopOriginator = configObject.get("application.clOriginator").getAsString();
- final String reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
- final String cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
- final String closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
+ final var reRegConfigKey = configObject.get("application.reregistration.configKey").getAsString();
+ final var cpeAuthConfigKey = configObject.get("application.cpeAuth.configKey").getAsString();
+ final var closeLoopConfigKey = configObject.get("application.closeLoop.configKey").getAsString();
- final String loggingLevel = configObject.get("application.loggingLevel").getAsString();
+ final var loggingLevel = configObject.get("application.loggingLevel").getAsString();
- final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
- final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
- final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
- final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
- final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
- final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
+ final var keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
+ final var keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
+ final var trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
+ final var trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
+ final var aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
+ final var dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
- final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
- final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
+ final var streamsPublishes = configObject.getAsJsonObject("streams_publishes");
+ final var streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
return ImmutableGeneratedAppConfigObject.builder()
.dmaapProtocol(dmaapProtocol)
@@ -259,22 +258,22 @@
private Map.Entry<String, GeneratedAppConfigObject.StreamsObject> parseStreamsSingleObject(
Map.Entry<String, JsonElement> jsonEntry) {
- JsonObject closeLoopOutput = (JsonObject) jsonEntry.getValue();
+ var closeLoopOutput = (JsonObject) jsonEntry.getValue();
- String type = closeLoopOutput.get("type").getAsString();
- String aafUsername = closeLoopOutput.get("aaf_username") != null
+ var type = closeLoopOutput.get("type").getAsString();
+ var aafUsername = closeLoopOutput.get("aaf_username") != null
? closeLoopOutput.get("aaf_username").getAsString() : "";
- String aafPassword = closeLoopOutput.get("aaf_password") != null
+ var aafPassword = closeLoopOutput.get("aaf_password") != null
? closeLoopOutput.get("aaf_password").getAsString() : "";
- JsonObject dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
- String clientId = dmaapInfo.get("client_id") != null
+ var dmaapInfo = closeLoopOutput.getAsJsonObject("dmaap_info");
+ var clientId = dmaapInfo.get("client_id") != null
? dmaapInfo.get("client_id").getAsString() : "";
- String clientRole = dmaapInfo.get("client_role") != null
+ var clientRole = dmaapInfo.get("client_role") != null
? dmaapInfo.get("client_role").getAsString() : "";
- String location = dmaapInfo.get("location") != null
+ var location = dmaapInfo.get("location") != null
? dmaapInfo.get("location").getAsString() : "";
- String topicUrl = dmaapInfo.get("topic_url").getAsString();
+ var topicUrl = dmaapInfo.get("topic_url").getAsString();
GeneratedAppConfigObject.DmaapInfo dmaapInfoObject = ImmutableDmaapInfo.builder()
.clientId(clientId)
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java
new file mode 100644
index 0000000..b0ecfd5
--- /dev/null
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/MessageRouterConfig.java
@@ -0,0 +1,73 @@
+/*
+ * ============LICENSE_START=======================================================
+ * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+*/
+
+package org.onap.bbs.event.processor.config;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Spring configuration exposing the DMaaP Message Router clients as beans.
+ *
+ * <p>Each client is built by {@link DmaapClientFactory} from the settings held in
+ * {@link ApplicationConfiguration}.
+ */
+@Configuration
+public class MessageRouterConfig {
+
+    /**
+     * Builds the subscriber bean named {@code ReRegMessageRouterSubscriber}.
+     *
+     * @param configuration source of the re-registration consumer configuration
+     * @return subscriber created from that consumer configuration
+     */
+    @Bean(name = "ReRegMessageRouterSubscriber")
+    public MessageRouterSubscriber reRegistrationMessageRouterSubscriber(ApplicationConfiguration configuration) {
+        var consumerConfiguration = configuration.getDmaapReRegistrationConsumerConfiguration();
+        return DmaapClientFactory.createMessageRouterSubscriber(consumerConfiguration);
+    }
+
+    /**
+     * Builds the subscriber bean named {@code CpeAuthMessageRouterSubscriber}.
+     *
+     * @param configuration source of the CPE authentication consumer configuration
+     * @return subscriber created from that consumer configuration
+     */
+    @Bean(name = "CpeAuthMessageRouterSubscriber")
+    public MessageRouterSubscriber registrationMessageRouterSubscriber(ApplicationConfiguration configuration) {
+        var consumerConfiguration = configuration.getDmaapCpeAuthenticationConsumerConfiguration();
+        return DmaapClientFactory.createMessageRouterSubscriber(consumerConfiguration);
+    }
+
+    /**
+     * Builds the Message Router publisher bean.
+     *
+     * @param configuration source of the publisher configuration
+     * @return publisher created from that publisher configuration
+     */
+    @Bean
+    public MessageRouterPublisher mrPub(ApplicationConfiguration configuration) {
+        var publisherConfiguration = configuration.getDmaapPublisherConfiguration();
+        return DmaapClientFactory.createMessageRouterPublisher(publisherConfiguration);
+    }
+}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java
index 1be4f43..e2e4979 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/exceptions/EmptyDmaapResponseException.java
@@ -22,10 +22,6 @@
public class EmptyDmaapResponseException extends RuntimeException {
- public EmptyDmaapResponseException() {
- super();
- }
-
public EmptyDmaapResponseException(String message) {
super(message);
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java
index 8fa73e4..c952e9b 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ControlLoopPublisherDmaapModel.java
@@ -26,11 +26,10 @@
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@Value.Immutable
@Gson.TypeAdapters(fieldNamingStrategy = true)
-public interface ControlLoopPublisherDmaapModel extends DmaapModel {
+public interface ControlLoopPublisherDmaapModel {
@SerializedName(value = "closedLoopEventClient", alternate = "closedLoopEventClient")
String getClosedLoopEventClient();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java
index 42c9896..50ed66e 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/CpeAuthenticationConsumerDmaapModel.java
@@ -26,12 +26,10 @@
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@Value.Immutable
@Gson.TypeAdapters(fieldNamingStrategy = true)
-public interface CpeAuthenticationConsumerDmaapModel extends AaiModel, DmaapModel {
+public interface CpeAuthenticationConsumerDmaapModel {
@SerializedName(value = "correlationId", alternate = "correlationId")
String getCorrelationId();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java
index 682c064..f64f30e 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/PnfAaiObject.java
@@ -26,11 +26,10 @@
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.model.ClientModel;
@Value.Immutable
@Gson.TypeAdapters(fieldNamingStrategy = true, emptyAsNulls = true)
-public interface PnfAaiObject extends ClientModel {
+public interface PnfAaiObject {
@SerializedName(value = "pnf-name", alternate = "pnf-name")
String getPnfName();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java
index 07fb75a..3f58aa3 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/ReRegistrationConsumerDmaapModel.java
@@ -24,12 +24,10 @@
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@Value.Immutable
@Gson.TypeAdapters(fieldNamingStrategy = true)
-public interface ReRegistrationConsumerDmaapModel extends AaiModel, DmaapModel {
+public interface ReRegistrationConsumerDmaapModel {
@SerializedName(value = "correlationId", alternate = "correlationId")
String getCorrelationId();
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
index a30903b..2cab017 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
@@ -25,13 +25,11 @@
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
@@ -42,14 +40,13 @@
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel;
-import org.onap.bbs.event.processor.model.MetadataListAaiObject;
import org.onap.bbs.event.processor.model.PnfAaiObject;
import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -99,7 +96,7 @@
LOGGER.trace("Reactive CPE Authentication pipeline subscribed - Execution started");
}
- Flux<HttpResponse> executePipeline() {
+ Flux<MessageRouterPublishResponse> executePipeline() {
return
// Consume CPE Authentication from DMaaP
consumeCpeAuthenticationFromDmaap()
@@ -111,12 +108,12 @@
.flatMap(this::triggerPolicy);
}
- private void onSuccess(HttpResponse responseCode) {
- MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
- LOGGER.info("CPE Authentication event successfully handled. "
- + "Publishing to DMaaP for Policy returned a status code of ({} {})",
- responseCode.statusCode(), responseCode.statusReason());
- MDC.remove(RESPONSE_CODE);
+ private void onSuccess(MessageRouterPublishResponse response) {
+ if (response.successful()) {
+ LOGGER.info("CPE Authentication event successfully handled. Published Policy event to DMaaP");
+ } else {
+ LOGGER.error("CPE Authentication event handling error [{}]", response.failReason());
+ }
}
private void onError(Throwable throwable) {
@@ -149,7 +146,7 @@
.map(event -> {
// For each message, we have to keep separate state. This state will be enhanced
// in each step and handed off to the next processing step
- PipelineState state = new PipelineState();
+ var state = new PipelineState();
state.setCpeAuthenticationEvent(event);
return state;
});
@@ -161,9 +158,9 @@
private Mono<PipelineState> fetchPnfFromAai(PipelineState state) {
- CpeAuthenticationConsumerDmaapModel vesEvent = state.getCpeAuthenticationEvent();
- String pnfName = vesEvent.getCorrelationId();
- String url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName);
+ var vesEvent = state.getCpeAuthenticationEvent();
+ var pnfName = vesEvent.getCorrelationId();
+ var url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName);
LOGGER.debug("Processing Step: Retrieve PNF. Url: ({})", url);
return aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, url)
@@ -191,10 +188,10 @@
return Mono.empty();
}
- PnfAaiObject pnf = state.getPnfAaiObject();
+ var pnf = state.getPnfAaiObject();
// Assuming that the PNF will only have a single service-instance relationship pointing
// towards the HSI CFS service
- String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
+ var serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
.stream()
.filter(e -> "service-instance".equals(e.getRelatedTo()))
.flatMap(e -> e.getRelationshipData().stream())
@@ -208,7 +205,7 @@
return Mono.empty();
}
- String url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
serviceInstanceId);
LOGGER.debug("Processing Step: Retrieve HSI CFS Service. Url: ({})", url);
return aaiClientTask.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, url)
@@ -230,21 +227,13 @@
});
}
- private Mono<HttpResponse> triggerPolicy(PipelineState state) {
+ private Flux<MessageRouterPublishResponse> triggerPolicy(PipelineState state) {
if (state == null || state.getHsiCfsServiceInstance() == null) {
- return Mono.empty();
+ return Flux.empty();
}
- // At this point, we must check if the PNF RGW MAC address matches the value extracted from VES event
- if (!isCorrectMacAddress(state)) {
- LOGGER.warn("Processing stopped. RGW MAC address taken from event ({}) "
- + "does not match with A&AI metadata corresponding value",
- state.getCpeAuthenticationEvent().getRgwMacAddress());
- return Mono.empty();
- }
-
- ControlLoopPublisherDmaapModel event = buildTriggeringPolicyEvent(state);
+ var event = buildTriggeringPolicyEvent(state);
return publisherTask.execute(event)
.timeout(Duration.ofSeconds(configuration.getPipelinesTimeoutInSeconds()))
.doOnError(TimeoutException.class,
@@ -256,30 +245,15 @@
e -> Mono.empty());
}
-
- private boolean isCorrectMacAddress(PipelineState state) {
- // We need to check if the RGW MAC address received in VES event matches the one found in
- // HSIA CFS service (in its metadata section)
- Optional<MetadataListAaiObject> optionalMetadata = state.getHsiCfsServiceInstance()
- .getMetadataListAaiObject();
- String eventRgwMacAddress = state.getCpeAuthenticationEvent().getRgwMacAddress().orElse("");
- return optionalMetadata
- .map(list -> list.getMetadataEntries()
- .stream()
- .anyMatch(m -> "rgw-mac-address".equals(m.getMetaname())
- && m.getMetavalue().equals(eventRgwMacAddress)))
- .orElse(false);
- }
-
private ControlLoopPublisherDmaapModel buildTriggeringPolicyEvent(PipelineState state) {
- String cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId();
+ var cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId();
Map<String, String> enrichmentData = new HashMap<>();
enrichmentData.put("service-information.hsia-cfs-service-instance-id", cfsServiceInstanceId);
enrichmentData.put("cpe.old-authentication-state", state.cpeAuthenticationEvent.getOldAuthenticationState());
enrichmentData.put("cpe.new-authentication-state", state.cpeAuthenticationEvent.getNewAuthenticationState());
- String swVersion = state.getCpeAuthenticationEvent().getSwVersion().orElse("");
+ var swVersion = state.getCpeAuthenticationEvent().getSwVersion().orElse("");
if (!StringUtils.isEmpty(swVersion)) {
enrichmentData.put("cpe.swVersion", swVersion);
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
index 33a9aea..0bc5e35 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
@@ -25,12 +25,10 @@
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
@@ -48,7 +46,7 @@
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -98,7 +96,7 @@
LOGGER.trace("Reactive PNF Re-registration pipeline subscribed - Execution started");
}
- Flux<HttpResponse> executePipeline() {
+ Flux<MessageRouterPublishResponse> executePipeline() {
return
// Consume Re-Registration from DMaaP
consumeReRegistrationsFromDmaap()
@@ -110,12 +108,12 @@
.flatMap(this::triggerPolicy);
}
- private void onSuccess(HttpResponse responseCode) {
- MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
- LOGGER.info("PNF Re-Registration event successfully handled. "
- + "Publishing to DMaaP for Policy returned a status code of ({} {})",
- responseCode.statusCode(), responseCode.statusReason());
- MDC.remove(RESPONSE_CODE);
+ private void onSuccess(MessageRouterPublishResponse response) {
+ if (response.successful()) {
+ LOGGER.info("PNF Re-Registration event successfully handled. Published Policy event to DMaaP");
+ } else {
+ LOGGER.error("PNF Re-Registration event handling error [{}]", response.failReason());
+ }
}
private void onError(Throwable throwable) {
@@ -148,7 +146,7 @@
.map(event -> {
// For each message, we have to keep separate state. This state will be enhanced
// in each step and handed off to the next processing step
- PipelineState state = new PipelineState();
+ var state = new PipelineState();
state.setReRegistrationEvent(event);
return state;
});
@@ -160,9 +158,9 @@
private Mono<PipelineState> fetchPnfFromAai(PipelineState state) {
- ReRegistrationConsumerDmaapModel vesEvent = state.getReRegistrationEvent();
- String pnfName = vesEvent.getCorrelationId();
- String url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName);
+ var vesEvent = state.getReRegistrationEvent();
+ var pnfName = vesEvent.getCorrelationId();
+ var url = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName);
LOGGER.debug("Processing Step: Retrieve PNF. Url: ({})", url);
return aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, url)
@@ -196,10 +194,10 @@
return Mono.empty();
}
- PnfAaiObject pnf = state.getPnfAaiObject();
+ var pnf = state.getPnfAaiObject();
// Assuming that the PNF will only have a single service-instance relationship pointing
// towards the HSI CFS service
- String serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
+ var serviceInstanceId = pnf.getRelationshipListAaiObject().getRelationshipEntries()
.stream()
.filter(e -> "service-instance".equals(e.getRelatedTo()))
.flatMap(e -> e.getRelationshipData().stream())
@@ -213,7 +211,7 @@
return Mono.empty();
}
- String url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var url = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
serviceInstanceId);
LOGGER.debug("Processing Step: Retrieve HSI CFS Service. Url: ({})", url);
return aaiClientTask.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, url)
@@ -236,8 +234,7 @@
}
private boolean isNotReallyAnOntRelocation(PipelineState state) {
- List<RelationshipListAaiObject.RelationshipEntryAaiObject> relationshipEntries =
- state.getPnfAaiObject().getRelationshipListAaiObject().getRelationshipEntries();
+ var relationshipEntries = state.getPnfAaiObject().getRelationshipListAaiObject().getRelationshipEntries();
// If no logical-link, fail further processing
if (relationshipEntries.stream().noneMatch(e -> "logical-link".equals(e.getRelatedTo()))) {
@@ -247,7 +244,7 @@
}
// Assuming PNF will only have one logical-link per BBS use case design
- boolean isNotRelocation = relationshipEntries
+ var isNotRelocation = relationshipEntries
.stream()
.filter(e -> "logical-link".equals(e.getRelatedTo()))
.flatMap(e -> e.getRelationshipData().stream())
@@ -263,13 +260,13 @@
return isNotRelocation;
}
- private Mono<HttpResponse> triggerPolicy(PipelineState state) {
+ private Flux<MessageRouterPublishResponse> triggerPolicy(PipelineState state) {
if (state == null || state.getHsiCfsServiceInstance() == null) {
- return Mono.empty();
+ return Flux.empty();
}
- ControlLoopPublisherDmaapModel event = buildTriggeringPolicyEvent(state);
+ var event = buildTriggeringPolicyEvent(state);
return publisherTask.execute(event)
.timeout(Duration.ofSeconds(configuration.getPipelinesTimeoutInSeconds()))
.doOnError(TimeoutException.class,
@@ -283,12 +280,12 @@
private ControlLoopPublisherDmaapModel buildTriggeringPolicyEvent(PipelineState state) {
- String cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId();
+ var cfsServiceInstanceId = state.getHsiCfsServiceInstance().getServiceInstanceId();
- String attachmentPoint = state.getReRegistrationEvent().getAttachmentPoint();
- String remoteId = state.getReRegistrationEvent().getRemoteId();
- String cvlan = state.getReRegistrationEvent().getCVlan();
- String svlan = state.getReRegistrationEvent().getSVlan();
+ var attachmentPoint = state.getReRegistrationEvent().getAttachmentPoint();
+ var remoteId = state.getReRegistrationEvent().getRemoteId();
+ var cvlan = state.getReRegistrationEvent().getCVlan();
+ var svlan = state.getReRegistrationEvent().getSVlan();
Map<String, String> enrichmentData = new HashMap<>();
enrichmentData.put("service-information.hsia-cfs-service-instance-id", cfsServiceInstanceId);
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java
index 5fbb087..882635c 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/Scheduler.java
@@ -96,7 +96,7 @@
LOGGER.info("BBS event processing pipelines will start in {} seconds "
+ "and will run periodically every {} seconds", PIPELINES_INITIAL_DELAY_IN_SECONDS,
currentPipelinesPollingInterval);
- Instant desiredStartTime = Instant.now().plusSeconds(PIPELINES_INITIAL_DELAY_IN_SECONDS);
+ var desiredStartTime = Instant.now().plusSeconds(PIPELINES_INITIAL_DELAY_IN_SECONDS);
scheduleProcessingTasks(desiredStartTime, currentPipelinesPollingInterval);
// Register for configuration changes
@@ -118,7 +118,7 @@
cancelScheduledProcessingTasks();
reScheduleProcessingTasks();
}
- int newCbsPollingInterval = configuration.getCbsPollingInterval();
+ var newCbsPollingInterval = configuration.getCbsPollingInterval();
if (newCbsPollingInterval != currentCbsPollingInterval) {
if (newCbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL) {
LOGGER.warn("CBS Polling interval is too small ({}). Will not re-schedule CBS job",
@@ -222,9 +222,9 @@
}
private int validatePipelinesPollingInterval() {
- int pipelinesPollingInterval = configuration.getPipelinesPollingIntervalInSeconds();
- boolean isSmallInterval = pipelinesPollingInterval < DEFAULT_PIPELINES_POLLING_INTERVAL;
- int verifiedInterval = isSmallInterval ? DEFAULT_PIPELINES_POLLING_INTERVAL : pipelinesPollingInterval;
+ var pipelinesPollingInterval = configuration.getPipelinesPollingIntervalInSeconds();
+ var isSmallInterval = pipelinesPollingInterval < DEFAULT_PIPELINES_POLLING_INTERVAL;
+ var verifiedInterval = isSmallInterval ? DEFAULT_PIPELINES_POLLING_INTERVAL : pipelinesPollingInterval;
if (isSmallInterval) {
LOGGER.warn("Pipelines Polling interval is too small ({}). Defaulting to {}", pipelinesPollingInterval,
DEFAULT_PIPELINES_POLLING_INTERVAL);
@@ -233,9 +233,9 @@
}
private int verifyCbsPollingInterval() {
- int cbsPollingInterval = configuration.getCbsPollingInterval();
- boolean isSmallInterval = cbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL;
- int verifiedInterval = isSmallInterval ? DEFAULT_CBS_POLLING_INTERVAL : cbsPollingInterval;
+ var cbsPollingInterval = configuration.getCbsPollingInterval();
+ var isSmallInterval = cbsPollingInterval < DEFAULT_CBS_POLLING_INTERVAL;
+ var verifiedInterval = isSmallInterval ? DEFAULT_CBS_POLLING_INTERVAL : cbsPollingInterval;
if (isSmallInterval) {
LOGGER.warn("CBS Polling interval is too small ({}). Defaulting to {}", cbsPollingInterval,
DEFAULT_CBS_POLLING_INTERVAL);
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
index da51028..153cb91 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
@@ -20,59 +20,61 @@
package org.onap.bbs.event.processor.tasks;
-import com.google.gson.JsonElement;
-
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.CpeAuthenticationDmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Component
-public class DmaapCpeAuthenticationConsumerTaskImpl
- implements DmaapCpeAuthenticationConsumerTask, ConfigurationChangeObserver {
+public class DmaapCpeAuthenticationConsumerTaskImpl implements DmaapCpeAuthenticationConsumerTask,
+ ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapCpeAuthenticationConsumerTaskImpl.class);
- private ApplicationConfiguration configuration;
+
private final CpeAuthenticationDmaapConsumerJsonParser cpeAuthenticationDmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
+ private ApplicationConfiguration configuration;
+ private MessageRouterSubscriber subscriber;
+ private String subscribeUrl;
+ private MessageRouterSubscribeRequest subscribeRequest;
private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION =
- new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP");
-
- private DMaaPConsumerReactiveHttpClient httpClient;
+ new EmptyDmaapResponseException("CPE Authentication: Got an empty response from DMaaP");
@Autowired
- public DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException {
- this(configuration, new CpeAuthenticationDmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
- }
-
DmaapCpeAuthenticationConsumerTaskImpl(ApplicationConfiguration configuration,
- CpeAuthenticationDmaapConsumerJsonParser
- cpeAuthenticationDmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) throws SSLException {
+ @Qualifier("CpeAuthMessageRouterSubscriber")
+ MessageRouterSubscriber subscriber,
+ CpeAuthenticationDmaapConsumerJsonParser parser) {
+ this.cpeAuthenticationDmaapConsumerJsonParser = parser;
this.configuration = configuration;
- this.cpeAuthenticationDmaapConsumerJsonParser = cpeAuthenticationDmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
+ this.subscriber = subscriber;
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName());
- httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId());
}
@PostConstruct
@@ -87,24 +89,25 @@
@Override
public synchronized void updateConfiguration() {
- try {
- LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
- LOGGER.info("Creating secure context with:\n {}",
- this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update");
- LOGGER.debug("SSL exception\n", e);
- }
+ LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol(),
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName(),
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber(),
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName());
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId());
}
@Override
public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName);
- DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
- return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response)
- .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
+ return subscriber.getElements(subscribeRequest) // subscriber.get(subscribeRequest)
+ .flatMap(jsonElement ->
+ cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement)))
+ .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
if (!(e instanceof EmptyDmaapResponseException)) {
LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage());
@@ -112,8 +115,4 @@
}
});
}
-
- private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() {
- return httpClient;
- }
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
index 749c4e5..dec1dbc 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
@@ -21,11 +21,11 @@
package org.onap.bbs.event.processor.tasks;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
public interface DmaapPublisherTask {
- Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
+ Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
index 283e5ef..6c50b10 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
@@ -20,54 +20,48 @@
package org.onap.bbs.event.processor.tasks;
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.PUBLISH_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createPublishRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
@Component
public class DmaapPublisherTaskImpl implements DmaapPublisherTask, ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private ApplicationConfiguration configuration;
- private final PublisherReactiveHttpClientFactory httpClientFactory;
- private DMaaPPublisherReactiveHttpClient httpClient;
+ private ApplicationConfiguration configuration;
+ private MessageRouterPublisher publisher;
+ private String publishUrl;
+ private MessageRouterPublishRequest publishRequest;
@Autowired
- DmaapPublisherTaskImpl(ApplicationConfiguration configuration) {
- this(configuration, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(),
- new ControlLoopJsonBodyBuilder()));
- }
-
- DmaapPublisherTaskImpl(ApplicationConfiguration configuration,
- PublisherReactiveHttpClientFactory httpClientFactory) {
+ DmaapPublisherTaskImpl(ApplicationConfiguration configuration, MessageRouterPublisher publisher) {
this.configuration = configuration;
- this.httpClientFactory = httpClientFactory;
-
- try {
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage());
- LOGGER.debug("SSL exception\n", e);
- }
+ this.publisher = publisher;
+ publishUrl = String.format(PUBLISH_URL_TEMPLATE,
+ this.configuration.getDmaapProducerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapProducerProperties().getDmaapHostName(),
+ this.configuration.getDmaapProducerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapProducerProperties().getDmaapTopicName());
+ publishRequest = createPublishRequest(publishUrl);
}
@PostConstruct
@@ -83,27 +77,23 @@
@Override
public synchronized void updateConfiguration() {
LOGGER.info("DMaaP Publisher update due to new application configuration");
- try {
- LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage());
- LOGGER.debug("SSL exception\n", e);
- }
+ publisher =
+ DmaapClientFactory.createMessageRouterPublisher(configuration.getDmaapPublisherConfiguration());
+ publishUrl = String.format(PUBLISH_URL_TEMPLATE,
+ configuration.getDmaapProducerProperties().getDmaapProtocol(),
+ configuration.getDmaapProducerProperties().getDmaapHostName(),
+ configuration.getDmaapProducerProperties().getDmaapPortNumber(),
+ configuration.getDmaapProducerProperties().getDmaapTopicName());
+ publishRequest = createPublishRequest(publishUrl);
}
@Override
- public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
- if (controlLoopPublisherDmaapModel == null) {
+ public Flux<MessageRouterPublishResponse> execute(ControlLoopPublisherDmaapModel event) {
+ if (event == null) {
throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message");
}
LOGGER.info("Executing task for publishing control loop message");
- LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel);
- DMaaPPublisherReactiveHttpClient httpClient = getHttpClient();
- return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
- }
-
- private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() {
- return httpClient;
+ LOGGER.debug("CL message \n{}", event);
+ return publisher.put(publishRequest, Flux.just(ControlLoopJsonBodyBuilder.createAsJsonElement(event)));
}
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
index e40037b..aff563c 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
@@ -20,25 +20,23 @@
package org.onap.bbs.event.processor.tasks;
-import com.google.gson.JsonElement;
-
-import java.util.Optional;
+import static org.onap.bbs.event.processor.config.ApplicationConstants.SUBSCRIBE_URL_TEMPLATE;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.createSubscribeRequest;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.ReRegistrationDmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@@ -49,30 +47,33 @@
ConfigurationChangeObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapReRegistrationConsumerTaskImpl.class);
- private ApplicationConfiguration configuration;
+
private final ReRegistrationDmaapConsumerJsonParser reRegistrationDmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
+ private ApplicationConfiguration configuration;
+ private MessageRouterSubscriber subscriber;
+ private String subscribeUrl;
+ private MessageRouterSubscribeRequest subscribeRequest;
private static final EmptyDmaapResponseException EMPTY_DMAAP_EXCEPTION =
new EmptyDmaapResponseException("PNF Re-Registration: Got an empty response from DMaaP");
- private DMaaPConsumerReactiveHttpClient httpClient;
-
@Autowired
- public DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration) throws SSLException {
- this(configuration, new ReRegistrationDmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
- }
-
DmaapReRegistrationConsumerTaskImpl(ApplicationConfiguration configuration,
- ReRegistrationDmaapConsumerJsonParser reRegDmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory)
- throws SSLException {
+ @Qualifier("ReRegMessageRouterSubscriber") MessageRouterSubscriber subscriber,
+ ReRegistrationDmaapConsumerJsonParser parser) {
+ this.reRegistrationDmaapConsumerJsonParser = parser;
this.configuration = configuration;
- this.reRegistrationDmaapConsumerJsonParser = reRegDmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
+ this.subscriber = subscriber;
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName());
- httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId());
}
@PostConstruct
@@ -87,24 +88,25 @@
@Override
public synchronized void updateConfiguration() {
- try {
- LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
- LOGGER.info("Creating secure context with:\n {}",
- this.configuration.getDmaapReRegistrationConsumerConfiguration());
- httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
- } catch (SSLException e) {
- LOGGER.error("SSL error while updating HTTP Client after a config update");
- LOGGER.debug("SSL exception\n", e);
- }
+ LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
+ subscribeUrl = String.format(SUBSCRIBE_URL_TEMPLATE,
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol(),
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName(),
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber(),
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName());
+ subscribeRequest = createSubscribeRequest(
+ subscribeUrl,
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup(),
+ this.configuration.getDmaapReRegistrationConsumerProperties().getConsumerId());
}
@Override
public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName);
- DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
- return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response)
- .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
+ return subscriber.getElements(subscribeRequest)
+ .flatMap(jsonElement ->
+ reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(Mono.just(jsonElement)))
+ .switchIfEmpty(Mono.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
if (!(e instanceof EmptyDmaapResponseException)) {
LOGGER.error("DMaaP Consumption Exception: {}", e.getMessage());
@@ -112,8 +114,4 @@
}
});
}
-
- private synchronized DMaaPConsumerReactiveHttpClient getHttpClient() {
- return httpClient;
- }
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
index 84fc9f7..c0e9b1c 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
@@ -20,6 +20,8 @@
package org.onap.bbs.event.processor.utilities;
+import static org.onap.bbs.event.processor.utilities.GenericUtils.keyStoreFromResource;
+import static org.onap.dcaegen2.services.sdk.security.ssl.Passwords.fromPath;
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
import com.google.gson.Gson;
@@ -27,9 +29,10 @@
import io.netty.handler.ssl.SslContext;
+import java.nio.file.Paths;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.AaiClientConfiguration;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
@@ -37,7 +40,9 @@
import org.onap.bbs.event.processor.exceptions.AaiTaskException;
import org.onap.bbs.event.processor.model.PnfAaiObject;
import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
-import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -64,7 +69,7 @@
private AaiClientConfiguration aaiClientConfiguration;
@Autowired
- AaiReactiveClient(ApplicationConfiguration configuration, Gson gson) throws SSLException {
+ AaiReactiveClient(ApplicationConfiguration configuration, Gson gson) {
this.configuration = configuration;
this.gson = gson;
this.sslFactory = new SslFactory();
@@ -85,25 +90,20 @@
@Override
public void updateConfiguration() {
- AaiClientConfiguration newConfiguration = configuration.getAaiClientConfiguration();
+ var newConfiguration = configuration.getAaiClientConfiguration();
if (aaiClientConfiguration.equals(newConfiguration)) {
LOGGER.info("No Configuration changes necessary for AAI Reactive client");
} else {
synchronized (this) {
LOGGER.info("AAI Reactive client must be re-configured");
aaiClientConfiguration = newConfiguration;
- try {
- setupWebClient();
- } catch (SSLException e) {
- LOGGER.error("AAI Reactive client SSL error while re-configuring WebClient");
- LOGGER.debug("SSL Exception\n", e);
- }
+ setupWebClient();
}
}
}
- private void setupWebClient() throws SSLException {
- SslContext sslContext = createSslContext();
+ private void setupWebClient() {
+ var sslContext = createSslContext();
ClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(
HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
@@ -132,7 +132,7 @@
private <T> Mono<T> performReactiveHttpGet(String url, Class<T> responseType) {
LOGGER.debug("Will issue Reactive GET request to URL ({}) for object ({})", url, responseType.getName());
- WebClient webClient = getWebClient();
+ var webClient = getWebClient();
return webClient
.get()
.uri(url)
@@ -179,17 +179,18 @@
});
}
- private SslContext createSslContext() throws SSLException {
+ private SslContext createSslContext() {
if (aaiClientConfiguration.enableAaiCertAuth()) {
LOGGER.info("Creating secure context with:\n {}", aaiClientConfiguration);
- return sslFactory.createSecureContext(
- aaiClientConfiguration.keyStorePath(),
- aaiClientConfiguration.keyStorePasswordPath(),
- aaiClientConfiguration.trustStorePath(),
- aaiClientConfiguration.trustStorePasswordPath()
- );
+ final SecurityKeys securityKeys = ImmutableSecurityKeys.builder()
+ .keyStore(keyStoreFromResource(aaiClientConfiguration.keyStorePath()))
+ .keyStorePassword(fromPath(Paths.get(aaiClientConfiguration.keyStorePasswordPath())))
+ .trustStore(keyStoreFromResource(aaiClientConfiguration.trustStorePath()))
+ .trustStorePassword(fromPath(Paths.get(aaiClientConfiguration.trustStorePasswordPath())))
+ .build();
+ return sslFactory.createSecureClientContext(securityKeys);
}
- return sslFactory.createInsecureContext();
+ return sslFactory.createInsecureClientContext();
}
private synchronized WebClient getWebClient() {
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java
index c2b6734..b3fa09d 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java
@@ -20,7 +20,7 @@
package org.onap.bbs.event.processor.utilities;
-public class CommonEventFields {
+class CommonEventFields {
static final String COMMON_FORMAT = "\": \"%s\"";
static final String EVENT = "event";
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java
index 03ea093..f27cca5 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java
@@ -21,24 +21,23 @@
package org.onap.bbs.event.processor.utilities;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
import com.google.gson.TypeAdapterFactory;
import java.util.ServiceLoader;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
-public class ControlLoopJsonBodyBuilder implements JsonBodyBuilder<ControlLoopPublisherDmaapModel> {
+public class ControlLoopJsonBodyBuilder {
/**
* Serialize the Control Loop DMaaP model with GSON.
* @param publisherDmaapModel object to be serialized
* @return String output of serialization
*/
- @Override
public String createJsonBody(ControlLoopPublisherDmaapModel publisherDmaapModel) {
- GsonBuilder gsonBuilder = new GsonBuilder().disableHtmlEscaping();
+ var gsonBuilder = new GsonBuilder().disableHtmlEscaping();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
return gsonBuilder.create().toJson(ImmutableControlLoopPublisherDmaapModel.builder()
.closedLoopEventClient(publisherDmaapModel.getClosedLoopEventClient())
@@ -56,4 +55,30 @@
.originator(publisherDmaapModel.getOriginator())
.build());
}
+
+ /**
+ * Serialize the Control Loop DMaaP model into a GSON JSON tree.
+ * @param publisherDmaapModel object to be serialized
+ * @return JsonElement tree produced by the serialization
+ */
+ public static JsonElement createAsJsonElement(ControlLoopPublisherDmaapModel publisherDmaapModel) {
+ var gsonBuilder = new GsonBuilder().disableHtmlEscaping();
+ ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
+ return gsonBuilder.create().toJsonTree(ImmutableControlLoopPublisherDmaapModel.builder()
+ .closedLoopEventClient(publisherDmaapModel.getClosedLoopEventClient())
+ .policyVersion(publisherDmaapModel.getPolicyVersion())
+ .policyName(publisherDmaapModel.getPolicyName())
+ .policyScope(publisherDmaapModel.getPolicyScope())
+ .targetType(publisherDmaapModel.getTargetType())
+ .aaiEnrichmentData(publisherDmaapModel.getAaiEnrichmentData())
+ .closedLoopAlarmStart(publisherDmaapModel.getClosedLoopAlarmStart())
+ .closedLoopEventStatus(publisherDmaapModel.getClosedLoopEventStatus())
+ .closedLoopControlName(publisherDmaapModel.getClosedLoopControlName())
+ .version(publisherDmaapModel.getVersion())
+ .target(publisherDmaapModel.getTarget())
+ .requestId(publisherDmaapModel.getRequestId())
+ .originator(publisherDmaapModel.getOriginator())
+ .build());
+ }
+
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
index 3cff4e6..5a0466a 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
@@ -46,11 +46,13 @@
import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+@Component
public class CpeAuthenticationDmaapConsumerJsonParser {
private static final Logger LOGGER = LoggerFactory.getLogger(CpeAuthenticationDmaapConsumerJsonParser.class);
@@ -108,9 +110,9 @@
return Mono.empty();
}
- JsonObject commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT)
+ var commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT)
.getAsJsonObject(COMMON_EVENT_HEADER);
- JsonObject stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT)
+ var stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT)
.getAsJsonObject(STATE_CHANGE_FIELDS);
pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME);
@@ -120,7 +122,7 @@
stateInterface = getValueFromJson(stateChangeFields, STATE_INTERFACE);
if (stateChangeFields.has(ADDITIONAL_FIELDS)) {
- JsonObject additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS);
+ var additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS);
rgwMacAddress = getValueFromJson(additionalFields, RGW_MAC_ADDRESS);
swVersion = getValueFromJson(additionalFields, SW_VERSION);
}
@@ -128,7 +130,7 @@
if (StringUtils.isEmpty(pnfSourceName)
|| authenticationStatusMissing(oldAuthenticationStatus)
|| authenticationStatusMissing(newAuthenticationStatus)) {
- String incorrectEvent = dumpJsonData();
+ var incorrectEvent = dumpJsonData();
LOGGER.warn("Incorrect CPE Authentication JSON event: {}", incorrectEvent);
return Mono.empty();
}
@@ -164,7 +166,7 @@
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
+ var jsonParser = new JsonParser();
return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
: Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java
index 09aa730..df8eaa4 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java
@@ -27,18 +27,16 @@
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
-public class CpeAuthenticationJsonBodyBuilder implements JsonBodyBuilder<CpeAuthenticationConsumerDmaapModel> {
+public class CpeAuthenticationJsonBodyBuilder {
/**
* Serialize the CPE authentication DMaaP model with GSON.
* @param cpeAuthenticationConsumerDmaapModel object to be serialized
* @return String output of serialization
*/
- @Override
public String createJsonBody(CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel) {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
return gsonBuilder.create().toJson(ImmutableCpeAuthenticationConsumerDmaapModel.builder()
.correlationId(cpeAuthenticationConsumerDmaapModel.getCorrelationId())
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java
new file mode 100644
index 0000000..be8bea0
--- /dev/null
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.bbs.event.processor.utilities;
+
+import java.nio.file.Paths;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeysStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+
+/** Non-instantiable helpers that build DMaaP Message Router requests and load security key stores. */
+public class GenericUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(GenericUtils.class);
+
+ private GenericUtils() {}
+
+ /**
+ * Creates Message Router subscription request.
+ * @param topicUrl URL of topic to use
+ * @param consumerGroup Consumer Group for subscription
+ * @param consumerId Consumer ID for subscription
+ * @return request based on provided input
+ */
+ public static MessageRouterSubscribeRequest createSubscribeRequest(String topicUrl,
+ String consumerGroup, String consumerId) {
+ var sourceDefinition = ImmutableMessageRouterSource.builder()
+ .name("Subscriber source")
+ .topicUrl(topicUrl)
+ .build();
+
+ return ImmutableMessageRouterSubscribeRequest.builder()
+ .sourceDefinition(sourceDefinition)
+ .consumerGroup(consumerGroup)
+ .consumerId(consumerId)
+ .build();
+ }
+
+ /**
+ * Creates Message Router publish request with a JSON content type.
+ * @param topicUrl URL of topic to use
+ * @return request based on provided input
+ */
+ public static MessageRouterPublishRequest createPublishRequest(String topicUrl) {
+ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+ .topicUrl(topicUrl)
+ .name("Producer sink")
+ .build();
+
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition)
+ .contentType(ContentType.APPLICATION_JSON)
+ .build();
+ }
+
+ /**
+ * Creates a security key store for HTTPS.
+ * @param resource identifying the resource from which we will read security information
+ * @return store that will be used for HTTPS
+ * @throws ApplicationEnvironmentException if the resource is null or empty
+ */
+ @NotNull public static SecurityKeysStore keyStoreFromResource(String resource) {
+ if (StringUtils.isEmpty(resource)) {
+ throw new ApplicationEnvironmentException("Resource for security key store is empty");
+ }
+ var path = Paths.get(resource);
+ LOGGER.info("Reading keys from {}", path);
+ return SecurityKeysStore.fromPath(path);
+ }
+}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
index 9fe0c27..5e249e5 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
@@ -41,11 +41,13 @@
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+@Component
public class ReRegistrationDmaapConsumerJsonParser {
private static final Logger LOGGER = LoggerFactory.getLogger(ReRegistrationDmaapConsumerJsonParser.class);
@@ -101,7 +103,7 @@
return Mono.empty();
}
- JsonObject pnfReRegistrationFields =
+ var pnfReRegistrationFields =
dmaapResponseJsonObject.getAsJsonObject(ADDITIONAL_FIELDS);
pnfCorrelationId = getValueFromJson(dmaapResponseJsonObject, CORRELATION_ID);
@@ -112,7 +114,7 @@
svlan = getValueFromJson(pnfReRegistrationFields, SVLAN);
if (StringUtils.isEmpty(pnfCorrelationId) || anyImportantPropertyMissing()) {
- String incorrectEvent = dumpJsonData();
+ var incorrectEvent = dumpJsonData();
LOGGER.warn("Incorrect Re-Registration JSON event: {}", incorrectEvent);
return Mono.empty();
}
@@ -148,7 +150,7 @@
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- JsonParser jsonParser = new JsonParser();
+ var jsonParser = new JsonParser();
return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
: Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java
index 867cfda..bf90a4c 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java
@@ -27,18 +27,16 @@
import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
-public class ReRegistrationJsonBodyBuilder implements JsonBodyBuilder<ReRegistrationConsumerDmaapModel> {
+public class ReRegistrationJsonBodyBuilder {
/**
* Serialize the Re-Registration DMaaP model with GSON.
* @param reRegistrationConsumerDmaapModel object to be serialized
* @return String output of serialization
*/
- @Override
public String createJsonBody(ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel) {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
return gsonBuilder.create().toJson(ImmutableReRegistrationConsumerDmaapModel.builder()
.correlationId(reRegistrationConsumerDmaapModel.getCorrelationId())
diff --git a/components/bbs-event-processor/src/main/resources/application.yml b/components/bbs-event-processor/src/main/resources/application.yml
index 9092ada..ed99643 100644
--- a/components/bbs-event-processor/src/main/resources/application.yml
+++ b/components/bbs-event-processor/src/main/resources/application.yml
@@ -9,7 +9,7 @@
re-registration:
dmaapHostName: localhost
dmaapPortNumber: 2222
- dmaapTopicName: /events/unauthenticated.PNF_Update
+ dmaapTopicName: unauthenticated.PNF_Update
dmaapProtocol: http
dmaapContentType: application/json
consumerId: c12
@@ -19,7 +19,7 @@
cpe-authentication:
dmaapHostName: localhost
dmaapPortNumber: 2222
- dmaapTopicName: /events/unauthenticated.CPE_Authentication
+ dmaapTopicName: unauthenticated.CPE_Authentication
dmaapProtocol: http
dmaapContentType: application/json
consumerId: c12
@@ -29,7 +29,7 @@
producer:
dmaapHostName: localhost
dmaapPortNumber: 2223
- dmaapTopicName: /events/unauthenticated.DCAE_CL_OUTPUT
+ dmaapTopicName: unauthenticated.DCAE_CL_OUTPUT
dmaapProtocol: http
dmaapContentType: application/json
aai:
@@ -47,14 +47,14 @@
Real-Time: true
Content-Type: application/json
security:
- trustStorePath: change it
- trustStorePasswordPath: change it
- keyStorePath: change it
- keyStorePasswordPath: change it
+ trustStorePath: /opt/app/bbs-event-processor/etc/cert/trust.jks
+ trustStorePasswordPath: /opt/app/bbs-event-processor/etc/cert/trust.pass
+ keyStorePath: /opt/app/bbs-event-processor/etc/cert/cert.jks
+ keyStorePasswordPath: /opt/app/bbs-event-processor/etc/cert/jks.pass
enableAaiCertAuth: false
enableDmaapCertAuth: false
application:
- pipelinesPollingIntervalSec: 30
+ pipelinesPollingIntervalSec: 25
pipelinesTimeoutSec: 15
policyVersion: 1.0.0.5
clTargetType: VM
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
index 69fbb3f..2d9e49f 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
@@ -37,8 +37,6 @@
import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.test.context.SpringBootTest;
@@ -66,6 +64,8 @@
"configs.aai.client.aaiHeaders.Content-Type=application/merge-patch+json",
"configs.dmaap.consumer.re-registration.dmaapHostName=test localhost",
"configs.dmaap.consumer.re-registration.dmaapPortNumber=1234",
+ "configs.dmaap.consumer.re-registration.dmaapUserName=",
+ "configs.dmaap.consumer.re-registration.dmaapUserPassword=",
"configs.dmaap.consumer.re-registration.dmaapTopicName=/events/unauthenticated.PNF_REREGISTRATION",
"configs.dmaap.consumer.re-registration.dmaapProtocol=http",
"configs.dmaap.consumer.re-registration.dmaapContentType=application/json",
@@ -75,6 +75,8 @@
"configs.dmaap.consumer.re-registration.messageLimit=1",
"configs.dmaap.consumer.cpe-authentication.dmaapHostName=test localhost",
"configs.dmaap.consumer.cpe-authentication.dmaapPortNumber=1234",
+ "configs.dmaap.consumer.cpe-authentication.dmaapUserName=",
+ "configs.dmaap.consumer.cpe-authentication.dmaapUserPassword=",
"configs.dmaap.consumer.cpe-authentication.dmaapTopicName=/events/unauthenticated.CPE_AUTHENTICATION",
"configs.dmaap.consumer.cpe-authentication.dmaapProtocol=http",
"configs.dmaap.consumer.cpe-authentication.dmaapContentType=application/json",
@@ -84,13 +86,15 @@
"configs.dmaap.consumer.cpe-authentication.messageLimit=1",
"configs.dmaap.producer.dmaapHostName=test localhost",
"configs.dmaap.producer.dmaapPortNumber=1234",
+ "configs.dmaap.producer.dmaapUserName=",
+ "configs.dmaap.producer.dmaapUserPassword=",
"configs.dmaap.producer.dmaapTopicName=/events/unauthenticated.DCAE_CL_OUTPUT",
"configs.dmaap.producer.dmaapProtocol=http",
"configs.dmaap.producer.dmaapContentType=application/json",
- "configs.security.trustStorePath=test trust store path",
- "configs.security.trustStorePasswordPath=test trust store password path",
- "configs.security.keyStorePath=test key store path",
- "configs.security.keyStorePasswordPath=test key store password path",
+ "configs.security.trustStorePath=KeyStore.jks",
+ "configs.security.trustStorePasswordPath=KeyStorePass.txt",
+ "configs.security.keyStorePath=KeyStore.jks",
+ "configs.security.keyStorePasswordPath=KeyStorePass.txt",
"configs.security.enableDmaapCertAuth=false",
"configs.security.enableAaiCertAuth=false",
"configs.application.pipelinesPollingIntervalSec=30",
@@ -132,7 +136,7 @@
@Test
void testA_configurationObjectSuccessfullyPopulated() {
- AaiClientConfiguration aaiClientConfiguration = configuration.getAaiClientConfiguration();
+ var aaiClientConfiguration = configuration.getAaiClientConfiguration();
assertAll("AAI Client Configuration Properties",
() -> assertEquals("test localhost", aaiClientConfiguration.aaiHost()),
() -> assertEquals(Integer.valueOf(1234), aaiClientConfiguration.aaiPort()),
@@ -148,50 +152,64 @@
aaiClientConfiguration.aaiHeaders().get("Content-Type"))
);
- DmaapConsumerConfiguration dmaapConsumerReRegistrationConfig =
- configuration.getDmaapReRegistrationConsumerConfiguration();
assertAll("DMaaP Consumer Re-Registration Configuration Properties",
- () -> assertEquals("test localhost", dmaapConsumerReRegistrationConfig.dmaapHostName()),
- () -> assertEquals(Integer.valueOf(1234), dmaapConsumerReRegistrationConfig.dmaapPortNumber()),
+ () -> assertEquals("test localhost",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName()),
+ () -> assertEquals(1234,
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber()),
() -> assertEquals("/events/unauthenticated.PNF_REREGISTRATION",
- dmaapConsumerReRegistrationConfig.dmaapTopicName()),
- () -> assertEquals("http", dmaapConsumerReRegistrationConfig.dmaapProtocol()),
- () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserName()),
- () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserPassword()),
- () -> assertEquals("application/json", dmaapConsumerReRegistrationConfig.dmaapContentType()),
- () -> assertEquals("c12", dmaapConsumerReRegistrationConfig.consumerId()),
- () -> assertEquals("OpenDcae-c12", dmaapConsumerReRegistrationConfig.consumerGroup()),
- () -> assertEquals(Integer.valueOf(-1), dmaapConsumerReRegistrationConfig.timeoutMs()),
- () -> assertEquals(Integer.valueOf(1), dmaapConsumerReRegistrationConfig.messageLimit())
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName()),
+ () -> assertEquals("http",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol()),
+ () -> assertEquals("",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapUserName()),
+ () -> assertEquals("",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapUserPassword()),
+ () -> assertEquals("application/json",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapContentType()),
+ () -> assertEquals("c12",
+ configuration.getDmaapReRegistrationConsumerProperties().getConsumerId()),
+ () -> assertEquals("OpenDcae-c12",
+ configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup()),
+ () -> assertEquals(-1, configuration.getDmaapReRegistrationConsumerProperties().getTimeoutMs()),
+ () -> assertEquals(1, configuration.getDmaapReRegistrationConsumerProperties().getMessageLimit())
);
- DmaapConsumerConfiguration dmaapConsumerCpeAuthenticationConfig =
- configuration.getDmaapCpeAuthenticationConsumerConfiguration();
assertAll("DMaaP Consumer CPE Authentication Configuration Properties",
- () -> assertEquals("test localhost", dmaapConsumerCpeAuthenticationConfig.dmaapHostName()),
- () -> assertEquals(Integer.valueOf(1234), dmaapConsumerCpeAuthenticationConfig.dmaapPortNumber()),
+ () -> assertEquals("test localhost",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName()),
+ () -> assertEquals(1234,
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber()),
() -> assertEquals("/events/unauthenticated.CPE_AUTHENTICATION",
- dmaapConsumerCpeAuthenticationConfig.dmaapTopicName()),
- () -> assertEquals("http", dmaapConsumerCpeAuthenticationConfig.dmaapProtocol()),
- () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()),
- () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()),
- () -> assertEquals("application/json", dmaapConsumerCpeAuthenticationConfig.dmaapContentType()),
- () -> assertEquals("c12", dmaapConsumerCpeAuthenticationConfig.consumerId()),
- () -> assertEquals("OpenDcae-c12", dmaapConsumerCpeAuthenticationConfig.consumerGroup()),
- () -> assertEquals(Integer.valueOf(-1), dmaapConsumerCpeAuthenticationConfig.timeoutMs()),
- () -> assertEquals(Integer.valueOf(1), dmaapConsumerCpeAuthenticationConfig.messageLimit())
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName()),
+ () -> assertEquals("http",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol()),
+ () -> assertEquals("",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapUserName()),
+ () -> assertEquals("",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapUserPassword()),
+ () -> assertEquals("application/json",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapContentType()),
+ () -> assertEquals("c12",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId()),
+ () -> assertEquals("OpenDcae-c12",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup()),
+ () -> assertEquals(-1, configuration.getDmaapCpeAuthenticationConsumerProperties().getTimeoutMs()),
+ () -> assertEquals(1,
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getMessageLimit())
);
- DmaapPublisherConfiguration dmaapPublisherConfiguration = configuration.getDmaapPublisherConfiguration();
assertAll("DMaaP Publisher Configuration Properties",
- () -> assertEquals("test localhost", dmaapPublisherConfiguration.dmaapHostName()),
- () -> assertEquals(Integer.valueOf(1234), dmaapPublisherConfiguration.dmaapPortNumber()),
+ () -> assertEquals("test localhost",
+ configuration.getDmaapProducerProperties().getDmaapHostName()),
+ () -> assertEquals(1234, configuration.getDmaapProducerProperties().getDmaapPortNumber()),
() -> assertEquals("/events/unauthenticated.DCAE_CL_OUTPUT",
- dmaapPublisherConfiguration.dmaapTopicName()),
- () -> assertEquals("http", dmaapPublisherConfiguration.dmaapProtocol()),
- () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserName()),
- () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserPassword()),
- () -> assertEquals("application/json", dmaapPublisherConfiguration.dmaapContentType())
+ configuration.getDmaapProducerProperties().getDmaapTopicName()),
+ () -> assertEquals("http", configuration.getDmaapProducerProperties().getDmaapProtocol()),
+ () -> assertEquals("", configuration.getDmaapProducerProperties().getDmaapUserName()),
+ () -> assertEquals("", configuration.getDmaapProducerProperties().getDmaapUserPassword()),
+ () -> assertEquals("application/json",
+ configuration.getDmaapProducerProperties().getDmaapContentType())
);
assertAll("Generic Application Properties",
@@ -211,12 +229,11 @@
assertAll("Security Application Properties",
() -> assertFalse(aaiClientConfiguration.enableAaiCertAuth()),
- () -> assertFalse(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()),
- () -> assertEquals("test key store path", aaiClientConfiguration.keyStorePath()),
- () -> assertEquals("test key store password path",
+ () -> assertEquals("KeyStore.jks", aaiClientConfiguration.keyStorePath()),
+ () -> assertEquals("KeyStorePass.txt",
aaiClientConfiguration.keyStorePasswordPath()),
- () -> assertEquals("test trust store path", aaiClientConfiguration.trustStorePath()),
- () -> assertEquals("test trust store password path",
+ () -> assertEquals("KeyStore.jks", aaiClientConfiguration.trustStorePath()),
+ () -> assertEquals("KeyStorePass.txt",
aaiClientConfiguration.trustStorePasswordPath())
);
}
@@ -298,10 +315,10 @@
.cpeAuthConfigKey("config_key_2")
.closeLoopConfigKey("config_key_3")
.loggingLevel("TRACE")
- .keyStorePath("test key store path - update")
- .keyStorePasswordPath("test key store password path - update")
- .trustStorePath("test trust store path - update")
- .trustStorePasswordPath("test trust store password path - update")
+ .keyStorePath("KeyStore-update.jks")
+ .keyStorePasswordPath("KeyStorePass-update.txt")
+ .trustStorePath("KeyStore-update.jks")
+ .trustStorePasswordPath("KeyStorePass-update.txt")
.enableAaiCertAuth(true)
.enableDmaapCertAuth(true)
.streamSubscribesMap(subscribes)
@@ -311,7 +328,7 @@
// Update the configuration
configuration.updateCurrentConfiguration(updatedConfiguration);
- AaiClientConfiguration aaiClientConfiguration = configuration.getAaiClientConfiguration();
+ var aaiClientConfiguration = configuration.getAaiClientConfiguration();
assertAll("AAI Client Configuration Properties",
() -> assertEquals("aai.onap.svc.cluster.local", aaiClientConfiguration.aaiHost()),
() -> assertEquals(Integer.valueOf(8443), aaiClientConfiguration.aaiPort()),
@@ -327,50 +344,65 @@
aaiClientConfiguration.aaiHeaders().get("Content-Type"))
);
- DmaapConsumerConfiguration dmaapConsumerReRegistrationConfig =
- configuration.getDmaapReRegistrationConsumerConfiguration();
assertAll("DMaaP Consumer Re-Registration Configuration Properties",
- () -> assertEquals("we-are-message-router1.us", dmaapConsumerReRegistrationConfig.dmaapHostName()),
- () -> assertEquals(Integer.valueOf(3901), dmaapConsumerReRegistrationConfig.dmaapPortNumber()),
- () -> assertEquals("events/unauthenticated.PNF_UPDATE",
- dmaapConsumerReRegistrationConfig.dmaapTopicName()),
- () -> assertEquals("https", dmaapConsumerReRegistrationConfig.dmaapProtocol()),
- () -> assertEquals("some-user", dmaapConsumerReRegistrationConfig.dmaapUserName()),
- () -> assertEquals("some-password", dmaapConsumerReRegistrationConfig.dmaapUserPassword()),
- () -> assertEquals("application/json", dmaapConsumerReRegistrationConfig.dmaapContentType()),
- () -> assertEquals("c13", dmaapConsumerReRegistrationConfig.consumerId()),
- () -> assertEquals("OpenDcae-c13", dmaapConsumerReRegistrationConfig.consumerGroup()),
- () -> assertEquals(Integer.valueOf(5), dmaapConsumerReRegistrationConfig.timeoutMs()),
- () -> assertEquals(Integer.valueOf(10), dmaapConsumerReRegistrationConfig.messageLimit())
+ () -> assertEquals("we-are-message-router1.us",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapHostName()),
+ () -> assertEquals(3901,
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapPortNumber()),
+ () -> assertEquals("unauthenticated.PNF_UPDATE",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapTopicName()),
+ () -> assertEquals("https",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapProtocol()),
+ () -> assertEquals("some-user",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapUserName()),
+ () -> assertEquals("some-password",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapUserPassword()),
+ () -> assertEquals("application/json",
+ configuration.getDmaapReRegistrationConsumerProperties().getDmaapContentType()),
+ () -> assertEquals("c13",
+ configuration.getDmaapReRegistrationConsumerProperties().getConsumerId()),
+ () -> assertEquals("OpenDcae-c13",
+ configuration.getDmaapReRegistrationConsumerProperties().getConsumerGroup()),
+ () -> assertEquals(5, configuration.getDmaapReRegistrationConsumerProperties().getTimeoutMs()),
+ () -> assertEquals(10, configuration.getDmaapReRegistrationConsumerProperties().getMessageLimit())
);
- DmaapConsumerConfiguration dmaapConsumerCpeAuthenticationConfig =
- configuration.getDmaapCpeAuthenticationConsumerConfiguration();
assertAll("DMaaP Consumer CPE Authentication Configuration Properties",
- () -> assertEquals("we-are-message-router2.us", dmaapConsumerCpeAuthenticationConfig.dmaapHostName()),
- () -> assertEquals(Integer.valueOf(3902), dmaapConsumerCpeAuthenticationConfig.dmaapPortNumber()),
- () -> assertEquals("events/unauthenticated.CPE_AUTHENTICATION",
- dmaapConsumerCpeAuthenticationConfig.dmaapTopicName()),
- () -> assertEquals("https", dmaapConsumerCpeAuthenticationConfig.dmaapProtocol()),
- () -> assertEquals("some-user", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()),
- () -> assertEquals("some-password", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()),
- () -> assertEquals("application/json", dmaapConsumerCpeAuthenticationConfig.dmaapContentType()),
- () -> assertEquals("c13", dmaapConsumerCpeAuthenticationConfig.consumerId()),
- () -> assertEquals("OpenDcae-c13", dmaapConsumerCpeAuthenticationConfig.consumerGroup()),
- () -> assertEquals(Integer.valueOf(5), dmaapConsumerCpeAuthenticationConfig.timeoutMs()),
- () -> assertEquals(Integer.valueOf(10), dmaapConsumerCpeAuthenticationConfig.messageLimit())
+ () -> assertEquals("we-are-message-router2.us",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapHostName()),
+ () -> assertEquals(3902,
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapPortNumber()),
+ () -> assertEquals("unauthenticated.CPE_AUTHENTICATION",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapTopicName()),
+ () -> assertEquals("https",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapProtocol()),
+ () -> assertEquals("some-user",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapUserName()),
+ () -> assertEquals("some-password",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapUserPassword()),
+ () -> assertEquals("application/json",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getDmaapContentType()),
+ () -> assertEquals("c13",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerId()),
+ () -> assertEquals("OpenDcae-c13",
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getConsumerGroup()),
+ () -> assertEquals(5, configuration.getDmaapCpeAuthenticationConsumerProperties().getTimeoutMs()),
+ () -> assertEquals(10,
+ configuration.getDmaapCpeAuthenticationConsumerProperties().getMessageLimit())
);
- DmaapPublisherConfiguration dmaapPublisherConfiguration = configuration.getDmaapPublisherConfiguration();
assertAll("DMaaP Publisher Configuration Properties",
- () -> assertEquals("we-are-message-router3.us", dmaapPublisherConfiguration.dmaapHostName()),
- () -> assertEquals(Integer.valueOf(3903), dmaapPublisherConfiguration.dmaapPortNumber()),
- () -> assertEquals("events/unauthenticated.DCAE_CL_OUTPUT",
- dmaapPublisherConfiguration.dmaapTopicName()),
- () -> assertEquals("https", dmaapPublisherConfiguration.dmaapProtocol()),
- () -> assertEquals("some-user", dmaapPublisherConfiguration.dmaapUserName()),
- () -> assertEquals("some-password", dmaapPublisherConfiguration.dmaapUserPassword()),
- () -> assertEquals("application/json", dmaapPublisherConfiguration.dmaapContentType())
+ () -> assertEquals("we-are-message-router3.us",
+ configuration.getDmaapProducerProperties().getDmaapHostName()),
+ () -> assertEquals(3903, configuration.getDmaapProducerProperties().getDmaapPortNumber()),
+ () -> assertEquals("unauthenticated.DCAE_CL_OUTPUT",
+ configuration.getDmaapProducerProperties().getDmaapTopicName()),
+ () -> assertEquals("https", configuration.getDmaapProducerProperties().getDmaapProtocol()),
+ () -> assertEquals("some-user", configuration.getDmaapProducerProperties().getDmaapUserName()),
+ () -> assertEquals("some-password",
+ configuration.getDmaapProducerProperties().getDmaapUserPassword()),
+ () -> assertEquals("application/json",
+ configuration.getDmaapProducerProperties().getDmaapContentType())
);
assertAll("Generic Application Properties",
@@ -391,12 +423,11 @@
assertAll("Security Application Properties",
() -> assertTrue(aaiClientConfiguration.enableAaiCertAuth()),
- () -> assertTrue(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()),
- () -> assertEquals("test key store path - update", aaiClientConfiguration.keyStorePath()),
- () -> assertEquals("test key store password path - update",
+ () -> assertEquals("KeyStore-update.jks", aaiClientConfiguration.keyStorePath()),
+ () -> assertEquals("KeyStorePass-update.txt",
aaiClientConfiguration.keyStorePasswordPath()),
- () -> assertEquals("test trust store path - update", aaiClientConfiguration.trustStorePath()),
- () -> assertEquals("test trust store password path - update",
+ () -> assertEquals("KeyStore-update.jks", aaiClientConfiguration.trustStorePath()),
+ () -> assertEquals("KeyStorePass-update.txt",
aaiClientConfiguration.trustStorePasswordPath())
);
}
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
index 1acf864..1d1bce2 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
@@ -48,13 +48,13 @@
}
ConsulConfigurationGatewayTest() {
- ApplicationConfiguration configuration = Mockito.mock(ApplicationConfiguration.class);
+ var configuration = Mockito.mock(ApplicationConfiguration.class);
this.configurationGateway = new ConsulConfigurationGateway(configuration);
}
@Test
void passingValidJson_constructsGeneratedAppConfigObject() {
- final String validJson = "{"
+ final var validJson = "{"
+ "\"dmaap.protocol\": \"http\","
+ "\"dmaap.contentType\": \"application/json\","
+ "\"dmaap.consumer.consumerId\": \"c12\","
@@ -219,7 +219,7 @@
.streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3))
.build();
- ConsulConfigurationGateway spiedGateway = Mockito.spy(configurationGateway);
+ var spiedGateway = Mockito.spy(configurationGateway);
doReturn(false).when(spiedGateway).environmentNotReady();
assertEquals(expectedConfiguration,
spiedGateway.generateAppConfigObject(jsonParser.parse(validJson).getAsJsonObject()));
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/controllers/BbsEventProcessorControllerTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/controllers/BbsEventProcessorControllerTest.java
index bacb6c3..8cc7d5c 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/controllers/BbsEventProcessorControllerTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/controllers/BbsEventProcessorControllerTest.java
@@ -42,7 +42,6 @@
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.test.web.servlet.MockMvc;
-import org.springframework.test.web.servlet.MvcResult;
@WebMvcTest(BbsEventProcessorController.class)
@DisplayName("BBS Event Processor Controllers MVC Unit-Tests")
@@ -67,7 +66,7 @@
@Test
void sendingHeartBeatRestCall_RespondsWithAlive() throws Exception {
- MvcResult heartBeatResult = mockMvc.perform(get("/heartbeat")).andReturn();
+ var heartBeatResult = mockMvc.perform(get("/heartbeat")).andReturn();
mockMvc.perform(asyncDispatch(heartBeatResult))
.andExpect(status().isOk())
@@ -76,7 +75,7 @@
@Test
void sendingReRegistrationSubmissionRestCall_RespondsWithOk() throws Exception {
- MvcResult reregistrationSubmissionResult = mockMvc.perform(post("/poll-reregistration-events")).andReturn();
+ var reregistrationSubmissionResult = mockMvc.perform(post("/poll-reregistration-events")).andReturn();
mockMvc.perform(asyncDispatch(reregistrationSubmissionResult))
.andExpect(status().isOk())
@@ -86,7 +85,7 @@
@Test
void sendingCpeAuthenticationSubmissionRestCall_RespondsWithOk() throws Exception {
- MvcResult reregistrationSubmissionResult = mockMvc.perform(post("/poll-cpe-authentication-events")).andReturn();
+ var reregistrationSubmissionResult = mockMvc.perform(post("/poll-cpe-authentication-events")).andReturn();
mockMvc.perform(asyncDispatch(reregistrationSubmissionResult))
.andExpect(status().isOk())
@@ -97,7 +96,7 @@
@Test
void sendingStartTasksRestCall_ifItSucceeds_RespondsWithOk() throws Exception {
when(scheduler.reScheduleProcessingTasks()).thenReturn(true);
- MvcResult startTasksResult = mockMvc.perform(post("/start-tasks")).andReturn();
+ var startTasksResult = mockMvc.perform(post("/start-tasks")).andReturn();
mockMvc.perform(asyncDispatch(startTasksResult))
.andExpect(status().isOk())
@@ -108,7 +107,7 @@
@Test
void sendingStartTasksRestCall_ifItFails_RespondsWithNotAcceptable() throws Exception {
when(scheduler.reScheduleProcessingTasks()).thenReturn(false);
- MvcResult startTasksResult = mockMvc.perform(post("/start-tasks")).andReturn();
+ var startTasksResult = mockMvc.perform(post("/start-tasks")).andReturn();
mockMvc.perform(asyncDispatch(startTasksResult))
.andExpect(status().isNotAcceptable())
@@ -119,7 +118,7 @@
@Test
void sendingCancelTasksRestCall_ifItSucceeds_RespondsWithOk() throws Exception {
when(scheduler.cancelScheduledProcessingTasks()).thenReturn(true);
- MvcResult cancellationResult = mockMvc.perform(post("/cancel-tasks")).andReturn();
+ var cancellationResult = mockMvc.perform(post("/cancel-tasks")).andReturn();
mockMvc.perform(asyncDispatch(cancellationResult))
.andExpect(status().isOk())
@@ -130,7 +129,7 @@
@Test
void sendingCancelTasksRestCall_ifItFails_RespondsWithNotAcceptable() throws Exception {
when(scheduler.cancelScheduledProcessingTasks()).thenReturn(false);
- MvcResult cancellationResult = mockMvc.perform(post("/cancel-tasks")).andReturn();
+ var cancellationResult = mockMvc.perform(post("/cancel-tasks")).andReturn();
mockMvc.perform(asyncDispatch(cancellationResult))
.andExpect(status().isNotAcceptable())
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/model/GsonSerializationsTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/model/GsonSerializationsTest.java
index fd43b8b..30cc4ab 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/model/GsonSerializationsTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/model/GsonSerializationsTest.java
@@ -22,7 +22,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
-import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapterFactory;
@@ -42,13 +41,13 @@
@Test
void creatingReRegistrationJsonBody_returnsJsonInString() {
- String correlationId = "NokiaCorrelationId";
- String attachmentPoint = "olt2/1/1";
- String remoteId = "RemoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var correlationId = "NokiaCorrelationId";
+ var attachmentPoint = "olt2/1/1";
+ var remoteId = "RemoteId";
+ var cvlan = "1005";
+ var svlan = "100";
- String template = "{"
+ var template = "{"
+ "\"correlationId\":\"%s\","
+ "\"attachment-point\":\"%s\","
+ "\"remote-id\":\"%s\","
@@ -65,7 +64,7 @@
.build();
- String expectedResult = String.format(template, correlationId, attachmentPoint, remoteId, cvlan, svlan);
+ var expectedResult = String.format(template, correlationId, attachmentPoint, remoteId, cvlan, svlan);
assertEquals(expectedResult, new ReRegistrationJsonBodyBuilder().createJsonBody(model));
}
@@ -73,14 +72,14 @@
@Test
void creatingCpeAuthenticationJsonBody_returnsJsonInString() {
- String correlationId = "NokiaCorrelationID";
- AuthenticationState oldAuthenticationState = AuthenticationState.IN_SERVICE;
- AuthenticationState newAuthenticationState = AuthenticationState.OUT_OF_SERVICE;
- String stateInterface = "stateInterface";
- String rgwMacAddress = "00:0a:95:8d:78:16";
- String swVersion = "1.2";
+ var correlationId = "NokiaCorrelationID";
+ var oldAuthenticationState = AuthenticationState.IN_SERVICE;
+ var newAuthenticationState = AuthenticationState.OUT_OF_SERVICE;
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
- String template = "{"
+ var template = "{"
+ "\"correlationId\":\"%s\","
+ "\"old-authentication-state\":\"%s\","
+ "\"new-authentication-state\":\"%s\","
@@ -99,7 +98,7 @@
.build();
- String expectedResult = String.format(template, correlationId, oldAuthenticationState.getNameInOnap(),
+ var expectedResult = String.format(template, correlationId, oldAuthenticationState.getNameInOnap(),
newAuthenticationState.getNameInOnap(), stateInterface, rgwMacAddress, swVersion);
assertEquals(expectedResult, new CpeAuthenticationJsonBodyBuilder().createJsonBody(model));
@@ -108,27 +107,27 @@
@Test
void creatingDcaeControlLoopJsonBody_returnsJsonInString() {
- String closedLoopEventClient = "DCAE.BBS_mSInstance";
- String policyVersion = "1.0.0.5";
- String policyName = "CPE_Authentication";
- String policyScope =
+ var closedLoopEventClient = "DCAE.BBS_mSInstance";
+ var policyVersion = "1.0.0.5";
+ var policyName = "CPE_Authentication";
+ var policyScope =
"service=HSIAService,type=SampleType,"
+ "closedLoopControlName=CL-CPE_A-d925ed73-8231-4d02-9545-db4e101f88f8";
- String targetType = "VM";
- long closedLoopAlarmStart = 1484677482204798L;
- String closedLoopEventStatus = "ONSET";
- String closedLoopControlName = "ControlLoop-CPE_A-2179b738-fd36-4843-a71a-a8c24c70c88b";
- String version = "1.0.2";
- String target = "vserver.vserver-name";
- String requestId = "97964e10-686e-4790-8c45-bdfa61df770f";
- String from = "DCAE";
+ var targetType = "VM";
+ var closedLoopAlarmStart = 1484677482204798L;
+ var closedLoopEventStatus = "ONSET";
+ var closedLoopControlName = "ControlLoop-CPE_A-2179b738-fd36-4843-a71a-a8c24c70c88b";
+ var version = "1.0.2";
+ var target = "vserver.vserver-name";
+ var requestId = "97964e10-686e-4790-8c45-bdfa61df770f";
+ var from = "DCAE";
Map<String, String> aaiEnrichmentData = new LinkedHashMap<>();
aaiEnrichmentData.put("service-information.service-instance-id", "service-instance-id-example");
aaiEnrichmentData.put("cvlan-id", "example cvlan-id");
aaiEnrichmentData.put("svlan-id", "example svlan-id");
- String template = "{"
+ var template = "{"
+ "\"closedLoopEventClient\":\"%s\","
+ "\"policyVersion\":\"%s\","
+ "\"policyName\":\"%s\","
@@ -165,7 +164,7 @@
.originator(from)
.build();
- String expectedResult = String.format(template,
+ var expectedResult = String.format(template,
closedLoopEventClient,
policyVersion,
policyName,
@@ -185,14 +184,14 @@
@Test
void pnfAaiObject_IsSerializedSuccessfully() {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- Gson gson = gsonBuilder.create();
+ var gson = gsonBuilder.create();
- String pnfName = "NokiaCorrelationID";
- String swVersion = "1.2";
+ var pnfName = "NokiaCorrelationID";
+ var swVersion = "1.2";
- String template = "{"
+ var template = "{"
+ "\"pnf-name\":\"%s\","
+ "\"in-maint\":true,"
+ "\"sw-version\":\"%s\","
@@ -288,7 +287,7 @@
.build();
- String jsonPnfObject = String.format(template, pnfName, swVersion);
+ var jsonPnfObject = String.format(template, pnfName, swVersion);
assertEquals(jsonPnfObject, gson.toJson(pnfAaiObject));
assertEquals(pnfAaiObject, gson.fromJson(jsonPnfObject, ImmutablePnfAaiObject.class));
@@ -297,14 +296,14 @@
@Test
void serviceInstanceAaiObject_IsSerializedSuccessfully() {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- Gson gson = gsonBuilder.create();
+ var gson = gsonBuilder.create();
- String serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef";
- String orchestrationStatus = "active";
+ var serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef";
+ var orchestrationStatus = "active";
- String template = "{"
+ var template = "{"
+ "\"service-instance-id\":\"%s\","
+ "\"orchestration-status\":\"%s\","
+ "\"relationship-list\":{"
@@ -370,7 +369,7 @@
.build();
- String jsonServiceInstanceObject = String.format(template, serviceInstanceId, orchestrationStatus);
+ var jsonServiceInstanceObject = String.format(template, serviceInstanceId, orchestrationStatus);
assertEquals(jsonServiceInstanceObject, gson.toJson(serviceInstanceAaiObject));
assertEquals(serviceInstanceAaiObject, gson.fromJson(jsonServiceInstanceObject,
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
index c4bef9d..50fcdef 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
@@ -20,7 +20,7 @@
package org.onap.bbs.event.processor.pipelines;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
@@ -64,8 +64,7 @@
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.springframework.http.HttpStatus;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -80,12 +79,12 @@
private DmaapPublisherTask publisherTask;
private AaiClientTask aaiClientTask;
- private HttpResponse httpResponse;
+ private MessageRouterPublishResponse publishResponse;
@BeforeEach
void setup() {
- httpResponse = Mockito.mock(HttpResponse.class);
+ publishResponse = Mockito.mock(MessageRouterPublishResponse.class);
configuration = Mockito.mock(ApplicationConfiguration.class);
consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class);
@@ -148,12 +147,12 @@
@Test
void noResponseFromAai_PipelineTimesOut() throws SSLException {
- String pnfName = "olt1";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
+ var pnfName = "olt1";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -181,13 +180,13 @@
@Test
void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
- String pnfName = "olt1";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName = "olt1";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -199,12 +198,12 @@
.swVersion(swVersion)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
+ var pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
@@ -217,7 +216,7 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.never());
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
@@ -230,13 +229,13 @@
@Test
void singleCorrectEvent_handleSuccessfully() throws SSLException {
- String pnfName = "olt1";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName = "olt1";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -248,12 +247,12 @@
.swVersion(swVersion)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
+ var pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -266,13 +265,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -281,16 +280,16 @@
@Test
void twoCorrectEvents_handleSuccessfully() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress1 = "00:0a:95:8d:78:16";
- final String rgwMacAddress2 = "00:0a:95:8d:78:17";
- final String swVersion = "1.2";
- String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
- String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress1 = "00:0a:95:8d:78:16";
+ var rgwMacAddress2 = "00:0a:95:8d:78:17";
+ var swVersion = "1.2";
+ var hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
+ var hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -310,19 +309,19 @@
.swVersion(swVersion)
.build();
- PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
- PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
- ServiceInstanceAaiObject hsiCfsServiceInstance1 =
+ var pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
+ var pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
+ var hsiCfsServiceInstance1 =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
- ServiceInstanceAaiObject hsiCfsServiceInstance2 =
+ var hsiCfsServiceInstance2 =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, rgwMacAddress2);
// Prepare Mocks
- String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
- String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
- String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
+ var pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
+ var cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance1.getServiceInstanceId());
- String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance2.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -339,14 +338,14 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -355,12 +354,12 @@
@Test
void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
- String pnfName = "olt1";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
+ var pnfName = "olt1";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -389,90 +388,16 @@
}
@Test
- void twoEvents_FirstOk_SecondUnmatchedMac_handleCorrectOnly() throws SSLException {
-
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress1 = "00:0a:95:8d:78:16";
- final String rgwMacAddress2 = "00:0a:95:8d:78:17";
- final String swVersion = "1.2";
- String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
- String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
-
- // Prepare stubbed replies
- CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
- .correlationId(pnfName1)
- .oldAuthenticationState(oldAuthenticationState)
- .newAuthenticationState(newAuthenticationState)
- .stateInterface(stateInterface)
- .rgwMacAddress(rgwMacAddress1)
- .swVersion(swVersion)
- .build();
- CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
- .correlationId(pnfName2)
- .oldAuthenticationState(oldAuthenticationState)
- .newAuthenticationState(newAuthenticationState)
- .stateInterface(stateInterface)
- .rgwMacAddress(rgwMacAddress2)
- .swVersion(swVersion)
- .build();
-
- PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
- PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
- ServiceInstanceAaiObject hsiCfsServiceInstance1 =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
- ServiceInstanceAaiObject hsiCfsServiceInstance2 =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2,
- "Having unmatched RGW MAC address");
-
- // Prepare Mocks
- String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
- String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
- String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
- hsiCfsServiceInstance1.getServiceInstanceId());
- String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
- hsiCfsServiceInstance2.getServiceInstanceId());
-
- when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
- when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
- .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
-
- when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
- when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
-
- when(aaiClientTask
- .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
- .thenReturn(Mono.just(hsiCfsServiceInstance1));
- when(aaiClientTask
- .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
- .thenReturn(Mono.just(hsiCfsServiceInstance2));
-
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
-
- // Execute the pipeline
- StepVerifier.create(pipeline.executePipeline())
- .expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
- .verifyComplete();
-
- verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
- }
-
- @Test
void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -492,12 +417,12 @@
.swVersion(swVersion)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
+ var pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, rgwMacAddress);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -510,13 +435,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
@@ -527,14 +452,14 @@
@Test
void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -554,12 +479,12 @@
.swVersion(swVersion)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
+ var pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, rgwMacAddress);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -572,13 +497,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(aaiClientTask, times(2))
@@ -630,7 +555,7 @@
private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
String pnfName,
String rgwMacAddress) {
- String orchestrationStatus = "active";
+ var orchestrationStatus = "active";
RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
ImmutableRelationshipEntryAaiObject.builder()
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
index 9453db3..815cb5c 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
@@ -20,7 +20,7 @@
package org.onap.bbs.event.processor.pipelines;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
@@ -64,8 +64,7 @@
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.springframework.http.HttpStatus;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -80,12 +79,12 @@
private DmaapPublisherTask publisherTask;
private AaiClientTask aaiClientTask;
- private HttpResponse httpResponse;
+ private MessageRouterPublishResponse publishResponse;
@BeforeEach
void setup() {
- httpResponse = Mockito.mock(HttpResponse.class);
+ publishResponse = Mockito.mock(MessageRouterPublishResponse.class);
configuration = Mockito.mock(ApplicationConfiguration.class);
consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class);
@@ -148,11 +147,11 @@
@Test
void noResponseFromAai_PipelineTimesOut() throws SSLException {
- String pnfName = "olt1";
- String attachmentPoint = "olt2-2-2";
- String remoteId = "newRemoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var pnfName = "olt1";
+ var attachmentPoint = "olt2-2-2";
+ var remoteId = "newRemoteId";
+ var cvlan = "1005";
+ var svlan = "100";
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -179,12 +178,12 @@
@Test
void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
- String pnfName = "olt1";
- String attachmentPoint = "olt2-2-2";
- String remoteId = "newRemoteId";
- String cvlan = "1005";
- String svlan = "100";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName = "olt1";
+ var attachmentPoint = "olt2-2-2";
+ var remoteId = "newRemoteId";
+ var cvlan = "1005";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -195,12 +194,11 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "olt1-1-1", hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
+ var pnfAaiObject = constructPnfObject(pnfName, "olt1-1-1", hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
@@ -213,7 +211,7 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.never());
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
@@ -226,12 +224,12 @@
@Test
void singleCorrectEvent_PnfHavingNoLogicalLink_handleGracefully() throws SSLException {
- String pnfName = "olt1";
- String attachmentPoint = "olt2-2-2";
- String remoteId = "newRemoteId";
- String cvlan = "1005";
- String svlan = "100";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName = "olt1";
+ var attachmentPoint = "olt2-2-2";
+ var remoteId = "newRemoteId";
+ var cvlan = "1005";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -242,12 +240,12 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObjectWithoutLogicalLink(pnfName, hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
+ var pnfAaiObject = constructPnfObjectWithoutLogicalLink(pnfName, hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance =
constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -260,8 +258,8 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
@@ -276,12 +274,12 @@
@Test
void singleCorrectEvent_handleSuccessfully() throws SSLException {
- String pnfName = "olt1";
- String attachmentPoint = "olt2-2-2";
- String remoteId = "newRemoteId";
- String cvlan = "1005";
- String svlan = "100";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName = "olt1";
+ var attachmentPoint = "olt2-2-2";
+ var remoteId = "newRemoteId";
+ var cvlan = "1005";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -292,12 +290,11 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "old-attachment-point", hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
+ var pnfAaiObject = constructPnfObject(pnfName, "old-attachment-point", hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -310,13 +307,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -325,17 +322,17 @@
@Test
void twoCorrectEvents_handleSuccessfully() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- String attachmentPoint1 = "olt1-1-1";
- String attachmentPoint2 = "olt2-2-2";
- String remoteId1 = "newRemoteId1";
- String remoteId2 = "newRemoteId2";
- String cvlan1 = "1005";
- String cvlan2 = "1006";
- String svlan = "100";
- String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
- String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var attachmentPoint1 = "olt1-1-1";
+ var attachmentPoint2 = "olt2-2-2";
+ var remoteId1 = "newRemoteId1";
+ var remoteId2 = "newRemoteId2";
+ var cvlan1 = "1005";
+ var cvlan2 = "1006";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
+ var hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -353,19 +350,17 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
- PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, "olt2-2-0", hsiCfsServiceInstanceId2);
- ServiceInstanceAaiObject hsiCfsServiceInstance1 =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
- ServiceInstanceAaiObject hsiCfsServiceInstance2 =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
+ var pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
+ var pnfAaiObject2 = constructPnfObject(pnfName2, "olt2-2-0", hsiCfsServiceInstanceId2);
+ var hsiCfsServiceInstance1 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
+ var hsiCfsServiceInstance2 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
// Prepare Mocks
- String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
- String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
- String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
+ var pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
+ var cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance1.getServiceInstanceId());
- String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance2.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -382,14 +377,14 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -398,11 +393,11 @@
@Test
void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
- String pnfName = "olt1";
- String attachmentPoint = "olt2-2-2";
- String remoteId = "newRemoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var pnfName = "olt1";
+ var attachmentPoint = "olt2-2-2";
+ var remoteId = "newRemoteId";
+ var cvlan = "1005";
+ var svlan = "100";
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -432,17 +427,17 @@
@Test
void twoEvents_FirstOk_SecondNotRelocation_handleCorrectOnly() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- String attachmentPoint1 = "olt1-1-1";
- String attachmentPoint2 = "olt2-2-2";
- String remoteId1 = "newRemoteId1";
- String remoteId2 = "newRemoteId2";
- String cvlan1 = "1005";
- String cvlan2 = "1006";
- String svlan = "100";
- String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
- String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var attachmentPoint1 = "olt1-1-1";
+ var attachmentPoint2 = "olt2-2-2";
+ var remoteId1 = "newRemoteId1";
+ var remoteId2 = "newRemoteId2";
+ var cvlan1 = "1005";
+ var cvlan2 = "1006";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
+ var hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -460,19 +455,17 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
- PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, attachmentPoint2, hsiCfsServiceInstanceId2);
- ServiceInstanceAaiObject hsiCfsServiceInstance1 =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
- ServiceInstanceAaiObject hsiCfsServiceInstance2 =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
+ var pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
+ var pnfAaiObject2 = constructPnfObject(pnfName2, attachmentPoint2, hsiCfsServiceInstanceId2);
+ var hsiCfsServiceInstance1 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
+ var hsiCfsServiceInstance2 = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
// Prepare Mocks
- String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
- String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
- String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
+ var pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
+ var cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance1.getServiceInstanceId());
- String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance2.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -489,13 +482,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -504,16 +497,16 @@
@Test
void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- String attachmentPoint1 = "olt1-1-1";
- String attachmentPoint2 = "olt2-2-2";
- String remoteId1 = "newRemoteId1";
- String remoteId2 = "newRemoteId2";
- String cvlan1 = "1005";
- String cvlan2 = "1006";
- String svlan = "100";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var attachmentPoint1 = "olt1-1-1";
+ var attachmentPoint2 = "olt2-2-2";
+ var remoteId1 = "newRemoteId1";
+ var remoteId2 = "newRemoteId2";
+ var cvlan1 = "1005";
+ var cvlan2 = "1006";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -531,12 +524,11 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, "old-attachment-point", hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, cvlan1);
+ var pnfAaiObject = constructPnfObject(pnfName1, "old-attachment-point", hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, cvlan1);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -549,13 +541,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
@@ -566,16 +558,16 @@
@Test
void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
- String pnfName1 = "olt1";
- String pnfName2 = "olt2";
- String attachmentPoint1 = "olt1-1-1";
- String attachmentPoint2 = "olt2-2-2";
- String remoteId1 = "newRemoteId1";
- String remoteId2 = "newRemoteId2";
- String cvlan1 = "1005";
- String cvlan2 = "1006";
- String svlan = "100";
- String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
+ var pnfName1 = "olt1";
+ var pnfName2 = "olt2";
+ var attachmentPoint1 = "olt1-1-1";
+ var attachmentPoint2 = "olt2-2-2";
+ var remoteId1 = "newRemoteId1";
+ var remoteId2 = "newRemoteId2";
+ var cvlan1 = "1005";
+ var cvlan2 = "1006";
+ var svlan = "100";
+ var hsiCfsServiceInstanceId = UUID.randomUUID().toString();
// Prepare stubbed replies
ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
@@ -593,12 +585,11 @@
.sVlan(svlan)
.build();
- PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, "old-attachment-point", hsiCfsServiceInstanceId);
- ServiceInstanceAaiObject hsiCfsServiceInstance =
- constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, cvlan2);
+ var pnfAaiObject = constructPnfObject(pnfName2, "old-attachment-point", hsiCfsServiceInstanceId);
+ var hsiCfsServiceInstance = constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, cvlan2);
// Prepare Mocks
- String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
+ var cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
hsiCfsServiceInstance.getServiceInstanceId());
when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
@@ -611,13 +602,13 @@
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
+ when(publishResponse.successful()).thenReturn(true);
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Flux.just(publishResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertTrue(r.successful()))
.verifyComplete();
verify(aaiClientTask, times(2))
@@ -719,7 +710,7 @@
private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
String pnfName,
String cvlan) {
- String orchestrationStatus = "active";
+ var orchestrationStatus = "active";
RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
ImmutableRelationshipEntryAaiObject.builder()
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/SchedulerTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/SchedulerTest.java
index f721ca7..b037e2c 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/SchedulerTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/SchedulerTest.java
@@ -50,9 +50,9 @@
SchedulerTest() {
configuration = Mockito.mock(ApplicationConfiguration.class);
taskScheduler = Mockito.mock(TaskScheduler.class);
- ReRegistrationPipeline reRegistrationPipeline = Mockito.mock(ReRegistrationPipeline.class);
- CpeAuthenticationPipeline cpeAuthenticationPipeline = Mockito.mock(CpeAuthenticationPipeline.class);
- ConsulConfigurationGateway configurationGateway = Mockito.mock(ConsulConfigurationGateway.class);
+ var reRegistrationPipeline = Mockito.mock(ReRegistrationPipeline.class);
+ var cpeAuthenticationPipeline = Mockito.mock(CpeAuthenticationPipeline.class);
+ var configurationGateway = Mockito.mock(ConsulConfigurationGateway.class);
this.applicationScheduler = new Scheduler(configuration, configurationGateway, taskScheduler,
reRegistrationPipeline, cpeAuthenticationPipeline);
}
@@ -60,7 +60,7 @@
@Test
void scheduleTasksWithValidSchedulingPeriod_Succeeds() {
when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20);
- ScheduledFuture scheduledFuture = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture = Mockito.mock(ScheduledFuture.class);
when(taskScheduler.scheduleAtFixedRate(any(Runnable.class), any(Instant.class), any(Duration.class)))
.thenReturn(scheduledFuture);
@@ -75,8 +75,8 @@
@Test
void cancellingRunningTasksSucceeds_tasksAreDeleted() {
when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20);
- ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
- ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
when(scheduledFuture1.cancel(false)).thenReturn(true);
when(scheduledFuture2.cancel(false)).thenReturn(true);
when(scheduledFuture1.isCancelled()).thenReturn(true);
@@ -85,7 +85,7 @@
.thenReturn(scheduledFuture1).thenReturn(scheduledFuture2);
applicationScheduler.setupScheduler();
- boolean result = applicationScheduler.cancelScheduledProcessingTasks();
+ var result = applicationScheduler.cancelScheduledProcessingTasks();
assertAll("Successfully cancelling tasks",
() -> assertTrue(result, "Result of cancellation task"),
() -> assertEquals(0, applicationScheduler.numberOfTotalTasks(), "Total tasks"),
@@ -97,8 +97,8 @@
@Test
void cancellingRunningTasksPartiallyFailing_tasksAreNotDeleted() {
when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20);
- ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
- ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
when(scheduledFuture1.cancel(false)).thenReturn(true);
when(scheduledFuture2.cancel(false)).thenReturn(false);
when(scheduledFuture1.isCancelled()).thenReturn(true);
@@ -107,7 +107,7 @@
.thenReturn(scheduledFuture1).thenReturn(scheduledFuture2);
applicationScheduler.setupScheduler();
- boolean result = applicationScheduler.cancelScheduledProcessingTasks();
+ var result = applicationScheduler.cancelScheduledProcessingTasks();
assertAll("Partially cancelling tasks",
() -> assertFalse(result, "Result of cancellation task"),
() -> assertEquals(1, applicationScheduler.numberOfTotalTasks(), "Total tasks"),
@@ -119,8 +119,8 @@
@Test
void cancellingRunningTasksFailingForAllOfThem_noTasksAreDeleted() {
when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20);
- ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
- ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
when(scheduledFuture1.cancel(false)).thenReturn(false);
when(scheduledFuture2.cancel(false)).thenReturn(false);
when(scheduledFuture1.isCancelled()).thenReturn(false);
@@ -129,7 +129,7 @@
.thenReturn(scheduledFuture1).thenReturn(scheduledFuture2);
applicationScheduler.setupScheduler();
- boolean result = applicationScheduler.cancelScheduledProcessingTasks();
+ var result = applicationScheduler.cancelScheduledProcessingTasks();
assertAll("Failing in cancelling tasks",
() -> assertFalse(result, "Result of cancellation task"),
() -> assertEquals(2, applicationScheduler.numberOfTotalTasks(), "Total tasks"),
@@ -141,15 +141,15 @@
@Test
void reSchedulingWithExistingActiveTasks_Fails() {
when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20);
- ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
- ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
when(scheduledFuture1.isCancelled()).thenReturn(false);
when(scheduledFuture2.isCancelled()).thenReturn(false);
when(taskScheduler.scheduleAtFixedRate(any(Runnable.class), any(Instant.class), any(Duration.class)))
.thenReturn(scheduledFuture1).thenReturn(scheduledFuture2);
applicationScheduler.setupScheduler();
- boolean result = applicationScheduler.reScheduleProcessingTasks();
+ var result = applicationScheduler.reScheduleProcessingTasks();
assertAll("Rescheduling with active tasks",
() -> assertFalse(result, "Result of re-scheduling"),
() -> assertEquals(2, applicationScheduler.numberOfTotalTasks(), "Total tasks"),
@@ -162,11 +162,11 @@
void reSchedulingWithExistingCancelledTasks_Succeeds() {
when(configuration.getPipelinesPollingIntervalInSeconds()).thenReturn(20);
// Initial tasks
- ScheduledFuture scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
- ScheduledFuture scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture1 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture2 = Mockito.mock(ScheduledFuture.class);
// Re-scheduled tasks
- ScheduledFuture scheduledFuture3 = Mockito.mock(ScheduledFuture.class);
- ScheduledFuture scheduledFuture4 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture3 = Mockito.mock(ScheduledFuture.class);
+ var scheduledFuture4 = Mockito.mock(ScheduledFuture.class);
when(scheduledFuture1.isCancelled()).thenReturn(true);
when(scheduledFuture2.isCancelled()).thenReturn(true);
when(scheduledFuture3.isCancelled()).thenReturn(false);
@@ -178,7 +178,7 @@
.thenReturn(scheduledFuture4);
applicationScheduler.setupScheduler();
- boolean result = applicationScheduler.reScheduleProcessingTasks();
+ var result = applicationScheduler.reScheduleProcessingTasks();
assertAll("Rescheduling with cancelled tasks",
() -> assertTrue(result, "Result of re-scheduling"),
() -> assertEquals(2, applicationScheduler.numberOfTotalTasks(), "Total tasks"),
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImplTest.java
index db5f7cb..e13b8bc 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImplTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/AaiClientTaskImplTest.java
@@ -64,7 +64,7 @@
@BeforeEach
void init() {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
reactiveClient = Mockito.mock(AaiReactiveClient.class);
task = new AaiClientTaskImpl(reactiveClient);
@@ -73,7 +73,7 @@
@Test
void passingEmptyPnfObject_NothingHappens() throws AaiTaskException {
when(reactiveClient.getPnfObjectDataFor(any(String.class))).thenReturn(Mono.empty());
- Mono<PnfAaiObject> pnf = task.executePnfRetrieval("Empty PNF task", "some-url");
+ var pnf = task.executePnfRetrieval("Empty PNF task", "some-url");
verify(reactiveClient).getPnfObjectDataFor("some-url");
assertNull(pnf.block(Duration.ofSeconds(5)));
@@ -82,7 +82,7 @@
@Test
void passingEmptyServiceInstanceObject_NothingHappens() throws AaiTaskException {
when(reactiveClient.getServiceInstanceObjectDataFor(any(String.class))).thenReturn(Mono.empty());
- Mono<ServiceInstanceAaiObject> serviceInstance =
+ var serviceInstance =
task.executeServiceInstanceRetrieval("Empty Service Instance task", "some-url");
verify(reactiveClient).getServiceInstanceObjectDataFor("some-url");
@@ -92,8 +92,8 @@
@Test
void passingPnfObject_taskSucceeds() throws AaiTaskException {
- String pnfName = "pnf-1";
- String attachmentPoint = "olt1-1-1";
+ var pnfName = "pnf-1";
+ var attachmentPoint = "olt1-1-1";
// Build Relationship Data
RelationshipListAaiObject.RelationshipEntryAaiObject firstRelationshipEntry =
@@ -142,7 +142,7 @@
.build();
when(reactiveClient.getPnfObjectDataFor(any(String.class))).thenReturn(Mono.just(pnfAaiObject));
- Mono<PnfAaiObject> pnf = task.executePnfRetrieval("Normal PNF retrieval task", "some-url");
+ var pnf = task.executePnfRetrieval("Normal PNF retrieval task", "some-url");
verify(reactiveClient).getPnfObjectDataFor("some-url");
assertNotNull(pnf.block(Duration.ofSeconds(5)));
@@ -151,7 +151,7 @@
.expectSubscription()
.consumeNextWith(aPnf -> {
Assertions.assertEquals(pnfName, aPnf.getPnfName(), "PNF Name in response does not match");
- String extractedAttachmentPoint = aPnf.getRelationshipListAaiObject().getRelationshipEntries()
+ var extractedAttachmentPoint = aPnf.getRelationshipListAaiObject().getRelationshipEntries()
.stream()
.filter(e -> e.getRelatedTo().equals("logical-link"))
.flatMap(e -> e.getRelationshipData().stream())
@@ -167,8 +167,8 @@
@Test
void passingServiceInstanceObject_taskSucceeds() throws AaiTaskException {
- String serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef";
- String orchestrationStatus = "active";
+ var serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef";
+ var orchestrationStatus = "active";
// Build Relationship Data
RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
@@ -207,7 +207,7 @@
when(reactiveClient.getServiceInstanceObjectDataFor(any(String.class)))
.thenReturn(Mono.just(serviceInstanceAaiObject));
- Mono<ServiceInstanceAaiObject> serviceInstance =
+ var serviceInstance =
task.executeServiceInstanceRetrieval("Normal Service Instance retrieval task",
"some-url");
@@ -220,10 +220,10 @@
Assertions.assertEquals(serviceInstanceId, instance.getServiceInstanceId(),
"Service Instance ID in response does not match");
- MetadataListAaiObject extractedMetadataListObject =
+ var extractedMetadataListObject =
instance.getMetadataListAaiObject().orElseThrow(AaiClientTaskTestException::new);
- MetadataListAaiObject.MetadataEntryAaiObject extractedMetadataEntry =
+ var extractedMetadataEntry =
extractedMetadataListObject.getMetadataEntries()
.stream()
.filter(m -> m.getMetaname().equals("cvlan"))
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java
index 40bcb65..f644d44 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java
@@ -20,38 +20,39 @@
package org.onap.bbs.event.processor.tasks;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-
-import java.util.Optional;
-
-import javax.net.ssl.SSLException;
import org.junit.Assert;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
+import org.onap.bbs.event.processor.config.DmaapCpeAuthenticationConsumerProperties;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.CpeAuthenticationDmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
class DmaapCpeAuthenticationConsumerTaskImplTest {
+ private static final String DMAAP_PROTOCOL = "http";
+ private static final String DMAAP_HOST = "message-router.onap.svc.cluster.local";
+ private static final int DMAAP_PORT = 3904;
+ private static final String DMAAP_TOPIC = "unauthenticated.CPE_AUTHENTICATION";
+ private static final String SUBSCRIBER_ID = "subscriberID";
+ private static final String SUBSCRIBER_GROUP = "subscriberGroup";
+
private static final String CPE_AUTHENTICATION_EVENT_TEMPLATE = "{\"event\": {"
+ "\"commonEventHeader\": { \"sourceName\":\"%s\"},"
+ "\"stateChangeFields\": {"
@@ -65,32 +66,34 @@
private static DmaapCpeAuthenticationConsumerTask dmaapConsumerTask;
private static CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel;
- private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
private static String eventsArray;
+ private static MessageRouterSubscriber subscriber;
private static Gson gson = new Gson();
+ private static ApplicationConfiguration configuration;
@BeforeAll
- static void setUp() throws SSLException {
-
- final String sourceName = "PNF-CorrelationId";
- final String oldAuthenticationState = "outOfService";
- final String newAuthenticationState = "inService";
- final String stateInterface = "stateInterface";
- final String rgwMacAddress = "00:0a:95:8d:78:16";
- final String swVersion = "1.2";
+ static void setUp() {
- // Mock Re-registration configuration
+ // Mock CPE authentication configuration
- DmaapConsumerConfiguration dmaapConsumerConfiguration = testVersionOfDmaapConsumerConfiguration();
- ApplicationConfiguration configuration = mock(ApplicationConfiguration.class);
- when(configuration.getDmaapCpeAuthenticationConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
+ configuration = mock(ApplicationConfiguration.class);
+ var props = mock(DmaapCpeAuthenticationConsumerProperties.class);
+ when(props.getDmaapProtocol()).thenReturn(DMAAP_PROTOCOL);
+ when(props.getDmaapHostName()).thenReturn(DMAAP_HOST);
+ when(props.getDmaapPortNumber()).thenReturn(DMAAP_PORT);
+ when(props.getDmaapTopicName()).thenReturn(DMAAP_TOPIC);
+ when(props.getConsumerId()).thenReturn(SUBSCRIBER_ID);
+ when(props.getConsumerGroup()).thenReturn(SUBSCRIBER_GROUP);
+ when(configuration.getDmaapCpeAuthenticationConsumerProperties()).thenReturn(props);
- // Mock reactive DMaaP client
- ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
- dMaaPConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
- doReturn(dMaaPConsumerReactiveHttpClient).when(httpClientFactory).create(dmaapConsumerConfiguration);
+ var subscriberConfig = mock(MessageRouterSubscriberConfig.class);
+ when(configuration.getDmaapCpeAuthenticationConsumerConfiguration()).thenReturn(subscriberConfig);
- dmaapConsumerTask = new DmaapCpeAuthenticationConsumerTaskImpl(configuration,
- new CpeAuthenticationDmaapConsumerJsonParser(), httpClientFactory);
+ var sourceName = "PNF-CorrelationId";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
cpeAuthenticationConsumerDmaapModel = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
.correlationId(sourceName)
@@ -101,58 +104,46 @@
.swVersion(swVersion)
.build();
- String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName, oldAuthenticationState,
+ var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName, oldAuthenticationState,
newAuthenticationState, stateInterface, rgwMacAddress, swVersion);
eventsArray = "[" + event + "]";
}
- @AfterEach
- void resetMock() {
- reset(dMaaPConsumerReactiveHttpClient);
- }
-
@Test
void passingEmptyMessage_NothingHappens() throws Exception {
- JsonElement empty = gson.toJsonTree("");
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty));
+ var empty = gson.toJsonTree("");
+ subscriber = mock(MessageRouterSubscriber.class);
+ when(subscriber.getElements(any())).thenReturn(Flux.just(empty));
+
+ dmaapConsumerTask = new DmaapCpeAuthenticationConsumerTaskImpl(configuration, subscriber,
+ new CpeAuthenticationDmaapConsumerJsonParser());
+ // NOTE(review): the StepVerifier chain below is never terminated with verify(),
+ // so its expectations are not evaluated; confirm the intended outcome and add it.
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.expectError(EmptyDmaapResponseException.class);
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
+
+ verify(subscriber, times(1)).getElements(any());
+ verifyNoMoreInteractions(subscriber);
}
@Test
void passingNormalMessage_ResponseSucceeds() throws Exception {
- JsonElement normalEventsArray = gson.toJsonTree(eventsArray);
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))
- .thenReturn(Mono.just(normalEventsArray));
+ var normalEventsArray = gson.toJsonTree(eventsArray);
+ subscriber = mock(MessageRouterSubscriber.class);
+ when(subscriber.getElements(any())).thenReturn(Flux.just(normalEventsArray));
+
+ dmaapConsumerTask = new DmaapCpeAuthenticationConsumerTaskImpl(configuration, subscriber,
+ new CpeAuthenticationDmaapConsumerJsonParser());
+ // NOTE(review): the StepVerifier chain below is never terminated with verify(),
+ // so the assertion in consumeNextWith is not evaluated; confirm and add it.
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
- .consumeNextWith(e -> Assert.assertEquals(e, cpeAuthenticationConsumerDmaapModel));
+ .consumeNextWith(e -> Assert.assertEquals(cpeAuthenticationConsumerDmaapModel, e));
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
- }
-
- private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() {
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .consumerGroup("consumer-group")
- .consumerId("consumer-id")
- .dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapPortNumber(3904)
- .dmaapProtocol("http")
- .dmaapUserName("admin")
- .dmaapUserPassword("admin")
- .trustStorePath("change it")
- .trustStorePasswordPath("change_it")
- .keyStorePath("change it")
- .keyStorePasswordPath("change_it")
- .enableDmaapCertAuth(false)
- .dmaapTopicName("/events/unauthenticated.CPE_AUTHENTICATION")
- .timeoutMs(-1)
- .messageLimit(-1)
- .build();
+ verify(subscriber, times(1)).getElements(any());
+ verifyNoMoreInteractions(subscriber);
}
}
\ No newline at end of file
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java
index 436206d..7c15848 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java
@@ -21,7 +21,6 @@
package org.onap.bbs.event.processor.tasks;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -30,55 +29,52 @@
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Optional;
-
-import javax.net.ssl.SSLException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
+import org.onap.bbs.event.processor.config.DmaapProducerProperties;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
-import org.springframework.http.HttpStatus;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
class DmaapPublisherTaskImplTest {
+ private static final String DMAAP_PROTOCOL = "http";
+ private static final String DMAAP_HOST = "message-router.onap.svc.cluster.local";
+ private static final int DMAAP_PORT = 3904;
+ private static final String DMAAP_TOPIC = "unauthenticated.DCAE_CL_OUTPUT";
+
private static ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel;
private static DmaapPublisherTaskImpl task;
- private static DMaaPPublisherReactiveHttpClient reactiveHttpClient;
private static ApplicationConfiguration configuration;
- private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
@BeforeAll
static void setUp() {
- dmaapPublisherConfiguration = testVersionOfDmaapPublisherConfiguration();
configuration = mock(ApplicationConfiguration.class);
- final String closedLoopEventClient = "DCAE.BBS_mSInstance";
- final String policyVersion = "1.0.0.5";
- final String policyName = "CPE_Authentication";
- final String policyScope =
+ final var closedLoopEventClient = "DCAE.BBS_mSInstance";
+ final var policyVersion = "1.0.0.5";
+ final var policyName = "CPE_Authentication";
+ final var policyScope =
"service=HSIAService,type=SampleType,"
+ "closedLoopControlName=CL-CPE_A-d925ed73-8231-4d02-9545-db4e101f88f8";
- final String targetType = "VM";
- final long closedLoopAlarmStart = 1484677482204798L;
- final String closedLoopEventStatus = "ONSET";
- final String closedLoopControlName = "ControlLoop-CPE_A-2179b738-fd36-4843-a71a-a8c24c70c88b";
- final String version = "1.0.2";
- final String target = "vserver.vserver-name";
- final String requestId = "97964e10-686e-4790-8c45-bdfa61df770f";
- final String from = "DCAE";
+ final var targetType = "VM";
+ final var closedLoopAlarmStart = 1484677482204798L;
+ final var closedLoopEventStatus = "ONSET";
+ final var closedLoopControlName = "ControlLoop-CPE_A-2179b738-fd36-4843-a71a-a8c24c70c88b";
+ final var version = "1.0.2";
+ final var target = "vserver.vserver-name";
+ final var requestId = "97964e10-686e-4790-8c45-bdfa61df770f";
+ final var from = "DCAE";
final Map<String, String> aaiEnrichmentData = new LinkedHashMap<>();
aaiEnrichmentData.put("service-information.service-instance-id", "service-instance-id-example");
@@ -100,77 +96,67 @@
.requestId(requestId)
.originator(from)
.build();
+ var props = mock(DmaapProducerProperties.class);
+ when(props.getDmaapProtocol()).thenReturn(DMAAP_PROTOCOL);
+ when(props.getDmaapHostName()).thenReturn(DMAAP_HOST);
+ when(props.getDmaapPortNumber()).thenReturn(DMAAP_PORT);
+ when(props.getDmaapTopicName()).thenReturn(DMAAP_TOPIC);
+ when(configuration.getDmaapProducerProperties()).thenReturn(props);
- when(configuration.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
+ var publisherConfig = mock(MessageRouterPublisherConfig.class);
+ when(configuration.getDmaapPublisherConfiguration()).thenReturn(publisherConfig);
}
@Test
void passingNullMessage_ExceptionIsRaised() {
- task = new DmaapPublisherTaskImpl(configuration);
-
+ // Create the task here: the shared static field is only assigned inside the other
+ // test methods, so running this test first would dereference null.
+ task = new DmaapPublisherTaskImpl(configuration, mock(MessageRouterPublisher.class));
+
Executable executableFunction = () -> task.execute(null);
Assertions.assertThrows(DmaapException.class, executableFunction, "Input message is invalid");
}
@Test
- void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException, SSLException {
- HttpResponse response = setupMocks(HttpStatus.OK.value());
+ void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException {
+ var publisher = mock(MessageRouterPublisher.class);
+ task = new DmaapPublisherTaskImpl(configuration, publisher);
- StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription()
- .expectNext(response).verifyComplete();
+ var response = mockResponse(true);
+ when(publisher.put(any(), any())).thenReturn(Flux.just(response));
- verify(reactiveHttpClient, times(1))
- .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
- verifyNoMoreInteractions(reactiveHttpClient);
+ StepVerifier.create(task.execute(controlLoopPublisherDmaapModel))
+ .expectSubscription()
+ .assertNext(r -> Assertions.assertTrue(r.successful()))
+ .verifyComplete();
+
+ verify(publisher, times(1)).put(any(), any());
+ verifyNoMoreInteractions(publisher);
}
@Test
- void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException, SSLException {
- HttpResponse response = setupMocks(HttpStatus.UNAUTHORIZED.value());
+ void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException {
+ var publisher = mock(MessageRouterPublisher.class);
+ task = new DmaapPublisherTaskImpl(configuration, publisher);
- StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription()
- .expectNext(response).verifyComplete();
+ var response = mockResponse(false);
+ when(publisher.put(any(), any())).thenReturn(Flux.just(response));
- verify(reactiveHttpClient, times(1))
- .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
- verifyNoMoreInteractions(reactiveHttpClient);
+ StepVerifier.create(task.execute(controlLoopPublisherDmaapModel))
+ .expectSubscription()
+ .assertNext(r -> Assertions.assertFalse(r.successful()))
+ .verifyComplete();
+
+ verify(publisher, times(1)).put(any(), any());
+ verifyNoMoreInteractions(publisher);
}
- // We can safely suppress unchecked assignment warning here since it is a mock class
- @SuppressWarnings("unchecked")
- private HttpResponse setupMocks(Integer httpResponseCode) throws SSLException {
-
- HttpResponse response = mock(HttpResponse.class);
- when(response.statusCode()).thenReturn(httpResponseCode);
-
- reactiveHttpClient = mock(DMaaPPublisherReactiveHttpClient.class);
- when(reactiveHttpClient.getDMaaPProducerResponse(any(), any(Optional.class)))
- .thenReturn(Mono.just(response));
-
- PublisherReactiveHttpClientFactory httpClientFactory = mock(PublisherReactiveHttpClientFactory.class);
- doReturn(reactiveHttpClient).when(httpClientFactory).create(dmaapPublisherConfiguration);
-
- task = new DmaapPublisherTaskImpl(configuration, httpClientFactory);
-
+ // Builds a mocked publish response whose successful() flag returns isSuccess.
+ private MessageRouterPublishResponse mockResponse(boolean isSuccess) {
+ var response = mock(MessageRouterPublishResponse.class);
+ when(response.successful()).thenReturn(isSuccess);
return response;
}
-
- private static DmaapPublisherConfiguration testVersionOfDmaapPublisherConfiguration() {
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapPortNumber(3904)
- .dmaapProtocol("http")
- .dmaapUserName("admin")
- .dmaapUserPassword("admin")
- .trustStorePath("/opt/app/bbs/local/org.onap.bbs.trust.jks")
- .trustStorePasswordPath("change_it")
- .keyStorePath("/opt/app/bbs/local/org.onap.bbs.p12")
- .keyStorePasswordPath("change_it")
- .enableDmaapCertAuth(false)
- .dmaapTopicName("/events/unauthenticated.DCAE_CL_OUTPUT")
- .build();
- }
}
\ No newline at end of file
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java
index 72e2898..edbe511 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java
@@ -20,38 +20,39 @@
package org.onap.bbs.event.processor.tasks;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-
-import java.util.Optional;
-
-import javax.net.ssl.SSLException;
import org.junit.Assert;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
+import org.onap.bbs.event.processor.config.DmaapReRegistrationConsumerProperties;
import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
import org.onap.bbs.event.processor.utilities.ReRegistrationDmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
class DmaapReRegistrationConsumerTaskImplTest {
+ private static final String DMAAP_PROTOCOL = "http";
+ private static final String DMAAP_HOST = "message-router.onap.svc.cluster.local";
+ private static final int DMAAP_PORT = 3904;
+ private static final String DMAAP_TOPIC = "unauthenticated.PNF_REREGISTRATION";
+ private static final String SUBSCRIBER_ID = "subscriberID";
+ private static final String SUBSCRIBER_GROUP = "subscriberGroup";
+
private static final String RE_REGISTRATION_EVENT_TEMPLATE = "{\"event\": {"
+ "\"commonEventHeader\": { \"sourceName\":\"%s\"},"
+ "\"additionalFields\": {"
@@ -63,31 +64,33 @@
private static DmaapReRegistrationConsumerTaskImpl dmaapConsumerTask;
private static ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel;
- private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
private static String eventsArray;
+ private static MessageRouterSubscriber subscriber;
private static Gson gson = new Gson();
+ private static ApplicationConfiguration configuration;
@BeforeAll
- static void setUp() throws SSLException {
-
- final String sourceName = "PNF-CorrelationId";
- final String attachmentPoint = "olt2/2/2";
- final String remoteId = "remoteId";
- final String cvlan = "1005";
- final String svlan = "100";
+ static void setUp() {
// Mock Re-registration configuration
- DmaapConsumerConfiguration dmaapConsumerConfiguration = testVersionOfDmaapConsumerConfiguration();
- ApplicationConfiguration configuration = mock(ApplicationConfiguration.class);
- when(configuration.getDmaapReRegistrationConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
+ configuration = mock(ApplicationConfiguration.class);
+ var props = mock(DmaapReRegistrationConsumerProperties.class);
+ when(props.getDmaapProtocol()).thenReturn(DMAAP_PROTOCOL);
+ when(props.getDmaapHostName()).thenReturn(DMAAP_HOST);
+ when(props.getDmaapPortNumber()).thenReturn(DMAAP_PORT);
+ when(props.getDmaapTopicName()).thenReturn(DMAAP_TOPIC);
+ when(props.getConsumerId()).thenReturn(SUBSCRIBER_ID);
+ when(props.getConsumerGroup()).thenReturn(SUBSCRIBER_GROUP);
+ when(configuration.getDmaapReRegistrationConsumerProperties()).thenReturn(props);
- // Mock reactive DMaaP client
- ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
- dMaaPConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
- doReturn(dMaaPConsumerReactiveHttpClient).when(httpClientFactory).create(dmaapConsumerConfiguration);
+ var subscriberConfig = mock(MessageRouterSubscriberConfig.class);
+ when(configuration.getDmaapReRegistrationConsumerConfiguration()).thenReturn(subscriberConfig);
- dmaapConsumerTask = new DmaapReRegistrationConsumerTaskImpl(configuration,
- new ReRegistrationDmaapConsumerJsonParser(), httpClientFactory);
+ var sourceName = "PNF-CorrelationId";
+ var attachmentPoint = "olt2/2/2";
+ var remoteId = "remoteId";
+ var cvlan = "1005";
+ var svlan = "100";
reRegistrationConsumerDmaapModel = ImmutableReRegistrationConsumerDmaapModel.builder()
.correlationId(sourceName)
@@ -97,58 +100,46 @@
.sVlan(svlan)
.build();
- String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, sourceName, attachmentPoint, remoteId,
+ var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, sourceName, attachmentPoint, remoteId,
cvlan, svlan);
eventsArray = "[" + event + "]";
}
- @AfterEach
- void resetMock() {
- reset(dMaaPConsumerReactiveHttpClient);
- }
-
@Test
void passingEmptyMessage_NothingHappens() {
- JsonElement empty = gson.toJsonTree("");
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty));
+ var empty = gson.toJsonTree("");
+ subscriber = mock(MessageRouterSubscriber.class);
+ when(subscriber.getElements(any())).thenReturn(Flux.just(empty));
+
+ dmaapConsumerTask = new DmaapReRegistrationConsumerTaskImpl(configuration, subscriber,
+ new ReRegistrationDmaapConsumerJsonParser());
+ // NOTE(review): the StepVerifier chain below is never terminated with verify(),
+ // so its expectations are not evaluated; confirm the intended outcome and add it.
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.expectError(EmptyDmaapResponseException.class);
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
+
+ verify(subscriber, times(1)).getElements(any());
+ verifyNoMoreInteractions(subscriber);
}
@Test
void passingNormalMessage_ResponseSucceeds() {
- JsonElement normalEventsArray = gson.toJsonTree(eventsArray);
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))
- .thenReturn(Mono.just(normalEventsArray));
+ var normalEventsArray = gson.toJsonTree(eventsArray);
+ subscriber = mock(MessageRouterSubscriber.class);
+ when(subscriber.getElements(any())).thenReturn(Flux.just(normalEventsArray));
+
+ dmaapConsumerTask = new DmaapReRegistrationConsumerTaskImpl(configuration, subscriber,
+ new ReRegistrationDmaapConsumerJsonParser());
+ // NOTE(review): the StepVerifier chain below is never terminated with verify(),
+ // so the assertion in consumeNextWith is not evaluated; confirm and add it.
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
- .consumeNextWith(e -> Assert.assertEquals(e, reRegistrationConsumerDmaapModel));
+ .consumeNextWith(e -> Assert.assertEquals(reRegistrationConsumerDmaapModel, e));
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
- }
-
- private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() {
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .consumerGroup("OpenDCAE-c12")
- .consumerId("c12")
- .dmaapContentType("application/json")
- .dmaapHostName("message-router.onap.svc.cluster.local")
- .dmaapPortNumber(3904)
- .dmaapProtocol("http")
- .dmaapUserName("admin")
- .dmaapUserPassword("admin")
- .trustStorePath("/opt/app/bbs/local/org.onap.bbs.trust.jks")
- .trustStorePasswordPath("change_it")
- .keyStorePath("/opt/app/bbs/local/org.onap.bbs.p12")
- .keyStorePasswordPath("change_it")
- .enableDmaapCertAuth(false)
- .dmaapTopicName("/events/unauthenticated.PNF_REREGISTRATION")
- .timeoutMs(-1)
- .messageLimit(-1)
- .build();
+ verify(subscriber, times(1)).getElements(any());
+ verifyNoMoreInteractions(subscriber);
}
}
\ No newline at end of file
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/AaiReactiveClientTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/AaiReactiveClientTest.java
index 8e3c46b..2876dd9 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/AaiReactiveClientTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/AaiReactiveClientTest.java
@@ -37,8 +37,6 @@
import java.util.HashMap;
import java.util.ServiceLoader;
-import javax.net.ssl.SSLException;
-
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -74,13 +72,13 @@
private static WireMockServer wireMockServer;
@BeforeAll
- static void init() throws SSLException {
- GsonBuilder gsonBuilder = new GsonBuilder();
+ static void init() {
+ var gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
gson = gsonBuilder.create();
- ApplicationConfiguration configuration = Mockito.mock(ApplicationConfiguration.class);
- AaiClientConfiguration aaiClientConfiguration = Mockito.mock(AaiClientConfiguration.class);
+ var configuration = Mockito.mock(ApplicationConfiguration.class);
+ var aaiClientConfiguration = Mockito.mock(AaiClientConfiguration.class);
when(configuration.getAaiClientConfiguration()).thenReturn(aaiClientConfiguration);
when(aaiClientConfiguration.aaiUserName()).thenReturn("AAI");
when(aaiClientConfiguration.aaiUserPassword()).thenReturn("AAI");
@@ -106,10 +104,10 @@
@Test
void sendingReactiveRequestForPnf_Succeeds() {
- String pnfName = "pnf-1";
- String attachmentPoint = "olt1-1-1";
+ var pnfName = "pnf-1";
+ var attachmentPoint = "olt1-1-1";
- String pnfUrl = String.format("/aai/v14/network/pnfs/pnf/%s?depth=1", pnfName);
+ var pnfUrl = String.format("/aai/v14/network/pnfs/pnf/%s?depth=1", pnfName);
// Build Relationship Data
RelationshipListAaiObject.RelationshipEntryAaiObject firstRelationshipEntry =
@@ -166,7 +164,7 @@
.expectSubscription()
.consumeNextWith(pnf -> {
Assertions.assertEquals(pnfName, pnf.getPnfName(), "PNF Name in response does not match");
- String extractedAttachmentPoint = pnf.getRelationshipListAaiObject().getRelationshipEntries()
+ var extractedAttachmentPoint = pnf.getRelationshipListAaiObject().getRelationshipEntries()
.stream()
.filter(e -> e.getRelatedTo().equals("logical-link"))
.flatMap(e -> e.getRelationshipData().stream())
@@ -182,10 +180,10 @@
@Test
void sendingReactiveRequestForServiceInstance_Succeeds() {
- String serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef";
- String orchestrationStatus = "active";
+ var serviceInstanceId = "84003b26-6b76-4c75-b805-7b14ab4ffaef";
+ var orchestrationStatus = "active";
- String serviceInstanceUrl =
+ var serviceInstanceUrl =
String.format("/aai/v14/nodes/service-instances/service-instance/%s?format=resource_and_url",
serviceInstanceId);
@@ -237,10 +235,10 @@
Assertions.assertEquals(serviceInstanceId, serviceInstance.getServiceInstanceId(),
"Service Instance ID in response does not match");
- MetadataListAaiObject extractedMetadataListObject =
+ var extractedMetadataListObject =
serviceInstance.getMetadataListAaiObject().orElseThrow(AaiReactiveClientTestException::new);
- MetadataListAaiObject.MetadataEntryAaiObject extractedMetadataEntry =
+ var extractedMetadataEntry =
extractedMetadataListObject.getMetadataEntries()
.stream()
.filter(m -> m.getMetaname().equals("cvlan"))
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java
index c7ad793..d7970ae 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java
@@ -104,9 +104,8 @@
@Test
void passingNonJson_getIllegalStateException() {
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- new CpeAuthenticationDmaapConsumerJsonParser();
- JsonReader jsonReader = new JsonReader(new StringReader("not JSON"));
+ var consumerJsonParser = new CpeAuthenticationDmaapConsumerJsonParser();
+ var jsonReader = new JsonReader(new StringReader("not JSON"));
jsonReader.setLenient(true);
JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive();
@@ -118,8 +117,7 @@
@Test
void passingNoEvents_EmptyFluxIsReturned() {
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- new CpeAuthenticationDmaapConsumerJsonParser();
+ var consumerJsonParser = new CpeAuthenticationDmaapConsumerJsonParser();
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]"))))
.expectSubscription()
@@ -129,23 +127,22 @@
@Test
void passingOneCorrectEvent_validationSucceeds() {
- String sourceName = "PNF-CorrelationId";
- String oldAuthenticationState = "outOfService";
- String newAuthenticationState = "inService";
- String stateInterface = "stateInterface";
- String rgwMacAddress = "00:0a:95:8d:78:16";
- String swVersion = "1.2";
+ var sourceName = "PNF-CorrelationId";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
- String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName, oldAuthenticationState,
+ var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName, oldAuthenticationState,
newAuthenticationState, stateInterface, rgwMacAddress, swVersion);
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- spy(new CpeAuthenticationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
CpeAuthenticationConsumerDmaapModel expectedEventObject = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
.correlationId(sourceName)
@@ -164,30 +161,29 @@
@Test
void passingTwoCorrectEvents_validationSucceeds() {
- String sourceName1 = "PNF-CorrelationId";
- String sourceName2 = "PNF-CorrelationId";
- String oldAuthenticationState = "outOfService";
- String newAuthenticationState = "inService";
- String stateInterface = "stateInterface";
- String rgwMacAddress1 = "00:0a:95:8d:78:16";
- String rgwMacAddress2 = "00:0a:95:8d:78:17";
- String swVersion = "1.2";
+ var sourceName1 = "PNF-CorrelationId";
+ var sourceName2 = "PNF-CorrelationId";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress1 = "00:0a:95:8d:78:16";
+ var rgwMacAddress2 = "00:0a:95:8d:78:17";
+ var swVersion = "1.2";
- String firstEvent = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName1, oldAuthenticationState,
+ var firstEvent = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName1, oldAuthenticationState,
newAuthenticationState, stateInterface, rgwMacAddress1, swVersion);
- String secondEvent = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName2, oldAuthenticationState,
+ var secondEvent = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE, sourceName2, oldAuthenticationState,
newAuthenticationState, stateInterface, rgwMacAddress2, swVersion);
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- spy(new CpeAuthenticationDmaapConsumerJsonParser());
- JsonElement jsonElement1 = jsonParser.parse(firstEvent);
+ var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser());
+ var jsonElement1 = jsonParser.parse(firstEvent);
Mockito.doReturn(Optional.of(jsonElement1.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement1);
- JsonElement jsonElement2 = jsonParser.parse(secondEvent);
+ var jsonElement2 = jsonParser.parse(secondEvent);
Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2);
- String eventsArray = "[" + firstEvent + "," + secondEvent + "]";
+ var eventsArray = "[" + firstEvent + "," + secondEvent + "]";
CpeAuthenticationConsumerDmaapModel expectedFirstEventObject =
ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -217,22 +213,21 @@
@Test
void passingJsonWithMissingAuthenticationState_validationFails() {
- String sourceName = "PNF-CorrelationId";
- String oldAuthenticationState = "outOfService";
- String stateInterface = "stateInterface";
- String rgwMacAddress = "00:0a:95:8d:78:16";
- String swVersion = "1.2";
+ var sourceName = "PNF-CorrelationId";
+ var oldAuthenticationState = "outOfService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
- String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_AUTHENTICATION_STATE, sourceName,
+ var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_AUTHENTICATION_STATE, sourceName,
oldAuthenticationState, stateInterface, rgwMacAddress, swVersion);
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- spy(new CpeAuthenticationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
@@ -242,22 +237,21 @@
@Test
void passingJsonWithMissingSourceName_validationFails() {
- String oldAuthenticationState = "outOfService";
- String newAuthenticationState = "inService";
- String stateInterface = "stateInterface";
- String rgwMacAddress = "00:0a:95:8d:78:16";
- String swVersion = "1.2";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
- String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_SOURCE_NAME,
+ var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_SOURCE_NAME,
oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress, swVersion);
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- spy(new CpeAuthenticationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
@@ -267,22 +261,21 @@
@Test
void passingJsonWithMissingSourceNameValue_validationFails() {
- String oldAuthenticationState = "outOfService";
- String newAuthenticationState = "inService";
- String stateInterface = "stateInterface";
- String rgwMacAddress = "00:0a:95:8d:78:16";
- String swVersion = "1.2";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
- String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_SOURCE_NAME_VALUE,
+ var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_SOURCE_NAME_VALUE,
oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress, swVersion);
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- spy(new CpeAuthenticationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
@@ -292,22 +285,21 @@
@Test
void passingJsonWithMissingStateChangeFieldsHeader_validationFails() {
- String oldAuthenticationState = "outOfService";
- String newAuthenticationState = "inService";
- String stateInterface = "stateInterface";
- String rgwMacAddress = "00:0a:95:8d:78:16";
- String swVersion = "1.2";
+ var oldAuthenticationState = "outOfService";
+ var newAuthenticationState = "inService";
+ var stateInterface = "stateInterface";
+ var rgwMacAddress = "00:0a:95:8d:78:16";
+ var swVersion = "1.2";
- String event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_STATE_CHANGE_FIELDS,
+ var event = String.format(CPE_AUTHENTICATION_EVENT_TEMPLATE_WITH_MISSING_STATE_CHANGE_FIELDS,
oldAuthenticationState, newAuthenticationState, stateInterface, rgwMacAddress, swVersion);
- CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
- spy(new CpeAuthenticationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new CpeAuthenticationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
diff --git a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java
index cd238e2..6d78826 100644
--- a/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java
+++ b/components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java
@@ -93,8 +93,8 @@
@Test
void passingNonJson_getIllegalStateException() {
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
- JsonReader jsonReader = new JsonReader(new StringReader("not JSON"));
+ var consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
+ var jsonReader = new JsonReader(new StringReader("not JSON"));
jsonReader.setLenient(true);
JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive();
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson)))
@@ -105,7 +105,7 @@
@Test
void passingNoEvents_EmptyFluxIsReturned() {
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
+ var consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]"))))
.expectSubscription()
.verifyComplete();
@@ -114,21 +114,21 @@
@Test
void passingOneCorrectEvent_validationSucceeds() {
- String correlationId = "PNF-CorrelationId";
- String attachmentPoint = "olt1/1/1";
- String remoteId = "remoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var correlationId = "PNF-CorrelationId";
+ var attachmentPoint = "olt1/1/1";
+ var remoteId = "remoteId";
+ var cvlan = "1005";
+ var svlan = "100";
- String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId, attachmentPoint,
+ var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId, attachmentPoint,
remoteId, cvlan, svlan);
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
ReRegistrationConsumerDmaapModel expectedEventObject = ImmutableReRegistrationConsumerDmaapModel.builder()
.correlationId(correlationId)
@@ -146,29 +146,29 @@
@Test
void passingTwoCorrectEvents_validationSucceeds() {
- String correlationId1 = "PNF-CorrelationId1";
- String correlationId2 = "PNF-CorrelationId2";
- String attachmentPoint1 = "olt1/1/1";
- String attachmentPoint2 = "olt2/2/2";
- String remoteId1 = "remoteId1";
- String remoteId2 = "remoteId2";
- String cvlan = "1005";
- String svlan = "100";
+ var correlationId1 = "PNF-CorrelationId1";
+ var correlationId2 = "PNF-CorrelationId2";
+ var attachmentPoint1 = "olt1/1/1";
+ var attachmentPoint2 = "olt2/2/2";
+ var remoteId1 = "remoteId1";
+ var remoteId2 = "remoteId2";
+ var cvlan = "1005";
+ var svlan = "100";
- String firstEvent = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId1, attachmentPoint1,
+ var firstEvent = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId1, attachmentPoint1,
remoteId1, cvlan, svlan);
- String secondEvent = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId1, attachmentPoint1,
+ var secondEvent = String.format(RE_REGISTRATION_EVENT_TEMPLATE, correlationId1, attachmentPoint1,
remoteId1, cvlan, svlan);
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
- JsonElement jsonElement1 = jsonParser.parse(firstEvent);
+ var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
+ var jsonElement1 = jsonParser.parse(firstEvent);
Mockito.doReturn(Optional.of(jsonElement1.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement1);
- JsonElement jsonElement2 = jsonParser.parse(secondEvent);
+ var jsonElement2 = jsonParser.parse(secondEvent);
Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2);
- String eventsArray = "[" + firstEvent + "," + secondEvent + "]";
+ var eventsArray = "[" + firstEvent + "," + secondEvent + "]";
ReRegistrationConsumerDmaapModel expectedFirstEventObject = ImmutableReRegistrationConsumerDmaapModel.builder()
.correlationId(correlationId1)
@@ -194,23 +194,23 @@
@Test
void passingJsonWithMissingAttachmentPoint_validationFails() {
- String correlationId = "PNF-CorrelationId";
- String remoteId = "remoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var correlationId = "PNF-CorrelationId";
+ var remoteId = "remoteId";
+ var cvlan = "1005";
+ var svlan = "100";
- String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_ATTACHMENT_POINT,
+ var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_ATTACHMENT_POINT,
correlationId,
remoteId,
cvlan,
svlan);
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
@@ -220,23 +220,23 @@
@Test
void passingJsonWithMissingCorrelationId_validationFails() {
- String attachmentPoint = "olt1/1/1";
- String remoteId = "remoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var attachmentPoint = "olt1/1/1";
+ var remoteId = "remoteId";
+ var cvlan = "1005";
+ var svlan = "100";
- String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_CORRELATION_ID,
+ var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_CORRELATION_ID,
attachmentPoint,
remoteId,
cvlan,
svlan);
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
@@ -246,23 +246,23 @@
@Test
void passingJsonWithMissingCorrelationIdValue_validationFails() {
- String attachmentPoint = "olt1/1/1";
- String remoteId = "remoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var attachmentPoint = "olt1/1/1";
+ var remoteId = "remoteId";
+ var cvlan = "1005";
+ var svlan = "100";
- String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_CORRELATION_ID_VALUE,
+ var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_CORRELATION_ID_VALUE,
attachmentPoint,
remoteId,
cvlan,
svlan);
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
@@ -272,25 +272,25 @@
@Test
void passingJsonWithMissingAdditionalFields_validationFails() {
- String correlationId = "PNF-CorrelationId";
- String attachmentPoint = "olt1/1/1";
- String remoteId = "remoteId";
- String cvlan = "1005";
- String svlan = "100";
+ var correlationId = "PNF-CorrelationId";
+ var attachmentPoint = "olt1/1/1";
+ var remoteId = "remoteId";
+ var cvlan = "1005";
+ var svlan = "100";
- String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_ADDITIONAL_FIELDS,
+ var event = String.format(RE_REGISTRATION_EVENT_TEMPLATE_MISSING_ADDITIONAL_FIELDS,
correlationId,
attachmentPoint,
remoteId,
cvlan,
svlan);
- ReRegistrationDmaapConsumerJsonParser consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
- JsonElement jsonElement = jsonParser.parse(event);
+ var consumerJsonParser = spy(new ReRegistrationDmaapConsumerJsonParser());
+ var jsonElement = jsonParser.parse(event);
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement);
- String eventsArray = "[" + event + "]";
+ var eventsArray = "[" + event + "]";
StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
diff --git a/components/bbs-event-processor/src/test/resources/logback-test.xml b/components/bbs-event-processor/src/test/resources/logback-test.xml
index 0b93a43..dd7dcb7 100644
--- a/components/bbs-event-processor/src/test/resources/logback-test.xml
+++ b/components/bbs-event-processor/src/test/resources/logback-test.xml
@@ -20,5 +20,5 @@
<include resource="org/springframework/boot/logging/logback/base.xml" />
<root level="ERROR"/>
<logger name="org.springframework" level="ERROR"/>
- <logger name="org.onap" level="WARN"/>
+ <logger name="org.onap.bbs" level="WARN"/>
</configuration>
\ No newline at end of file
diff --git a/components/bbs-event-processor/version.properties b/components/bbs-event-processor/version.properties
index 7b8b963..358e99c 100644
--- a/components/bbs-event-processor/version.properties
+++ b/components/bbs-event-processor/version.properties
@@ -1,5 +1,5 @@
-major=1
-minor=1
+major=2
+minor=0
patch=0
base_version=${major}.${minor}.${patch}
release_version=${base_version}
diff --git a/components/pm-subscription-handler/Dockerfile b/components/pm-subscription-handler/Dockerfile
index b1f3129..8eed60b 100644
--- a/components/pm-subscription-handler/Dockerfile
+++ b/components/pm-subscription-handler/Dockerfile
@@ -51,4 +51,4 @@
USER $PMSHUSER
# run the app
-ENTRYPOINT ["python", "./bin/pmsh_service.py"]
\ No newline at end of file
+ENTRYPOINT ["python", "./bin/pmsh_service_main.py"]
\ No newline at end of file
diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
index 722188a..e09ec28 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
@@ -25,11 +25,26 @@
db = SQLAlchemy()
basedir = os.path.abspath(os.path.dirname(__file__))
+_connexion_app = None
+
+
+def _get_app():
+    """Return the shared ``App`` instance, creating it on first use.
+
+    The instance is cached in the module-level ``_connexion_app`` so that
+    repeated calls hand back the same object.
+    """
+    global _connexion_app
+    if _connexion_app:
+        return _connexion_app
+    _connexion_app = App(__name__, specification_dir=basedir)
+    return _connexion_app
+
+
+def launch_api_server(app_config):
+    """Register the 'pmsh_swagger.yml' API on the shared app and run it.
+
+    Args:
+        app_config: configuration object providing ``cert_path`` and
+            ``key_path``; the pair is passed to ``run`` as ``ssl_context``.
+    """
+    api_app = _get_app()
+    api_app.add_api('pmsh_swagger.yml')
+    # Port comes from the PMSH_API_PORT environment variable, else '8443'.
+    api_port = os.environ.get('PMSH_API_PORT', '8443')
+    tls_files = (app_config.cert_path, app_config.key_path)
+    api_app.run(port=api_port, ssl_context=tls_files)
def create_app():
logger.create_loggers(os.getenv('LOGS_PATH'))
- connex_app = App(__name__, specification_dir=basedir)
+ connex_app = _get_app()
app = connex_app.app
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_RECORD_QUERIES'] = True
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
new file mode 100755
index 0000000..9d69e76
--- /dev/null
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
@@ -0,0 +1,91 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2020 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+
+import json
+from enum import Enum
+
+from mod import pmsh_logging as logger
+from mod.network_function import NetworkFunction
+from mod.subscription import NetworkFunctionFilter
+
+
+class XNFType(Enum):
+ """Network function kinds; PNF's value is matched against an AAI event header's 'entity-type'."""
+ PNF = 'pnf'
+ VNF = 'vnf'
+
+
+class AAIEvent(Enum):
+ """AAI event header 'action' values that _process_event reacts to."""
+ DELETE = 'DELETE'
+ UPDATE = 'UPDATE'
+
+
+class OrchestrationStatus(Enum):
+ """Named orchestration status strings. NOTE(review): presumably mirrors AAI 'orchestration-status' values - confirm."""
+ ACTIVE = 'Active'
+ INVENTORIED = 'Inventoried'
+
+
+def process_aai_events(mr_sub, subscription, mr_pub, app, app_conf):
+    """
+    Fetch the pending 'AAI-EVENT' messages and hand each one that concerns a
+    network function accepted by the subscription's nfFilter to _process_event.
+
+    The entity's 'orchestration-status' is read and forwarded unchanged; this
+    function does not itself filter on its value.
+
+    Args:
+        mr_sub (_MrSub): MR subscriber
+        subscription (Subscription): The current subscription object
+        mr_pub (_MrPub): MR publisher
+        app: application object whose app_context() is pushed before processing
+        app_conf (AppConfig): the application configuration.
+    """
+    app.app_context().push()
+    raw_events = mr_sub.get_from_topic('AAI-EVENT')
+    if not _aai_event_exists(raw_events):
+        return
+
+    for entry in raw_events:
+        logger.debug(f'AAI-EVENT entry: {entry}')
+        event = json.loads(entry)
+        header = event['event-header']
+        entity = event['entity']
+        action = header['action']
+        entity_type = header['entity-type']
+        # Any entity type other than PNF is read as a VNF.
+        if entity_type == XNFType.PNF.value:
+            xnf_name = entity['pnf-name']
+        else:
+            xnf_name = entity['vnf-name']
+        new_status = entity['orchestration-status']
+
+        nf_filter = NetworkFunctionFilter(**subscription.nfFilter)
+        if nf_filter.is_nf_in_filter(xnf_name):
+            _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf)
+
+
+def _process_event(action, new_status, xnf_name, subscription, mr_pub, app_conf):
+    """
+    Apply a single AAI event to the named network function.
+
+    UPDATE: when NetworkFunction.get finds no local record for xnf_name, the
+    subscription is processed for a new NetworkFunction carrying new_status;
+    when a record already exists the event is only logged.
+    DELETE: the network function is deleted via NetworkFunction.delete.
+    Any other action is ignored.
+
+    Args:
+        action (str): the 'action' value from the AAI event header.
+        new_status (str): the entity's 'orchestration-status' from the event.
+        xnf_name (str): name of the network function the event refers to.
+        subscription (Subscription): The current subscription object
+        mr_pub (_MrPub): MR publisher
+        app_conf (AppConfig): the application configuration.
+    """
+    if action == AAIEvent.UPDATE.value:
+        logger.debug(f'Update event found for network function {xnf_name}')
+        local_xnf = NetworkFunction.get(xnf_name)
+
+        if local_xnf is None:
+            logger.debug(f'Activating subscription for network function {xnf_name}')
+            subscription.process_subscription([NetworkFunction(
+                nf_name=xnf_name, orchestration_status=new_status)], mr_pub, app_conf)
+        else:
+            # Message fixed: single space at the join, possessive "its".
+            logger.debug(f"Update Event for network function {xnf_name} will not be processed "
+                         f"as its state is set to {local_xnf.orchestration_status}.")
+    elif action == AAIEvent.DELETE.value:
+        logger.debug(f'Delete event found for network function {xnf_name}')
+        NetworkFunction.delete(nf_name=xnf_name)
+        logger.debug(f'{xnf_name} successfully deleted.')
+
+
+def _aai_event_exists(aai_events):
+    """Tell whether ``aai_events`` holds at least one entry (``None`` counts as none)."""
+    if aai_events is None:
+        return False
+    return len(aai_events) != 0
diff --git a/components/pm-subscription-handler/pmsh_service/mod/db_models.py b/components/pm-subscription-handler/pmsh_service/mod/db_models.py
index 479d40e..d183676 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/db_models.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/db_models.py
@@ -67,6 +67,9 @@
class NfSubRelationalModel(db.Model):
__tablename__ = 'nf_to_sub_rel'
+ __mapper_args__ = {
+ 'confirm_deleted_rows': False
+ }
id = Column(Integer, primary_key=True, autoincrement=True)
subscription_name = Column(
String,
diff --git a/components/pm-subscription-handler/pmsh_service/mod/healthcheck.py b/components/pm-subscription-handler/pmsh_service/mod/healthcheck.py
new file mode 100755
index 0000000..af82fc4
--- /dev/null
+++ b/components/pm-subscription-handler/pmsh_service/mod/healthcheck.py
@@ -0,0 +1,30 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2019-2020 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+
+
+def status():
+    """
+    Report the health of the PMSH service.
+
+    The current implementation performs no checks and always reports 'healthy'.
+
+    Returns:
+        dict: a single 'status' key whose value is 'healthy'.
+    """
+    health = {'status': 'healthy'}
+    return health
diff --git a/components/pm-subscription-handler/pmsh_service/mod/network_function.py b/components/pm-subscription-handler/pmsh_service/mod/network_function.py
index 9f21cc6..c4b9b56 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/network_function.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/network_function.py
@@ -32,6 +32,13 @@
def __str__(self):
return f'nf-name: {self.nf_name}, orchestration-status: {self.orchestration_status}'
+    def __eq__(self, other):
+        """Compare by network function name and orchestration status.
+
+        Returns:
+            The comparison result when ``other`` exposes the attributes needed
+            for the comparison; ``NotImplemented`` otherwise, so that comparing
+            against ``None`` or an unrelated object evaluates to ``False``
+            instead of raising ``AttributeError``.
+        """
+        try:
+            return self.nf_name == other.nf_name and \
+                self.orchestration_status == other.orchestration_status
+        except AttributeError:
+            # `other` is not network-function-like: defer to Python's default comparison.
+            return NotImplemented
+
+    def __hash__(self):
+        """Hash over the same fields that ``__eq__`` compares."""
+        identity = (self.nf_name, self.orchestration_status)
+        return hash(identity)
+
def create(self):
""" Creates a NetworkFunction database entry """
existing_nf = NetworkFunctionModel.query.filter(
@@ -72,8 +79,7 @@
def delete(**kwargs):
""" Deletes a network function from the database """
nf_name = kwargs['nf_name']
- NetworkFunctionModel.query.filter(
- NetworkFunctionModel.nf_name == nf_name). \
- delete(synchronize_session='evaluate')
+ nf = NetworkFunctionModel.query.filter(
+ NetworkFunctionModel.nf_name == nf_name).one_or_none()
- db.session.commit()
+ db.session.delete(nf) if nf else None
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_swagger.yml b/components/pm-subscription-handler/pmsh_service/mod/pmsh_swagger.yml
new file mode 100644
index 0000000..7bfecd8
--- /dev/null
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_swagger.yml
@@ -0,0 +1,34 @@
+swagger: "2.0"
+info:
+ title: PM Subscription Handler Service
+ version: "1.0.0"
+ description: This is the swagger file that outlines the PM subscription handler api
+consumes:
+ - "application/json"
+produces:
+ - "application/json"
+
+schemes:
+ - https
+
+# Paths supported by the server application
+paths:
+ /healthcheck:
+ get:
+ operationId: "mod.healthcheck.status"
+ tags:
+ - "HealthCheck"
+ description: >-
+ This is the health check endpoint. If this returns a 200, the server is alive.
+ responses:
+ 200:
+ description: Successful response
+ schema:
+ type: object
+ properties:
+ status:
+ type: string
+ description: Overall health of PMSH
+ enum: [healthy, unhealthy]
+ 503:
+ description: the pmsh service is unavailable
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
index 4a77543..c8b3bc7 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
@@ -18,14 +18,15 @@
import json
import threading
import uuid
+from threading import Timer
import requests
from requests.auth import HTTPBasicAuth
from tenacity import retry, wait_fixed, retry_if_exception_type
import mod.pmsh_logging as logger
-from mod.subscription import Subscription, SubNfState, AdministrativeState
from mod.network_function import NetworkFunction
+from mod.subscription import Subscription, SubNfState, AdministrativeState
class AppConfig:
@@ -36,6 +37,8 @@
self.key_path = kwargs.get('key_path')
self.streams_subscribes = kwargs.get('streams_subscribes')
self.streams_publishes = kwargs.get('streams_publishes')
+ self.operational_policy_name = kwargs.get('operational_policy_name')
+ self.control_loop_name = kwargs.get('control_loop_name')
def get_mr_sub(self, sub_name):
"""
@@ -126,16 +129,17 @@
logger.debug(e)
raise
- def publish_subscription_event_data(self, subscription, xnf_name):
+ def publish_subscription_event_data(self, subscription, xnf_name, app_conf):
"""
Update the Subscription dict with xnf and policy name then publish to DMaaP MR topic.
Args:
subscription: the `Subscription` <Subscription> object.
xnf_name: the xnf to include in the event.
+ app_conf (AppConfig): the application configuration.
"""
try:
- subscription_event = subscription.prepare_subscription_event(xnf_name)
+ subscription_event = subscription.prepare_subscription_event(xnf_name, app_conf)
self.publish_to_topic(subscription_event)
except Exception as e:
logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}')
@@ -163,6 +167,7 @@
try:
session = requests.Session()
headers = {'accept': 'application/json', 'content-type': 'application/json'}
+ logger.debug(f'Request sent to MR topic: {self.topic_url}')
response = session.get(f'{self.topic_url}/{consumer_group}/{consumer_id}'
f'?timeout={timeout}',
auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
@@ -241,3 +246,13 @@
'failed': Subscription.update_sub_nf_status
}
}
+
+
+class PeriodicTask(Timer):
+ """
+ Repeating :class:`Timer`: calls the function every `interval` seconds until cancelled.
+ """
+
+ def run(self):
+ while not self.finished.wait(self.interval):
+ self.function(*self.args, **self.kwargs)
diff --git a/components/pm-subscription-handler/pmsh_service/mod/subscription.py b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
index 031609a..5449f42 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/subscription.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/subscription.py
@@ -43,25 +43,26 @@
self.administrativeState = kwargs.get('administrativeState')
self.fileBasedGP = kwargs.get('fileBasedGP')
self.fileLocation = kwargs.get('fileLocation')
- self.nfTypeModelInvariantId = kwargs.get('nfTypeModelInvariantId')
self.nfFilter = kwargs.get('nfFilter')
self.measurementGroups = kwargs.get('measurementGroups')
- def prepare_subscription_event(self, xnf_name):
+ def prepare_subscription_event(self, xnf_name, app_conf):
"""Prepare the sub event for publishing
Args:
xnf_name: the AAI xnf name.
+ app_conf (AppConfig): the application configuration.
Returns:
dict: the Subscription event to be published.
"""
clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
- clean_sub.update({'nfName': xnf_name, 'policyName': f'OP-{self.subscriptionName}',
- 'changeType': 'DELETE'
- if self.administrativeState == AdministrativeState.LOCKED.value
- else 'CREATE'})
- return clean_sub
+ sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name,
+ 'changeType': 'DELETE'
+ if self.administrativeState == AdministrativeState.LOCKED.value
+ else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name,
+ 'subscription': clean_sub}
+ return sub_event
def create(self):
""" Creates a subscription database entry
@@ -143,16 +144,23 @@
db.session.commit()
def delete_subscription(self):
- """ Deletes a subscription from the database """
- SubscriptionModel.query.filter(
- SubscriptionModel.subscription_name == self.subscriptionName). \
- delete(synchronize_session='evaluate')
-
- db.session.commit()
+ """ Deletes a subscription and all its associations from the database. A network function
+ that is only associated with the subscription being removed will also be deleted."""
+ subscription = SubscriptionModel.query.filter(
+ SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
+ if subscription:
+ for nf_relationship in subscription.nfs:
+ other_nf_relationship = NfSubRelationalModel.query.filter(
+ NfSubRelationalModel.subscription_name != self.subscriptionName,
+ NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none()
+ if not other_nf_relationship:
+ db.session.delete(nf_relationship.nf)
+ db.session.delete(subscription)
+ db.session.commit()
@retry(wait=wait_exponential(multiplier=1, min=30, max=120), stop=stop_after_attempt(3),
retry=retry_if_exception_type(Exception))
- def process_subscription(self, nfs, mr_pub):
+ def process_subscription(self, nfs, mr_pub, app_conf):
action = 'Deactivate'
sub_nf_state = SubNfState.PENDING_DELETE.value
self.update_subscription_status()
@@ -163,7 +171,7 @@
try:
for nf in nfs:
- mr_pub.publish_subscription_event_data(self, nf.nf_name)
+ mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf)
logger.debug(f'Publishing Event to {action} '
f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
self.add_network_functions_to_subscription(nfs)
diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
index ab33032..8245466 100755
--- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
+++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
@@ -16,18 +16,19 @@
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
import sys
-import time
import threading
import mod.aai_client as aai
import mod.pmsh_logging as logger
-from mod import db, create_app
+from mod import db, create_app, launch_api_server
+from mod.aai_event_handler import process_aai_events
from mod.config_handler import ConfigHandler
-from mod.pmsh_utils import AppConfig
+from mod.pmsh_utils import AppConfig, PeriodicTask
from mod.subscription import Subscription, AdministrativeState
-def subscription_processor(config_handler, administrative_state, mr_pub, app):
+def subscription_processor(config_handler, administrative_state, mr_pub, app,
+ mr_aai_event_subscriber):
"""
Checks for changes of administrative state in config and proceeds to process
the Subscription if a change has occurred
@@ -37,10 +38,11 @@
administrative_state (str): The administrative state
mr_pub (_MrPub): MR publisher
app (db): DB application
+ mr_aai_event_subscriber (_MrSub): AAI events MR subscriber
"""
app.app_context().push()
config = config_handler.get_config()
- sub, nfs = aai.get_pmsh_subscription_data(config)
+ app_conf = AppConfig(**config['config'])
new_administrative_state = config['policy']['subscription']['administrativeState']
polling_period = 30.0
@@ -48,17 +50,29 @@
if administrative_state == new_administrative_state:
logger.debug('Administrative State did not change in the Config')
else:
- sub.process_subscription(nfs, mr_pub)
+ logger.debug(f'Administrative State changed from "{administrative_state}" "to '
+ f'"{new_administrative_state}".')
+ sub, nfs = aai.get_pmsh_subscription_data(config)
+ sub.process_subscription(nfs, mr_pub, app_conf)
+ aai_event_thread = PeriodicTask(10, process_aai_events, args=(
+ mr_aai_event_subscriber, sub, mr_pub, app, app_conf))
+
+ if new_administrative_state == AdministrativeState.UNLOCKED.value:
+ logger.debug('Listening to AAI-EVENT topic in MR.')
+ aai_event_thread.start()
+ else:
+ logger.debug('Stopping to listen to AAI-EVENT topic in MR.')
+ aai_event_thread.cancel()
except Exception as err:
logger.debug(f'Error occurred during the activation/deactivation process {err}')
threading.Timer(polling_period, subscription_processor,
- [config_handler, new_administrative_state, mr_pub, app]).start()
+ [config_handler, new_administrative_state, mr_pub, app,
+ mr_aai_event_subscriber]).start()
def main():
-
try:
config_handler = ConfigHandler()
config = config_handler.get_config()
@@ -69,6 +83,7 @@
sub, nfs = aai.get_pmsh_subscription_data(config)
mr_pub = app_conf.get_mr_pub('policy_pm_publisher')
mr_sub = app_conf.get_mr_sub('policy_pm_subscriber')
+ mr_aai_event_subscriber = app_conf.get_mr_sub('aai_subscriber')
initial_start_delay = 5.0
administrative_state = AdministrativeState.LOCKED.value
@@ -77,18 +92,17 @@
administrative_state = subscription_in_db.status
threading.Timer(initial_start_delay, subscription_processor,
- [config_handler, administrative_state, mr_pub, app]).start()
+ [config_handler, administrative_state, mr_pub,
+ app, mr_aai_event_subscriber]).start()
threading.Timer(20.0, mr_sub.poll_policy_topic, [sub.subscriptionName, app]).start()
+ launch_api_server(app_conf)
+
except Exception as e:
logger.debug(f'Failed to Init PMSH: {e}')
sys.exit(e)
- while True:
- logger.debug(Subscription.get_all_nfs_subscription_relations())
- time.sleep(5)
-
if __name__ == '__main__':
main()
diff --git a/components/pm-subscription-handler/pom.xml b/components/pm-subscription-handler/pom.xml
index a425806..74ad400 100644
--- a/components/pm-subscription-handler/pom.xml
+++ b/components/pm-subscription-handler/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.onap.oparent</groupId>
<artifactId>oparent</artifactId>
- <version>2.1.0</version>
+ <version>3.0.0</version>
</parent>
<!--- CHANGE THE FOLLOWING 3 OBJECTS for your own repo -->
<groupId>org.onap.dcaegen2.services</groupId>
@@ -36,11 +36,11 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<sonar.sources>.</sonar.sources>
<sonar.junit.reportsPath>xunit-results.xml</sonar.junit.reportsPath>
- <sonar.python.coverage.reportPath>coverage.xml</sonar.python.coverage.reportPath>
+ <sonar.python.coverage.reportPaths>coverage.xml</sonar.python.coverage.reportPaths>
<sonar.python.xunit.reportPath>xunit-results.xml</sonar.python.xunit.reportPath>
<sonar.language>py</sonar.language>
<sonar.pluginname>python</sonar.pluginname>
- <sonar.inclusions>**/**.py</sonar.inclusions>
+ <sonar.inclusions>**/*.py</sonar.inclusions>
<sonar.exclusions>target/**,tests/**,setup.py,**/__init__.py</sonar.exclusions>
<exec-mvn-plugin-version>1.2.1</exec-mvn-plugin-version>
</properties>
diff --git a/components/pm-subscription-handler/tests/data/cbs_data_1.json b/components/pm-subscription-handler/tests/data/cbs_data_1.json
index ccc0626..8dc225d 100644
--- a/components/pm-subscription-handler/tests/data/cbs_data_1.json
+++ b/components/pm-subscription-handler/tests/data/cbs_data_1.json
@@ -5,7 +5,6 @@
"administrativeState":"UNLOCKED",
"fileBasedGP":15,
"fileLocation":"\/pm\/pm.xml",
- "nfTypeModelInvariantId":"2829292",
"nfFilter":{
"swVersions":[
"1.0.0",
@@ -61,6 +60,8 @@
}
},
"config":{
+ "control_loop_name": "pmsh-control-loop",
+ "operational_policy_name": "pmsh-operational-policy",
"aaf_password":"demo123456!",
"aaf_identity":"dcae@dcae.onap.org",
"cert_path":"/opt/app/pm-mapper/etc/certs/cert.pem",
diff --git a/components/pm-subscription-handler/tests/data/cbs_data_2.json b/components/pm-subscription-handler/tests/data/cbs_data_2.json
index 43f67e8..c223dde 100755
--- a/components/pm-subscription-handler/tests/data/cbs_data_2.json
+++ b/components/pm-subscription-handler/tests/data/cbs_data_2.json
@@ -6,7 +6,6 @@
"administrativeState": "UNLOCKED",
"fileBasedGP": 15,
"fileLocation": "c:\/\/PM",
- "nfTypeModelInvariantId": "2829292",
"nfFilter": {
"swVersions": [
"A21",
diff --git a/components/pm-subscription-handler/tests/data/mr_aai_events.json b/components/pm-subscription-handler/tests/data/mr_aai_events.json
new file mode 100755
index 0000000..202d133
--- /dev/null
+++ b/components/pm-subscription-handler/tests/data/mr_aai_events.json
@@ -0,0 +1,6 @@
+{
+ "mr_response": [
+ "{\"cambria.partition\":\"AAI\",\"event-header\":{\"severity\":\"NORMAL\",\"entity-type\":\"pnf\",\"top-entity-type\":\"pnf\",\"entity-link\":\"/aai/v16/network/pnfs/pnf/pnf_newly_discovered\",\"event-type\":\"AAI-EVENT\",\"domain\":\"dev\",\"action\":\"UPDATE\",\"sequence-number\":\"0\",\"id\":\"db09e090-196e-4f84-9645-e449b1cd3640\",\"source-name\":\"dcae-curl\",\"version\":\"v16\",\"timestamp\":\"20200203-15:14:08:807\"},\"entity\":{\"ipaddress-v4-oam\":\"10.10.10.37\",\"nf-role\":\"gNB\",\"equip-type\":\"val8\",\"relationship-list\":{\"relationship\":[{\"related-to\":\"service-instance\",\"relationship-data\":[{\"relationship-value\":\"Demonstration\",\"relationship-key\":\"customer.global-customer-id\"},{\"relationship-value\":\"vCPE\",\"relationship-key\":\"service-subscription.service-type\"},{\"relationship-value\":\"2c03b2a8-e31a-4749-9e99-3089ab441400\",\"relationship-key\":\"service-instance.service-instance-id\"}],\"related-link\":\"/aai/v16/business/customers/customer/Demonstration/service-subscriptions/service-subscription/vCPE/service-instances/service-instance/2c03b2a8-e31a-4749-9e99-3089ab441400\",\"relationship-label\":\"org.onap.relationships.inventory.ComposedOf\",\"related-to-property\":[{\"property-key\":\"service-instance.service-instance-name\",\"property-value\":\"Svc6_1\"}]}]},\"equip-vendor\":\"Ericsson\",\"serial-number\":\"6061ZW3\",\"ipaddress-v6-oam\":\"2001:0db8:0:0:0:0:1428:57ab\",\"equip-model\":\"val6\",\"in-maint\":false,\"resource-version\":\"1578668956804\",\"sw-version\":\"val7\",\"pnf-id\":\"eabcfaf7-b7f3-45fb-94e7-e6112fb3e8b8\",\"pnf-name\":\"pnf_newly_discovered\",\"orchestration-status\":\"Active\"}}",
+ "{\"cambria.partition\":\"AAI\",\"event-header\":{\"severity\":\"NORMAL\",\"entity-type\":\"pnf\",\"top-entity-type\":\"pnf\",\"entity-link\":\"/aai/v16/network/pnfs/pnf/pnf_newly_discovered\",\"event-type\":\"AAI-EVENT\",\"domain\":\"dev\",\"action\":\"UPDATE\",\"sequence-number\":\"0\",\"id\":\"db09e090-196e-4f84-9645-e449b1cd3640\",\"source-name\":\"dcae-curl\",\"version\":\"v16\",\"timestamp\":\"20200203-15:14:08:807\"},\"entity\":{\"orchestration-status\":\"Active\",\"ipaddress-v4-oam\":\"10.10.10.37\",\"nf-role\":\"gNB\",\"equip-type\":\"val8\",\"relationship-list\":{\"relationship\":[{\"related-to\":\"service-instance\",\"relationship-data\":[{\"relationship-value\":\"Demonstration\",\"relationship-key\":\"customer.global-customer-id\"},{\"relationship-value\":\"vCPE\",\"relationship-key\":\"service-subscription.service-type\"},{\"relationship-value\":\"2c03b2a8-e31a-4749-9e99-3089ab441400\",\"relationship-key\":\"service-instance.service-instance-id\"}],\"related-link\":\"/aai/v16/business/customers/customer/Demonstration/service-subscriptions/service-subscription/vCPE/service-instances/service-instance/2c03b2a8-e31a-4749-9e99-3089ab441400\",\"relationship-label\":\"org.onap.relationships.inventory.ComposedOf\",\"related-to-property\":[{\"property-key\":\"service-instance.service-instance-name\",\"property-value\":\"Svc6_1\"}]}]},\"equip-vendor\":\"Ericsson\",\"serial-number\":\"6061ZW3\",\"ipaddress-v6-oam\":\"2001:0db8:0:0:0:0:1428:57ab\",\"equip-model\":\"val6\",\"in-maint\":false,\"resource-version\":\"1578668956804\",\"sw-version\":\"val7\",\"pnf-id\":\"eabcfaf7-b7f3-45fb-94e7-e6112fb3e8b8\",\"pnf-name\":\"pnf_already_active\",\"orchestration-status\":\"Active\"}}",
+ "{\"cambria.partition\":\"AAI\",\"event-header\":{\"severity\":\"NORMAL\",\"entity-type\":\"pnf\",\"top-entity-type\":\"pnf\",\"entity-link\":\"/aai/v16/network/pnfs/pnf/pnf_newly_discovered\",\"event-type\":\"AAI-EVENT\",\"domain\":\"dev\",\"action\":\"DELETE\",\"sequence-number\":\"0\",\"id\":\"db09e090-196e-4f84-9645-e449b1cd3640\",\"source-name\":\"dcae-curl\",\"version\":\"v16\",\"timestamp\":\"20200203-15:14:08:807\"},\"entity\":{\"ipaddress-v4-oam\":\"10.10.10.37\",\"nf-role\":\"gNB\",\"equip-type\":\"val8\",\"relationship-list\":{\"relationship\":[{\"related-to\":\"service-instance\",\"relationship-data\":[{\"relationship-value\":\"Demonstration\",\"relationship-key\":\"customer.global-customer-id\"},{\"relationship-value\":\"vCPE\",\"relationship-key\":\"service-subscription.service-type\"},{\"relationship-value\":\"2c03b2a8-e31a-4749-9e99-3089ab441400\",\"relationship-key\":\"service-instance.service-instance-id\"}],\"related-link\":\"/aai/v16/business/customers/customer/Demonstration/service-subscriptions/service-subscription/vCPE/service-instances/service-instance/2c03b2a8-e31a-4749-9e99-3089ab441400\",\"relationship-label\":\"org.onap.relationships.inventory.ComposedOf\",\"related-to-property\":[{\"property-key\":\"service-instance.service-instance-name\",\"property-value\":\"Svc6_1\"}]}]},\"equip-vendor\":\"Ericsson\",\"serial-number\":\"6061ZW3\",\"ipaddress-v6-oam\":\"2001:0db8:0:0:0:0:1428:57ab\",\"equip-model\":\"val6\",\"in-maint\":false,\"resource-version\":\"1578668956804\",\"sw-version\":\"val7\",\"pnf-id\":\"eabcfaf7-b7f3-45fb-94e7-e6112fb3e8b8\",\"pnf-name\":\"pnf_to_be_deleted\",\"orchestration-status\":\"Active\"}}"]
+}
diff --git a/components/pm-subscription-handler/tests/data/pm_subscription_event.json b/components/pm-subscription-handler/tests/data/pm_subscription_event.json
new file mode 100755
index 0000000..e190aa2
--- /dev/null
+++ b/components/pm-subscription-handler/tests/data/pm_subscription_event.json
@@ -0,0 +1,54 @@
+{
+ "nfName":"pnf_1",
+ "policyName":"pmsh-operational-policy",
+ "changeType":"CREATE",
+ "closedLoopControlName":"pmsh-control-loop",
+ "subscription":{
+ "subscriptionName":"ExtraPM-All-gNB-R2B",
+ "administrativeState":"UNLOCKED",
+ "fileBasedGP":15,
+ "fileLocation":"/pm/pm.xml",
+ "measurementGroups":[
+ {
+ "measurementGroup":{
+ "measurementTypes":[
+ {
+ "measurementType":"countera"
+ },
+ {
+ "measurementType":"counterb"
+ }
+ ],
+ "managedObjectDNsBasic":[
+ {
+ "DN":"dna"
+ },
+ {
+ "DN":"dnb"
+ }
+ ]
+ }
+ },
+ {
+ "measurementGroup":{
+ "measurementTypes":[
+ {
+ "measurementType":"counterc"
+ },
+ {
+ "measurementType":"counterd"
+ }
+ ],
+ "managedObjectDNsBasic":[
+ {
+ "DN":"dnc"
+ },
+ {
+ "DN":"dnd"
+ }
+ ]
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/components/pm-subscription-handler/tests/test_aai_event_handler.py b/components/pm-subscription-handler/tests/test_aai_event_handler.py
new file mode 100755
index 0000000..0fd9e77
--- /dev/null
+++ b/components/pm-subscription-handler/tests/test_aai_event_handler.py
@@ -0,0 +1,55 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2020 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+import json
+from os import path
+from unittest import TestCase
+from unittest.mock import patch, Mock
+
+from mod.aai_event_handler import OrchestrationStatus, process_aai_events
+from mod.network_function import NetworkFunction
+
+
+class AAIEventHandlerTest(TestCase):
+
+ def setUp(self):
+ with open(path.join(path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
+ self.cbs_data_1 = json.load(data)
+ with open(path.join(path.dirname(__file__), 'data/mr_aai_events.json'), 'r') as data:
+ self.mr_aai_events = json.load(data)["mr_response"]
+ self.mock_sub = Mock(nfFilter={'swVersions': ['1.0.0', '1.0.1'],
+ 'nfNames': ['^pnf.*', '^vnf.*']})
+ self.mock_mr_sub = Mock(get_from_topic=Mock(return_value=self.mr_aai_events))
+ self.mock_mr_pub = Mock()
+ self.mock_app = Mock()
+
+ @patch('mod.aai_event_handler.NetworkFunction.delete')
+ @patch('mod.aai_event_handler.NetworkFunction.get')
+ @patch('pmsh_service_main.AppConfig')
+ def test_process_aai_update_and_delete_events(self, mock_app_conf, mock_nf_get, mock_nf_delete):
+ pnf_already_active = NetworkFunction(nf_name='pnf_already_active',
+ orchestration_status=OrchestrationStatus.ACTIVE.value)
+ mock_nf_get.side_effect = [None, pnf_already_active]
+ expected_nf_for_processing = NetworkFunction(
+ nf_name='pnf_newly_discovered', orchestration_status=OrchestrationStatus.ACTIVE.value)
+
+ process_aai_events(self.mock_mr_sub, self.mock_sub,
+ self.mock_mr_pub, self.mock_app, mock_app_conf)
+
+ self.mock_sub.process_subscription.assert_called_once_with([expected_nf_for_processing],
+ self.mock_mr_pub, mock_app_conf)
+ mock_nf_delete.assert_called_once_with(nf_name='pnf_to_be_deleted')
diff --git a/components/pm-subscription-handler/tests/test_healthcheck.py b/components/pm-subscription-handler/tests/test_healthcheck.py
new file mode 100755
index 0000000..6e960d0
--- /dev/null
+++ b/components/pm-subscription-handler/tests/test_healthcheck.py
@@ -0,0 +1,27 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2019-2020 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+
+import unittest
+
+from pmsh_service.mod.healthcheck import status
+
+
+class HealthcheckTestCase(unittest.TestCase):
+
+ def test_status_response_healthy(self):
+ self.assertEqual(status()['status'], 'healthy')
diff --git a/components/pm-subscription-handler/tests/test_network_function.py b/components/pm-subscription-handler/tests/test_network_function.py
index e9394b4..4fca077 100755
--- a/components/pm-subscription-handler/tests/test_network_function.py
+++ b/components/pm-subscription-handler/tests/test_network_function.py
@@ -21,6 +21,7 @@
from mod import db, create_app
from mod.network_function import NetworkFunction
+from mod.subscription import Subscription
class NetworkFunctionTests(TestCase):
@@ -69,7 +70,13 @@
def test_delete_network_function(self):
self.nf_1.create()
self.nf_2.create()
- self.nf_1.delete(nf_name='pnf_1')
- nfs = NetworkFunction.get_all()
+ sub = Subscription(**{"subscriptionName": "sub"})
+ sub.add_network_functions_to_subscription([self.nf_1, self.nf_2])
+ NetworkFunction.delete(nf_name=self.nf_1.nf_name)
+
+ nfs = NetworkFunction.get_all()
self.assertEqual(1, len(nfs))
+ self.assertEqual(1, len(Subscription.get_all_nfs_subscription_relations()))
+ pnf_1_deleted = [nf for nf in nfs if nf.nf_name != self.nf_1.nf_name]
+ self.assertTrue(pnf_1_deleted)
diff --git a/components/pm-subscription-handler/tests/test_pmsh_service.py b/components/pm-subscription-handler/tests/test_pmsh_service.py
index 4a6032b..cd28a5d 100644
--- a/components/pm-subscription-handler/tests/test_pmsh_service.py
+++ b/components/pm-subscription-handler/tests/test_pmsh_service.py
@@ -38,21 +38,27 @@
self.mock_sub = mock_sub
self.mock_mr_pub = mock_mr_pub
self.mock_config_handler = mock_config_handler
+ self.mock_aai_sub = mock_sub
self.nf_1 = NetworkFunction(nf_name='pnf_1')
self.nf_2 = NetworkFunction(nf_name='pnf_2')
self.nfs = [self.nf_1, self.nf_2]
@patch('threading.Timer')
@patch('mod.aai_client.get_pmsh_subscription_data')
- def test_subscription_processor_changed_state(self, mock_get_aai, mock_thread):
+ @patch('pmsh_service_main.PeriodicTask')
+ @patch('pmsh_service_main.AppConfig')
+ def test_subscription_processor_changed_state(self, mock_app_conf, periodic_task, mock_get_aai,
+ mock_thread):
self.mock_config_handler.get_config.return_value = self.cbs_data_1
mock_get_aai.return_value = self.mock_sub, self.nfs
mock_thread.start.return_value = 1
+ periodic_task.start.return_value = 1
pmsh_service.subscription_processor(self.mock_config_handler, 'LOCKED',
- self.mock_mr_pub, self.mock_app)
+ self.mock_mr_pub, self.mock_app, self.mock_aai_sub)
- self.mock_sub.process_subscription.assert_called_with(self.nfs, self.mock_mr_pub)
+ self.mock_sub.process_subscription.assert_called_with(self.nfs, self.mock_mr_pub,
+ mock_app_conf.return_value)
@patch('threading.Timer')
@patch('mod.pmsh_logging.debug')
@@ -63,7 +69,7 @@
mock_thread.start.return_value = 1
pmsh_service.subscription_processor(self.mock_config_handler, 'UNLOCKED', self.mock_mr_pub,
- self.mock_app)
+ self.mock_app, self.mock_aai_sub)
mock_logger.assert_called_with('Administrative State did not change in the Config')
@@ -77,6 +83,6 @@
self.mock_sub.process_subscription.side_effect = Exception
pmsh_service.subscription_processor(self.mock_config_handler, 'LOCKED', self.mock_mr_pub,
- self.mock_app)
+ self.mock_app, self.mock_aai_sub)
mock_logger.assert_called_with(f'Error occurred during the '
f'activation/deactivation process ')
diff --git a/components/pm-subscription-handler/tests/test_pmsh_utils.py b/components/pm-subscription-handler/tests/test_pmsh_utils.py
index 03e8c69..ea657f4 100644
--- a/components/pm-subscription-handler/tests/test_pmsh_utils.py
+++ b/components/pm-subscription-handler/tests/test_pmsh_utils.py
@@ -91,7 +91,7 @@
def test_mr_pub_publish_sub_event_data_success(self):
mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
with patch('mod.pmsh_utils._MrPub.publish_to_topic') as pub_to_topic_call:
- mr_policy_pub.publish_subscription_event_data(self.sub, 'pnf201')
+ mr_policy_pub.publish_subscription_event_data(self.sub, 'pnf201', self.app_conf)
pub_to_topic_call.assert_called_once()
@responses.activate
diff --git a/components/pm-subscription-handler/tests/test_subscription.py b/components/pm-subscription-handler/tests/test_subscription.py
index 8fe233e..bd39f28 100755
--- a/components/pm-subscription-handler/tests/test_subscription.py
+++ b/components/pm-subscription-handler/tests/test_subscription.py
@@ -27,6 +27,7 @@
import mod.aai_client as aai_client
from mod import db, create_app
from mod.network_function import NetworkFunction
+from mod.pmsh_utils import AppConfig
from mod.subscription import Subscription, NetworkFunctionFilter
@@ -35,7 +36,8 @@
@patch('mod.pmsh_utils._MrSub')
@patch('mod.get_db_connection_url')
@patch.object(Session, 'put')
- def setUp(self, mock_session, mock_get_db_url, mock_mr_sub, mock_mr_pub):
+ @patch('pmsh_service_main.AppConfig')
+ def setUp(self, mock_app_config, mock_session, mock_get_db_url, mock_mr_sub, mock_mr_pub):
mock_get_db_url.return_value = 'sqlite://'
with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data:
self.aai_response_data = data.read()
@@ -61,6 +63,7 @@
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
+ self.mock_app_config = mock_app_config
db.create_all()
def tearDown(self):
@@ -134,13 +137,16 @@
self.assertEqual('new_status', sub.status)
def test_delete_subscription(self):
- self.sub_1.create()
- subs = self.sub_1.get_all()
- self.assertEqual(1, len(subs))
+ self.sub_1.add_network_functions_to_subscription([self.nf_1, self.nf_2])
+ self.sub_2.add_network_functions_to_subscription([self.nf_2])
self.sub_1.delete_subscription()
- new_subs = self.sub_1.get_all()
- self.assertEqual(0, len(new_subs))
+
+ self.assertEqual(1, len(Subscription.get_all()))
+ self.assertEqual(None, Subscription.get(self.sub_1.subscriptionName))
+ self.assertEqual(1, len(Subscription.get_all_nfs_subscription_relations()))
+ self.assertEqual(1, len(NetworkFunction.get_all()))
+ self.assertEqual(None, NetworkFunction.get(nf_name=self.nf_1.nf_name))
def test_update_sub_nf_status(self):
sub_name = 'ExtraPM-All-gNB-R2B'
@@ -160,7 +166,7 @@
def test_process_activate_subscription(self, mock_update_sub_status,
mock_update_sub_nf, mock_add_nfs):
self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
- self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub)
+ self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub, self.mock_app_config)
mock_update_sub_status.assert_called()
mock_add_nfs.assert_called()
@@ -174,7 +180,7 @@
mock_update_sub_nf):
self.sub_1.administrativeState = 'LOCKED'
self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
- self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub)
+ self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub, self.mock_app_config)
self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
@@ -184,4 +190,13 @@
def test_process_subscription_exception(self):
self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
self.assertRaises(Exception, self.sub_1.process_subscription,
- [self.nf_1], 'not_mr_pub')
+ [self.nf_1], 'not_mr_pub', 'app_config')
+
+ def test_prepare_subscription_event(self):
+ with open(os.path.join(os.path.dirname(__file__),
+ 'data/pm_subscription_event.json'), 'r') as data:
+ expected_sub_event = json.load(data)
+ app_conf = AppConfig(**self.cbs_data_1['config'])
+ actual_sub_event = self.sub_1.prepare_subscription_event(self.nf_1.nf_name, app_conf)
+ print(actual_sub_event)
+ self.assertEqual(expected_sub_event, actual_sub_event)
diff --git a/pom.xml b/pom.xml
index 9165c30..a5465be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
<parent>
<groupId>org.onap.oparent</groupId>
<artifactId>oparent</artifactId>
- <version>2.0.0</version>
+ <version>3.0.0</version>
</parent>
<groupId>org.onap.dcaegen2</groupId>
<artifactId>services</artifactId>
@@ -34,9 +34,6 @@
<version>1.0.0-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<packaging>pom</packaging>
- <modules>
- <module>components</module>
- </modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
diff --git a/releases/pmsh-1.0.0-container.yaml b/releases/pmsh-1.0.0-container.yaml
new file mode 100755
index 0000000..4d6a974
--- /dev/null
+++ b/releases/pmsh-1.0.0-container.yaml
@@ -0,0 +1,9 @@
+distribution_type: 'container'
+version: '1.0.0'
+project: 'dcaegen2-services-pmsh'
+log_dir: 'dcaegen2-services-pmsh-docker-merge-master/13/'
+containers:
+ - name: 'org.onap.dcaegen2.services.pmsh'
+ version: '1.0.0-SNAPSHOT-20200302T125753Z'
+container_release_tag: '1.0.0'
+ref: 'c19a0a85bbbc8dcf0633a32d26f4128f6c8c4544'